Skip to content

Commit

Permalink
AsyncProgressQueueWorker behaves like AsyncProgressWorkerBase except …
Browse files Browse the repository at this point in the history
…all events are delivered
  • Loading branch information
mkrufky authored and kkoopa committed Oct 6, 2017
1 parent 9445562 commit a976636
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions nan.h
Expand Up @@ -53,13 +53,16 @@
#include <cstring>
#include <climits>
#include <cstdlib>
#include <utility>
#if defined(_MSC_VER)
# pragma warning( push )
# pragma warning( disable : 4530 )
# include <queue>
# include <string>
# include <vector>
# pragma warning( pop )
#else
# include <queue>
# include <string>
# include <vector>
#endif
Expand Down Expand Up @@ -1730,6 +1733,79 @@ class AsyncProgressWorkerBase : public AsyncBareProgressWorker<T> {
// class definition.
typedef AsyncProgressWorkerBase<char> AsyncProgressWorker;

template<class T>
/* abstract */
class AsyncProgressQueueWorker : public AsyncBareProgressWorker<T> {
public:
explicit AsyncProgressQueueWorker(Callback *callback_)
: AsyncBareProgressWorker<T>(callback_) {
uv_mutex_init(&async_lock);
}

virtual ~AsyncProgressQueueWorker() {
uv_mutex_lock(&async_lock);

while (!asyncdata_.empty()) {
std::pair<T*, size_t> *datapair = asyncdata_.front();
T *data = datapair->first;

asyncdata_.pop();

delete[] data;
delete datapair;
}

uv_mutex_unlock(&async_lock);
uv_mutex_destroy(&async_lock);
}

void WorkProgress() {
uv_mutex_lock(&async_lock);

while (!asyncdata_.empty()) {
std::pair<T*, size_t> *datapair = asyncdata_.front();
asyncdata_.pop();
uv_mutex_unlock(&async_lock);

T *data = datapair->first;
size_t size = datapair->second;

// Don't send progress events after we've already completed.
if (this->callback) {
this->HandleProgressCallback(data, size);
}

delete[] data;
delete datapair;

uv_mutex_lock(&async_lock);
}

uv_mutex_unlock(&async_lock);
}

private:
void SendProgress_(const T *data, size_t count) {
T *new_data = new T[count];
{
T *it = new_data;
std::copy(data, data + count, it);
}

std::pair<T*, size_t> *datapair =
new std::pair<T*, size_t>(new_data, count);

uv_mutex_lock(&async_lock);
asyncdata_.push(datapair);
uv_mutex_unlock(&async_lock);

uv_async_send(this->async);
}

uv_mutex_t async_lock;
std::queue<std::pair<T*, size_t>*> asyncdata_;
};

inline void AsyncExecute (uv_work_t* req) {
AsyncWorker *worker = static_cast<AsyncWorker*>(req->data);
worker->Execute();
Expand Down

0 comments on commit a976636

Please sign in to comment.