Skip to content

Commit

Permalink
fix: make StreamSubscriber ref counted
Browse files Browse the repository at this point in the history
It is owned by URLRequestStreamJob on the IO thread once request starts,
but if the ownership was abondoned while transfering it to IO thread
which is possible when a request is aborted, then we need to make sure
its destroyed on the right thread to avoid lock in v8.
  • Loading branch information
deepak1556 committed Mar 17, 2019
1 parent 84ec424 commit 1e2adfc
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 25 deletions.
15 changes: 9 additions & 6 deletions atom/browser/api/stream_subscriber.cc
Expand Up @@ -17,26 +17,28 @@ namespace mate {
StreamSubscriber::StreamSubscriber(
v8::Isolate* isolate,
v8::Local<v8::Object> emitter,
base::WeakPtr<atom::URLRequestStreamJob> url_job)
: isolate_(isolate),
base::WeakPtr<atom::URLRequestStreamJob> url_job,
scoped_refptr<base::SequencedTaskRunner> ui_task_runner)
: base::RefCountedDeleteOnSequence<StreamSubscriber>(ui_task_runner),
isolate_(isolate),
emitter_(isolate, emitter),
url_job_(url_job),
weak_factory_(this) {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
DCHECK(ui_task_runner->RunsTasksInCurrentSequence());

auto weak_self = weak_factory_.GetWeakPtr();
On("data", base::Bind(&StreamSubscriber::OnData, weak_self));
On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self));
On("error", base::Bind(&StreamSubscriber::OnError, weak_self));
}

StreamSubscriber::~StreamSubscriber() {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
RemoveAllListeners();
}

void StreamSubscriber::On(const std::string& event,
EventCallback&& callback) { // NOLINT
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
DCHECK(js_handlers_.find(event) == js_handlers_.end());

v8::Locker locker(isolate_);
Expand All @@ -50,7 +52,7 @@ void StreamSubscriber::On(const std::string& event,
}

void StreamSubscriber::Off(const std::string& event) {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
DCHECK(js_handlers_.find(event) != js_handlers_.end());

v8::Locker locker(isolate_);
Expand Down Expand Up @@ -96,6 +98,7 @@ void StreamSubscriber::OnError(mate::Arguments* args) {
}

void StreamSubscriber::RemoveAllListeners() {
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
v8::Locker locker(isolate_);
v8::Isolate::Scope isolate_scope(isolate_);
v8::HandleScope handle_scope(isolate_);
Expand Down
16 changes: 13 additions & 3 deletions atom/browser/api/stream_subscriber.h
Expand Up @@ -11,6 +11,8 @@
#include <vector>

#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/ref_counted_delete_on_sequence.h"
#include "base/memory/weak_ptr.h"
#include "content/public/browser/browser_thread.h"
#include "v8/include/v8.h"
Expand All @@ -23,17 +25,25 @@ namespace mate {

class Arguments;

class StreamSubscriber {
class StreamSubscriber
: public base::RefCountedDeleteOnSequence<StreamSubscriber> {
public:
REQUIRE_ADOPTION_FOR_REFCOUNTED_TYPE();

StreamSubscriber(v8::Isolate* isolate,
v8::Local<v8::Object> emitter,
base::WeakPtr<atom::URLRequestStreamJob> url_job);
~StreamSubscriber();
base::WeakPtr<atom::URLRequestStreamJob> url_job,
scoped_refptr<base::SequencedTaskRunner> ui_task_runner);

private:
friend class base::DeleteHelper<StreamSubscriber>;
friend class base::RefCountedDeleteOnSequence<StreamSubscriber>;

using JSHandlersMap = std::map<std::string, v8::Global<v8::Value>>;
using EventCallback = base::Callback<void(mate::Arguments* args)>;

~StreamSubscriber();

void On(const std::string& event, EventCallback&& callback); // NOLINT
void Off(const std::string& event);

Expand Down
28 changes: 14 additions & 14 deletions atom/browser/net/url_request_stream_job.cc
Expand Up @@ -13,14 +13,16 @@
#include "atom/common/api/event_emitter_caller.h"
#include "atom/common/atom_constants.h"
#include "atom/common/native_mate_converters/net_converter.h"
#include "atom/common/node_includes.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "native_mate/dictionary.h"
#include "net/base/net_errors.h"
#include "net/filter/gzip_source_stream.h"

#include "atom/common/node_includes.h"

namespace atom {

namespace {
Expand Down Expand Up @@ -82,14 +84,14 @@ void BeforeStartInUI(base::WeakPtr<URLRequestStreamJob> job,
return;
}

auto subscriber = std::make_unique<mate::StreamSubscriber>(
args->isolate(), data.GetHandle(), job);
auto subscriber = base::MakeRefCounted<mate::StreamSubscriber>(
args->isolate(), data.GetHandle(), job,
base::ThreadTaskRunnerHandle::Get());

content::BrowserThread::PostTask(
content::BrowserThread::IO, FROM_HERE,
base::BindOnce(&URLRequestStreamJob::StartAsync, job,
std::move(subscriber), base::RetainedRef(response_headers),
ended, error));
base::BindOnce(&URLRequestStreamJob::StartAsync, job, subscriber,
base::RetainedRef(response_headers), ended, error));
}

} // namespace
Expand All @@ -104,10 +106,7 @@ URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request,
weak_factory_(this) {}

URLRequestStreamJob::~URLRequestStreamJob() {
if (subscriber_) {
content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
std::move(subscriber_));
}
DCHECK(!subscriber_ || subscriber_->HasOneRef());
}

void URLRequestStreamJob::Start() {
Expand All @@ -121,7 +120,7 @@ void URLRequestStreamJob::Start() {
}

void URLRequestStreamJob::StartAsync(
std::unique_ptr<mate::StreamSubscriber> subscriber,
scoped_refptr<mate::StreamSubscriber> subscriber,
scoped_refptr<net::HttpResponseHeaders> response_headers,
bool ended,
int error) {
Expand All @@ -133,7 +132,7 @@ void URLRequestStreamJob::StartAsync(

ended_ = ended;
response_headers_ = response_headers;
subscriber_ = std::move(subscriber);
subscriber_ = subscriber;
request_start_time_ = base::TimeTicks::Now();
NotifyHeadersComplete();
}
Expand Down Expand Up @@ -192,12 +191,13 @@ int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) {
}

void URLRequestStreamJob::DoneReading() {
content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
std::move(subscriber_));
write_buffer_.clear();
}

void URLRequestStreamJob::DoneReadingRedirectResponse() {
if (subscriber_) {
subscriber_ = nullptr;
}
DoneReading();
}

Expand Down
4 changes: 2 additions & 2 deletions atom/browser/net/url_request_stream_job.h
Expand Up @@ -24,7 +24,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob {
net::NetworkDelegate* network_delegate);
~URLRequestStreamJob() override;

void StartAsync(std::unique_ptr<mate::StreamSubscriber> subscriber,
void StartAsync(scoped_refptr<mate::StreamSubscriber> subscriber,
scoped_refptr<net::HttpResponseHeaders> response_headers,
bool ended,
int error);
Expand Down Expand Up @@ -62,7 +62,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob {
base::TimeTicks request_start_time_;
base::TimeTicks response_start_time_;
scoped_refptr<net::HttpResponseHeaders> response_headers_;
std::unique_ptr<mate::StreamSubscriber> subscriber_;
scoped_refptr<mate::StreamSubscriber> subscriber_;

base::WeakPtrFactory<URLRequestStreamJob> weak_factory_;

Expand Down

0 comments on commit 1e2adfc

Please sign in to comment.