Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(@opentelemetry/exporter-collector): remove fulfilled promises cor… #1775

Merged
merged 19 commits into from Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -61,25 +61,17 @@ export abstract class CollectorExporterNodeBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, _onSuccess, _onError);
});
const promise = new Promise<void>((resolve, reject) => {
this._send(this, objects, resolve, reject);
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

onInit(config: CollectorExporterConfigNode): void {
Expand Down
@@ -0,0 +1,156 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { collectorTypes } from '@opentelemetry/exporter-collector';
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';

import * as assert from 'assert';
import { CollectorExporterNodeBase } from '../src/CollectorExporterNodeBase';
import { CollectorExporterConfigNode, ServiceClientType } from '../src/types';
import { mockedReadableSpan } from './helper';

class MockCollectorExporter extends CollectorExporterNodeBase<
ReadableSpan,
ReadableSpan[]
> {
/**
* Callbacks passed to _send()
*/
sendCallbacks: {
onSuccess: () => void;
onError: (error: collectorTypes.CollectorExporterError) => void;
}[] = [];

getDefaultUrl(config: CollectorExporterConfigNode): string {
return '';
}

getDefaultServiceName(config: CollectorExporterConfigNode): string {
return '';
}

convert(spans: ReadableSpan[]): ReadableSpan[] {
return spans;
}

getServiceClientType() {
return ServiceClientType.SPANS;
}

getServiceProtoPath(): string {
return 'opentelemetry/proto/collector/trace/v1/trace_service.proto';
}
}

// Mocked _send which just saves the callbacks for later
MockCollectorExporter.prototype['_send'] = function _sendMock(
dyladan marked this conversation as resolved.
Show resolved Hide resolved
self: MockCollectorExporter,
objects: ReadableSpan[],
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
self.sendCallbacks.push({ onSuccess, onError });
};

describe('CollectorExporterNodeBase', () => {
let exporter: MockCollectorExporter;
const concurrencyLimit = 5;

beforeEach(done => {
exporter = new MockCollectorExporter({ concurrencyLimit });
done();
});

describe('export', () => {
it('should export requests concurrently', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];
const numToExport = concurrencyLimit;

for (let i = 0; i < numToExport; ++i) {
exporter.export(spans, () => {});
}

assert.strictEqual(exporter['_sendingPromises'].length, numToExport);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests finish sending
exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess());

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should drop new export requests when already sending at concurrencyLimit', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];
const numToExport = concurrencyLimit + 5;

for (let i = 0; i < numToExport; ++i) {
exporter.export(spans, () => {});
}

assert.strictEqual(exporter['_sendingPromises'].length, concurrencyLimit);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests finish sending
exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess());

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should pop export request promises even if they failed', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];

exporter.export(spans, () => {});
assert.strictEqual(exporter['_sendingPromises'].length, 1);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests fail sending
exporter.sendCallbacks.forEach(({ onError }) =>
onError(new Error('Failed to send!!'))
);

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should pop export request promises even if success callback throws error', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];

exporter['_sendPromise'](
spans,
() => {
throw new Error('Oops');
},
() => {}
);

assert.strictEqual(exporter['_sendingPromises'].length, 1);
const promisesAllDone = Promise.all(exporter['_sendingPromises'])
// catch expected unhandled exception
.catch(() => {});

// Mock that the request finishes sending
exporter.sendCallbacks.forEach(({ onSuccess }) => {
onSuccess();
});

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});
});
});
Expand Up @@ -47,25 +47,17 @@ export abstract class CollectorExporterNodeBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, this.compression, _onSuccess, _onError);
});
const promise = new Promise<void>((resolve, reject) => {
this._send(this, objects, this.compression, resolve, reject);
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

override onInit(config: CollectorExporterNodeConfigBase): void {
Expand Down
Expand Up @@ -76,27 +76,20 @@ export abstract class CollectorExporterBrowserBase<
const serviceRequest = this.convert(items);
const body = JSON.stringify(serviceRequest);

const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

const promise = new Promise<void>((resolve, reject) => {
if (this._useXHR) {
sendWithXhr(body, this.url, this._headers, _onSuccess, _onError);
sendWithXhr(body, this.url, this._headers, resolve, reject);
} else {
sendWithBeacon(body, this.url, { type: 'application/json' }, _onSuccess, _onError);
sendWithBeacon(body, this.url, { type: 'application/json' }, resolve, reject);
}
});
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}
}
Expand Up @@ -70,30 +70,23 @@ export abstract class CollectorExporterNodeBase<
}
const serviceRequest = this.convert(objects);

const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};
const promise = new Promise<void>((resolve, reject) => {
sendWithHttp(
this,
JSON.stringify(serviceRequest),
'application/json',
_onSuccess,
_onError
resolve,
reject
);
});
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

onShutdown(): void {}
Expand Down
9 changes: 7 additions & 2 deletions packages/opentelemetry-exporter-zipkin/src/zipkin.ts
Expand Up @@ -84,11 +84,16 @@ export class ZipkinExporter implements SpanExporter {
this._sendSpans(spans, serviceName, result => {
resolve();
resultCallback(result);
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
});


this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

/**
Expand Down