Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate protocol module to NetworkService (Part 6) #18223

Merged
merged 8 commits into from
May 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 25 additions & 2 deletions atom/browser/net/atom_url_loader_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,21 @@ network::ResourceResponseHead ToResponseHead(const mate::Dictionary& dict) {
base::DictionaryValue headers;
if (dict.Get("headers", &headers)) {
for (const auto& iter : headers.DictItems()) {
head.headers->AddHeader(iter.first + ": " + iter.second.GetString());
if (iter.second.is_string()) {
// key: value
head.headers->AddHeader(iter.first + ": " + iter.second.GetString());
} else if (iter.second.is_list()) {
// key: [values...]
for (const auto& item : iter.second.GetList()) {
if (item.is_string())
head.headers->AddHeader(iter.first + ": " + item.GetString());
}
} else {
continue;
}
// Some apps are passing content-type via headers, which is not accepted
// in NetworkService.
if (iter.first == "content-type")
if (iter.first == "content-type" && iter.second.is_string())
head.mime_type = iter.second.GetString();
}
}
Expand Down Expand Up @@ -339,7 +350,19 @@ void AtomURLLoaderFactory::StartLoadingStream(
} else if (stream->IsNullOrUndefined()) {
// "data" was explicitly passed as null or undefined, assume the user wants
// to send an empty body.
//
// Note that We must submit a empty body otherwise NetworkService would
// crash.
client->OnReceiveResponse(head);
mojo::ScopedDataPipeProducerHandle producer;
mojo::ScopedDataPipeConsumerHandle consumer;
if (mojo::CreateDataPipe(nullptr, &producer, &consumer) != MOJO_RESULT_OK) {
client->OnComplete(
network::URLLoaderCompletionStatus(net::ERR_INSUFFICIENT_RESOURCES));
return;
}
producer.reset(); // The data pipe is empty.
client->OnStartLoadingResponseBody(std::move(consumer));
client->OnComplete(network::URLLoaderCompletionStatus(net::OK));
return;
} else if (!stream->IsObject()) {
Expand Down
143 changes: 83 additions & 60 deletions atom/browser/net/node_stream_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,19 @@ NodeStreamLoader::NodeStreamLoader(network::ResourceResponseHead head,
network::mojom::URLLoaderClientPtr client,
v8::Isolate* isolate,
v8::Local<v8::Object> emitter)
: binding_(this),
: binding_(this, std::move(loader)),
client_(std::move(client)),
isolate_(isolate),
emitter_(isolate, emitter),
weak_factory_(this) {
auto weak = weak_factory_.GetWeakPtr();
binding_.Bind(std::move(loader));
binding_.set_connection_error_handler(
base::BindOnce(&NodeStreamLoader::OnConnectionError, weak));

mojo::ScopedDataPipeConsumerHandle consumer;
MojoResult rv = mojo::CreateDataPipe(nullptr, &producer_, &consumer);
if (rv != MOJO_RESULT_OK) {
OnError(nullptr);
return;
}

client_->OnReceiveResponse(head);
client_->OnStartLoadingResponseBody(std::move(consumer));
base::BindOnce(&NodeStreamLoader::NotifyComplete,
weak_factory_.GetWeakPtr(), net::ERR_FAILED));

On("end", base::BindRepeating(&NodeStreamLoader::OnEnd, weak));
On("error", base::BindRepeating(&NodeStreamLoader::OnError, weak));
// Since every node::MakeCallback call has a micro scope itself, we have to
// subscribe |data| at last otherwise |end|'s listener won't be called when
// it is emitted in the same tick.
On("data", base::BindRepeating(&NodeStreamLoader::OnData, weak));
// PostTask since it might destruct.
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&NodeStreamLoader::Start,
weak_factory_.GetWeakPtr(), std::move(head)));
}

NodeStreamLoader::~NodeStreamLoader() {
Expand All @@ -58,62 +45,98 @@ NodeStreamLoader::~NodeStreamLoader() {
node::MakeCallback(isolate_, emitter_.Get(isolate_), "removeListener",
node::arraysize(args), args, {0, 0});
}

// Release references.
emitter_.Reset();
buffer_.Reset();
}

