From 4c7f8c94a4df0c07a4a3df232a7b98aa5498598a Mon Sep 17 00:00:00 2001 From: deepak1556 Date: Mon, 4 Mar 2019 20:22:08 +0530 Subject: [PATCH] fix: make StreamSubscriber ref counted It is owned by URLRequestStreamJob on the IO thread once request starts, but if the ownership was abondoned while transfering it to IO thread which is possible when a request is aborted, then we need to make sure its destroyed on the right thread to avoid lock in v8. --- atom/browser/api/stream_subscriber.cc | 15 ++++++++----- atom/browser/api/stream_subscriber.h | 16 ++++++++++--- atom/browser/net/url_request_stream_job.cc | 26 +++++++++++----------- atom/browser/net/url_request_stream_job.h | 4 ++-- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/atom/browser/api/stream_subscriber.cc b/atom/browser/api/stream_subscriber.cc index 7bdeb20ded354..9ac07cf056c33 100644 --- a/atom/browser/api/stream_subscriber.cc +++ b/atom/browser/api/stream_subscriber.cc @@ -19,12 +19,15 @@ namespace mate { StreamSubscriber::StreamSubscriber( v8::Isolate* isolate, v8::Local emitter, - base::WeakPtr url_job) - : isolate_(isolate), + base::WeakPtr url_job, + scoped_refptr ui_task_runner) + : base::RefCountedDeleteOnSequence(ui_task_runner), + isolate_(isolate), emitter_(isolate, emitter), url_job_(url_job), weak_factory_(this) { - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); + DCHECK(ui_task_runner->RunsTasksInCurrentSequence()); + auto weak_self = weak_factory_.GetWeakPtr(); On("data", base::Bind(&StreamSubscriber::OnData, weak_self)); On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self)); @@ -32,13 +35,12 @@ StreamSubscriber::StreamSubscriber( } StreamSubscriber::~StreamSubscriber() { - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); RemoveAllListeners(); } void StreamSubscriber::On(const std::string& event, EventCallback&& callback) { // NOLINT - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); + DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); DCHECK(js_handlers_.find(event) == js_handlers_.end()); v8::Locker locker(isolate_); @@ -52,7 +54,7 @@ void StreamSubscriber::On(const std::string& event, } void StreamSubscriber::Off(const std::string& event) { - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); + DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); DCHECK(js_handlers_.find(event) != js_handlers_.end()); v8::Locker locker(isolate_); @@ -96,6 +98,7 @@ void StreamSubscriber::OnError(mate::Arguments* args) { } void StreamSubscriber::RemoveAllListeners() { + DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); v8::Locker locker(isolate_); v8::Isolate::Scope isolate_scope(isolate_); v8::HandleScope handle_scope(isolate_); diff --git a/atom/browser/api/stream_subscriber.h b/atom/browser/api/stream_subscriber.h index 3a766b92b52df..6f241075aef47 100644 --- a/atom/browser/api/stream_subscriber.h +++ b/atom/browser/api/stream_subscriber.h @@ -11,6 +11,8 @@ #include #include "base/callback.h" +#include "base/memory/ref_counted.h" +#include "base/memory/ref_counted_delete_on_sequence.h" #include "base/memory/weak_ptr.h" #include "content/public/browser/browser_thread.h" #include "v8/include/v8.h" @@ -23,17 +25,25 @@ namespace mate { class Arguments; -class StreamSubscriber { +class StreamSubscriber + : public base::RefCountedDeleteOnSequence { public: + REQUIRE_ADOPTION_FOR_REFCOUNTED_TYPE(); + StreamSubscriber(v8::Isolate* isolate, v8::Local emitter, - base::WeakPtr url_job); - ~StreamSubscriber(); + base::WeakPtr url_job, + scoped_refptr ui_task_runner); private: + friend class base::DeleteHelper; + friend class base::RefCountedDeleteOnSequence; + using JSHandlersMap = std::map>; using EventCallback = base::Callback; + ~StreamSubscriber(); + void On(const std::string& event, EventCallback&& callback); // NOLINT void Off(const std::string& event); diff --git a/atom/browser/net/url_request_stream_job.cc b/atom/browser/net/url_request_stream_job.cc index 5fc4114d0029a..5015cc98e1fa4 100644 --- a/atom/browser/net/url_request_stream_job.cc +++ b/atom/browser/net/url_request_stream_job.cc @@ -17,6 +17,7 @@ #include "base/strings/string_number_conversions.h" #include "base/strings/string_util.h" #include "base/task/post_task.h" +#include "base/threading/thread_task_runner_handle.h" #include "base/time/time.h" #include "content/public/browser/browser_task_traits.h" #include "native_mate/dictionary.h" @@ -84,14 +85,14 @@ void BeforeStartInUI(base::WeakPtr job, return; } - auto subscriber = std::make_unique( - args->isolate(), data.GetHandle(), job); + auto subscriber = base::MakeRefCounted( + args->isolate(), data.GetHandle(), job, + base::ThreadTaskRunnerHandle::Get()); base::PostTaskWithTraits( FROM_HERE, {content::BrowserThread::IO}, - base::BindOnce(&URLRequestStreamJob::StartAsync, job, - std::move(subscriber), base::RetainedRef(response_headers), - ended, error)); + base::BindOnce(&URLRequestStreamJob::StartAsync, job, subscriber, + base::RetainedRef(response_headers), ended, error)); } } // namespace @@ -106,10 +107,7 @@ URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request, weak_factory_(this) {} URLRequestStreamJob::~URLRequestStreamJob() { - if (subscriber_) { - content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE, - std::move(subscriber_)); - } + DCHECK(!subscriber_ || subscriber_->HasOneRef()); } void URLRequestStreamJob::Start() { @@ -123,7 +121,7 @@ void URLRequestStreamJob::Start() { } void URLRequestStreamJob::StartAsync( - std::unique_ptr subscriber, + scoped_refptr subscriber, scoped_refptr response_headers, bool ended, int error) { @@ -135,7 +133,7 @@ void URLRequestStreamJob::StartAsync( ended_ = ended; response_headers_ = response_headers; - subscriber_ = std::move(subscriber); + subscriber_ = subscriber; request_start_time_ = base::TimeTicks::Now(); NotifyHeadersComplete(); } @@ -194,12 +192,14 @@ int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) { } void URLRequestStreamJob::DoneReading() { - content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE, - std::move(subscriber_)); write_buffer_.clear(); } void URLRequestStreamJob::DoneReadingRedirectResponse() { + if (subscriber_) { + DCHECK(subscriber_->HasAtLeastOneRef()); + subscriber_ = nullptr; + } DoneReading(); } diff --git a/atom/browser/net/url_request_stream_job.h b/atom/browser/net/url_request_stream_job.h index e56e9f1699299..950fd9ce224b8 100644 --- a/atom/browser/net/url_request_stream_job.h +++ b/atom/browser/net/url_request_stream_job.h @@ -24,7 +24,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob { net::NetworkDelegate* network_delegate); ~URLRequestStreamJob() override; - void StartAsync(std::unique_ptr subscriber, + void StartAsync(scoped_refptr subscriber, scoped_refptr response_headers, bool ended, int error); @@ -62,7 +62,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob { base::TimeTicks request_start_time_; base::TimeTicks response_start_time_; scoped_refptr response_headers_; - std::unique_ptr subscriber_; + scoped_refptr subscriber_; base::WeakPtrFactory weak_factory_;