Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make StreamSubscriber ref counted (backport: 4-1-x) #17267

Merged
merged 1 commit into from Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions atom/browser/api/stream_subscriber.cc
Expand Up @@ -17,26 +17,28 @@ namespace mate {
StreamSubscriber::StreamSubscriber(
v8::Isolate* isolate,
v8::Local<v8::Object> emitter,
base::WeakPtr<atom::URLRequestStreamJob> url_job)
: isolate_(isolate),
base::WeakPtr<atom::URLRequestStreamJob> url_job,
scoped_refptr<base::SequencedTaskRunner> ui_task_runner)
: base::RefCountedDeleteOnSequence<StreamSubscriber>(ui_task_runner),
isolate_(isolate),
emitter_(isolate, emitter),
url_job_(url_job),
weak_factory_(this) {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
DCHECK(ui_task_runner->RunsTasksInCurrentSequence());

auto weak_self = weak_factory_.GetWeakPtr();
On("data", base::Bind(&StreamSubscriber::OnData, weak_self));
On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self));
On("error", base::Bind(&StreamSubscriber::OnError, weak_self));
}

StreamSubscriber::~StreamSubscriber() {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
RemoveAllListeners();
}

void StreamSubscriber::On(const std::string& event,
EventCallback&& callback) { // NOLINT
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
DCHECK(js_handlers_.find(event) == js_handlers_.end());

v8::Locker locker(isolate_);
Expand All @@ -50,7 +52,7 @@ void StreamSubscriber::On(const std::string& event,
}

void StreamSubscriber::Off(const std::string& event) {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
DCHECK(js_handlers_.find(event) != js_handlers_.end());

v8::Locker locker(isolate_);
Expand Down Expand Up @@ -96,6 +98,7 @@ void StreamSubscriber::OnError(mate::Arguments* args) {
}

void StreamSubscriber::RemoveAllListeners() {
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
v8::Locker locker(isolate_);
v8::Isolate::Scope isolate_scope(isolate_);
v8::HandleScope handle_scope(isolate_);
Expand Down
16 changes: 13 additions & 3 deletions atom/browser/api/stream_subscriber.h
Expand Up @@ -11,6 +11,8 @@
#include <vector>

#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/ref_counted_delete_on_sequence.h"
#include "base/memory/weak_ptr.h"
#include "content/public/browser/browser_thread.h"
#include "v8/include/v8.h"
Expand All @@ -23,17 +25,25 @@ namespace mate {

class Arguments;

class StreamSubscriber {
class StreamSubscriber
: public base::RefCountedDeleteOnSequence<StreamSubscriber> {
public:
REQUIRE_ADOPTION_FOR_REFCOUNTED_TYPE();

StreamSubscriber(v8::Isolate* isolate,
v8::Local<v8::Object> emitter,
base::WeakPtr<atom::URLRequestStreamJob> url_job);
~StreamSubscriber();
base::WeakPtr<atom::URLRequestStreamJob> url_job,
scoped_refptr<base::SequencedTaskRunner> ui_task_runner);

private:
friend class base::DeleteHelper<StreamSubscriber>;
friend class base::RefCountedDeleteOnSequence<StreamSubscriber>;

using JSHandlersMap = std::map<std::string, v8::Global<v8::Value>>;
using EventCallback = base::Callback<void(mate::Arguments* args)>;

~StreamSubscriber();

void On(const std::string& event, EventCallback&& callback); // NOLINT
void Off(const std::string& event);

Expand Down
28 changes: 14 additions & 14 deletions atom/browser/net/url_request_stream_job.cc
Expand Up @@ -13,14 +13,16 @@
#include "atom/common/api/event_emitter_caller.h"
#include "atom/common/atom_constants.h"
#include "atom/common/native_mate_converters/net_converter.h"
#include "atom/common/node_includes.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "native_mate/dictionary.h"
#include "net/base/net_errors.h"
#include "net/filter/gzip_source_stream.h"

#include "atom/common/node_includes.h"

namespace atom {

namespace {
Expand Down Expand Up @@ -82,14 +84,14 @@ void BeforeStartInUI(base::WeakPtr<URLRequestStreamJob> job,
return;
}

auto subscriber = std::make_unique<mate::StreamSubscriber>(
args->isolate(), data.GetHandle(), job);
auto subscriber = base::MakeRefCounted<mate::StreamSubscriber>(
args->isolate(), data.GetHandle(), job,
base::ThreadTaskRunnerHandle::Get());

content::BrowserThread::PostTask(
content::BrowserThread::IO, FROM_HERE,
base::BindOnce(&URLRequestStreamJob::StartAsync, job,
std::move(subscriber), base::RetainedRef(response_headers),
ended, error));
base::BindOnce(&URLRequestStreamJob::StartAsync, job, subscriber,
base::RetainedRef(response_headers), ended, error));
}

} // namespace
Expand All @@ -104,10 +106,7 @@ URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request,
weak_factory_(this) {}

URLRequestStreamJob::~URLRequestStreamJob() {
if (subscriber_) {
content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
std::move(subscriber_));
}
DCHECK(!subscriber_ || subscriber_->HasOneRef());
}

void URLRequestStreamJob::Start() {
Expand All @@ -121,7 +120,7 @@ void URLRequestStreamJob::Start() {
}

void URLRequestStreamJob::StartAsync(
std::unique_ptr<mate::StreamSubscriber> subscriber,
scoped_refptr<mate::StreamSubscriber> subscriber,
scoped_refptr<net::HttpResponseHeaders> response_headers,
bool ended,
int error) {
Expand All @@ -133,7 +132,7 @@ void URLRequestStreamJob::StartAsync(

ended_ = ended;
response_headers_ = response_headers;
subscriber_ = std::move(subscriber);
subscriber_ = subscriber;
request_start_time_ = base::TimeTicks::Now();
NotifyHeadersComplete();
}
Expand Down Expand Up @@ -192,12 +191,13 @@ int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) {
}

void URLRequestStreamJob::DoneReading() {
content::BrowserThread::DeleteSoon(content::BrowserThread::UI, FROM_HERE,
std::move(subscriber_));
write_buffer_.clear();
}

void URLRequestStreamJob::DoneReadingRedirectResponse() {
if (subscriber_) {
subscriber_ = nullptr;
}
DoneReading();
}

Expand Down
4 changes: 2 additions & 2 deletions atom/browser/net/url_request_stream_job.h
Expand Up @@ -24,7 +24,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob {
net::NetworkDelegate* network_delegate);
~URLRequestStreamJob() override;

void StartAsync(std::unique_ptr<mate::StreamSubscriber> subscriber,
void StartAsync(scoped_refptr<mate::StreamSubscriber> subscriber,
scoped_refptr<net::HttpResponseHeaders> response_headers,
bool ended,
int error);
Expand Down Expand Up @@ -62,7 +62,7 @@ class URLRequestStreamJob : public JsAsker, public net::URLRequestJob {
base::TimeTicks request_start_time_;
base::TimeTicks response_start_time_;
scoped_refptr<net::HttpResponseHeaders> response_headers_;
std::unique_ptr<mate::StreamSubscriber> subscriber_;
scoped_refptr<mate::StreamSubscriber> subscriber_;

base::WeakPtrFactory<URLRequestStreamJob> weak_factory_;

Expand Down