void NodeStreamLoader::On(const char* event, EventCallback callback) {
v8::Locker locker(isolate_);
v8::Isolate::Scope isolate_scope(isolate_);
v8::HandleScope handle_scope(isolate_);
void NodeStreamLoader::Start(network::ResourceResponseHead head) {
mojo::ScopedDataPipeProducerHandle producer;
mojo::ScopedDataPipeConsumerHandle consumer;
MojoResult rv = mojo::CreateDataPipe(nullptr, &producer, &consumer);
if (rv != MOJO_RESULT_OK) {
NotifyComplete(net::ERR_INSUFFICIENT_RESOURCES);
return;
}

// emitter.on(event, callback)
v8::Local<v8::Value> args[] = {
mate::StringToV8(isolate_, event),
mate::CallbackToV8(isolate_, std::move(callback)),
};
node::MakeCallback(isolate_, emitter_.Get(isolate_), "on",
node::arraysize(args), args, {0, 0});
producer_ =
std::make_unique<mojo::StringDataPipeProducer>(std::move(producer));

handlers_[event].Reset(isolate_, args[1]);
client_->OnReceiveResponse(head);
client_->OnStartLoadingResponseBody(std::move(consumer));

auto weak = weak_factory_.GetWeakPtr();
On("end",
base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, net::OK));
On("error", base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak,
net::ERR_FAILED));
On("readable", base::BindRepeating(&NodeStreamLoader::ReadMore, weak));
}

