From bff7268cdf2a3ee6fb5a088439d4e8d5f1b62ed1 Mon Sep 17 00:00:00 2001 From: Robo Date: Thu, 7 Mar 2019 20:50:03 +0530 Subject: [PATCH] fix: make StreamSubscriber ref counted It is owned by URLRequestStreamJob on the IO thread once request starts, but if the ownership was abondoned while transfering it to IO thread which is possible when a request is aborted, then we need to make sure its destroyed on the right thread to avoid lock in v8. --- atom/browser/api/stream_subscriber.cc | 15 ++++++++----- atom/browser/api/stream_subscriber.h | 16 ++++++++++--- atom/browser/net/url_request_stream_job.cc | 26 +++++++++++----------- atom/browser/net/url_request_stream_job.h | 4 ++-- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/atom/browser/api/stream_subscriber.cc b/atom/browser/api/stream_subscriber.cc index 431dfcbe776a5..7cfd34e99956f 100644 --- a/atom/browser/api/stream_subscriber.cc +++ b/atom/browser/api/stream_subscriber.cc @@ -17,12 +17,15 @@ namespace mate { StreamSubscriber::StreamSubscriber( v8::Isolate* isolate, v8::Local emitter, - base::WeakPtr url_job) - : isolate_(isolate), + base::WeakPtr url_job, + scoped_refptr ui_task_runner) + : base::RefCountedDeleteOnSequence(ui_task_runner), + isolate_(isolate), emitter_(isolate, emitter), url_job_(url_job), weak_factory_(this) { - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); + DCHECK(ui_task_runner->RunsTasksInCurrentSequence()); + auto weak_self = weak_factory_.GetWeakPtr(); On("data", base::Bind(&StreamSubscriber::OnData, weak_self)); On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self)); @@ -30,13 +33,12 @@ StreamSubscriber::StreamSubscriber( } StreamSubscriber::~StreamSubscriber() { - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); RemoveAllListeners(); } void StreamSubscriber::On(const std::string& event, EventCallback&& callback) { // NOLINT - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); + DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); DCHECK(js_handlers_.find(event) == js_handlers_.end()); v8::Locker locker(isolate_); @@ -50,7 +52,7 @@ void StreamSubscriber::On(const std::string& event, } void StreamSubscriber::Off(const std::string& event) { - DCHECK_CURRENTLY_ON(content::BrowserThread::UI); + DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); DCHECK(js_handlers_.find(event) != js_handlers_.end()); v8::Locker locker(isolate_); @@ -96,6 +98,7 @@ void StreamSubscriber::OnError(mate::Arguments* args) { } void StreamSubscriber::RemoveAllListeners() { + DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); v8::Locker locker(isolate_); v8::Isolate::Scope isolate_scope(isolate_); v8::HandleScope handle_scope(isolate_); diff --git a/atom/browser/api/stream_subscriber.h b/atom/browser/api/stream_subscriber.h index 3a766b92b52df..6f241075aef47 100644 --- a/atom/browser/api/stream_subscriber.h +++ b/atom/browser/api/stream_subscriber.h @@ -11,6 +11,8 @@ #include #include "base/callback.h" +#include "base/memory/ref_counted.h" +#include "base/memory/ref_counted_delete_on_sequence.h" #include "base/memory/weak_ptr.h" #include "content/public/browser/browser_thread.h" #include "v8/include/v8.h" @@ -23,17 +25,25 @@ namespace mate { class Arguments; -class StreamSubscriber { +class StreamSubscriber + : public base::RefCountedDeleteOnSequence { public: + REQUIRE_ADOPTION_FOR_REFCOUNTED_TYPE(); + StreamSubscriber(v8::Isolate* isolate, v8::Local emitter, - base::WeakPtr url_job); - ~StreamSubscriber(); + base::WeakPtr url_job, + scoped_refptr ui_task_runner); private: + friend class base::DeleteHelper; + friend class base::RefCountedDeleteOnSequence; + using JSHandlersMap = std::map>; using EventCallback = base::Callback; + ~StreamSubscriber(); + void On(const std::string& event, EventCallback&& callback); // NOLINT void Off(const std::string& event); diff --git a/atom/browser/net/url_request_stream_job.cc b/atom/browser/net/url_request_stream_job.cc index 8be31c146c435..934006ca597a2 100644 --- a/atom/browser/net/url_request_stream_job.cc +++ b/atom/browser/net/url_request_stream_job.cc @@ -16,6 +16,7 @@ #include "atom/common/node_includes.h" #include "base/strings/string_number_conversions.h" #include "base/strings/string_util.h" +#include "base/threading/thread_task_runner_handle.h" #include "base/time/time.h" #include "native_mate/dictionary.h" #include "net/base/net_errors.h" @@ -82,14 +83,14 @@ void BeforeStartInUI(base::WeakPtr job, return; } - auto subscriber = std::make_unique( - args->isolate(), data.GetHandle(), job); + auto subscriber = base::MakeRefCounted( + args->isolate(), data.GetHandle(), job, + base::ThreadTaskRunnerHandle::Get()); content::BrowserThread::PostTask( content::BrowserThread::IO, FROM_HERE, - base::BindOnce(&URLRequestStreamJob::StartAsync, job, - std::move(subscriber), base::RetainedRef(response_headers), - ended, error)); + base::BindOnce(&URLRequestStreamJob::StartAsync, job, subscriber, + base::RetainedRef(response_headers), ended, error)); } } // namespace @@ -104,10 +105,7 @@ URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request, weak_factory_(this) {} URLRequestStreamJob::~URLRequestStreamJob() { - if (subscriber_) { - content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE, - std::move(subscriber_)); - } + DCHECK(!subscriber_ || subscriber_->HasOneRef()); } void URLRequestStreamJob::Start() { @@ -121,7 +119,7 @@ void URLRequestStreamJob::Start() { } void URLRequestStreamJob::StartAsync( - std::unique_ptr subscriber, + scoped_refptr subscriber, scoped_refptr response_headers, bool ended, int error) { @@ -133,7 +131,7 @@ void URLRequestStreamJob::StartAsync( ended_ = ended; response_headers_ = response_headers; - subscriber_ = std::move(subscriber); + subscriber_ = subscriber; request_start_time_ = base::TimeTicks::Now(); NotifyHeadersComplete(); } @@ -192,12 +190,14 @@ int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) { } void URLRequestStreamJob::DoneReading() { - content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE, - std::move(subscriber_)); write_buffer_.clear(); } void URLRequestStreamJob::DoneReadingRedirectResponse() { + if (subscriber_) { + DCHECK(subscriber_->HasAtLeastOneRef()); + subscriber_ = nullptr; + } DoneReading(); } diff --git a/atom/browser/net/url_request_stream_job.h b/atom/browser/net/url_request_stream_job.h index e56e9f1699299..950fd9ce224b8 100644 --- a/atom/browser/net/url_request_stream_job.h +++ b/atom/browser/net/url_request_stream_job.h @@ -24,7 +24,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob { net::NetworkDelegate* network_delegate); ~URLRequestStreamJob() override; - void StartAsync(std::unique_ptr subscriber, + void StartAsync(scoped_refptr subscriber, scoped_refptr response_headers, bool ended, int error); @@ -62,7 +62,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob { base::TimeTicks request_start_time_; base::TimeTicks response_start_time_; scoped_refptr response_headers_; - std::unique_ptr subscriber_; + scoped_refptr subscriber_; base::WeakPtrFactory weak_factory_;