void NodeStreamLoader::OnData(mate::Arguments* args) {
v8::Local<v8::Value> buffer;
args->GetNext(&buffer);
if (!node::Buffer::HasInstance(buffer)) {
args->ThrowError("data must be Buffer");
void NodeStreamLoader::NotifyComplete(int result) {
// Wait until write finishes or fails.
if (is_writing_) {
ended_ = true;
result_ = result;
return;
}

size_t ssize = node::Buffer::Length(buffer);
uint32_t size = base::saturated_cast<uint32_t>(ssize);
MojoResult result = producer_->WriteData(node::Buffer::Data(buffer), &size,
MOJO_WRITE_DATA_FLAG_NONE);
if (result != MOJO_RESULT_OK || size < ssize) {
OnError(nullptr);
return;
}
client_->OnComplete(network::URLLoaderCompletionStatus(result));
delete this;
}

void NodeStreamLoader::OnEnd(mate::Arguments* args) {
client_->OnComplete(network::URLLoaderCompletionStatus(net::OK));
client_.reset();
MaybeDeleteSelf();
}
void NodeStreamLoader::ReadMore() {
// buffer = emitter.read()
v8::MaybeLocal<v8::Value> ret = node::MakeCallback(
isolate_, emitter_.Get(isolate_), "read", 0, nullptr, {0, 0});

void NodeStreamLoader::OnError(mate::Arguments* args) {
client_->OnComplete(network::URLLoaderCompletionStatus(net::ERR_FAILED));
client_.reset();
MaybeDeleteSelf();
// If there is no buffer read, wait until |readable| is emitted again.
v8::Local<v8::Value> buffer;
if (!ret.ToLocal(&buffer) || !node::Buffer::HasInstance(buffer))
return;

// Hold the buffer until the write is done.
buffer_.Reset(isolate_, buffer);

// Write buffer to mojo pipe asyncronously.
is_writing_ = true;
producer_->Write(
base::StringPiece(node::Buffer::Data(buffer),
node::Buffer::Length(buffer)),
mojo::StringDataPipeProducer::AsyncWritingMode::
STRING_STAYS_VALID_UNTIL_COMPLETION,
base::BindOnce(&NodeStreamLoader::DidWrite, weak_factory_.GetWeakPtr()));
}

void NodeStreamLoader::OnConnectionError() {
binding_.Close();
MaybeDeleteSelf();
void NodeStreamLoader::DidWrite(MojoResult result) {
is_writing_ = false;
// We were told to end streaming.
if (ended_) {
NotifyComplete(result_);
return;
}

if (result == MOJO_RESULT_OK)
ReadMore();
else
NotifyComplete(net::ERR_FAILED);
}

void NodeStreamLoader::MaybeDeleteSelf() {
if (!binding_.is_bound() && !client_.is_bound())
delete this;
void NodeStreamLoader::On(const char* event, EventCallback callback) {
v8::Locker locker(isolate_);
v8::Isolate::Scope isolate_scope(isolate_);
v8::HandleScope handle_scope(isolate_);

// emitter.on(event, callback)
v8::Local<v8::Value> args[] = {
mate::StringToV8(isolate_, event),
mate::CallbackToV8(isolate_, std::move(callback)),
};
handlers_[event].Reset(isolate_, args[1]);
node::MakeCallback(isolate_, emitter_.Get(isolate_), "on",
node::arraysize(args), args, {0, 0});
// No more code bellow, as this class may destruct when subscribing.
}

} // namespace atom
50 changes: 30 additions & 20 deletions atom/browser/net/node_stream_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@
#define ATOM_BROWSER_NET_NODE_STREAM_LOADER_H_

#include <map>
#include <memory>
#include <string>
#include <vector>

#include "mojo/public/cpp/bindings/strong_binding.h"
#include "mojo/public/cpp/system/string_data_pipe_producer.h"
#include "services/network/public/mojom/url_loader.mojom.h"
#include "v8/include/v8.h"

namespace mate {
class Arguments;
}

namespace atom {

// Read data from node Stream and feed it to NetworkService.
//
// This class manages its own lifetime and should delete itself when the
// connection is lost or finished.
//
// We use |paused mode| to read data from |Readable| stream, so we don't need to
// copy data from buffer and hold it in memory, and we only need to make sure
// the passed |Buffer| is alive while writing data to pipe.
class NodeStreamLoader : public network::mojom::URLLoader {
public:
NodeStreamLoader(network::ResourceResponseHead head,
Expand All @@ -30,7 +36,15 @@ class NodeStreamLoader : public network::mojom::URLLoader {
private:
~NodeStreamLoader() override;

using EventCallback = base::RepeatingCallback<void(mate::Arguments* args)>;
using EventCallback = base::RepeatingCallback<void()>;

void Start(network::ResourceResponseHead head);
void NotifyComplete(int result);
void ReadMore();
void DidWrite(MojoResult result);

// Subscribe to events of |emitter|.
void On(const char* event, EventCallback callback);

// URLLoader:
void FollowRedirect(const std::vector<std::string>& removed_headers,
Expand All @@ -42,27 +56,23 @@ class NodeStreamLoader : public network::mojom::URLLoader {
void PauseReadingBodyFromNet() override {}
void ResumeReadingBodyFromNet() override {}

// JS bindings.
void On(const char* event, EventCallback callback);
void OnData(mate::Arguments* args);
void OnEnd(mate::Arguments* args);
void OnError(mate::Arguments* args);

// This class manages its own lifetime and should delete itself when the
// connection is lost or finished.
//
// The code is updated with `content::FileURLLoader`.
void OnConnectionError();
void MaybeDeleteSelf();

mojo::Binding<network::mojom::URLLoader> binding_;
network::mojom::URLLoaderClientPtr client_;

v8::Isolate* isolate_;
v8::Global<v8::Object> emitter_;
v8::Global<v8::Value> buffer_;

// Mojo data pipe where the data that is being read is written to.
std::unique_ptr<mojo::StringDataPipeProducer> producer_;

// Whether we are in the middle of write.
bool is_writing_ = false;

// Pipes for communicating between Node and NetworkService.
mojo::ScopedDataPipeProducerHandle producer_;
// When NotifyComplete is called while writing, we will save the result and
// quit with it after the write is done.
bool ended_ = false;
int result_ = net::OK;

// Store the V8 callbacks to unsubscribe them later.
std::map<std::string, v8::Global<v8::Value>> handlers_;
Expand Down