Skip to content

Commit

Permalink
[Event Hub] Support for creating a batch when sending events (#4268)
Browse files Browse the repository at this point in the history
* [Event Hubs] Support creating of a batch when sending events
  • Loading branch information
ShivangiReja committed Jul 15, 2019
1 parent 4032855 commit e1e7386
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 51 deletions.
23 changes: 20 additions & 3 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import { TokenCredential } from '@azure/core-amqp';
import { TokenType } from '@azure/core-amqp';
import { WebSocketImpl } from 'rhea-promise';

// @public
export interface BatchOptions {
maxMessageSizeInBytes?: number;
partitionKey?: string;
}

export { DataTransformer }

export { DefaultDataTransformer }
Expand All @@ -36,6 +42,18 @@ export interface EventData {
};
}

// @public
export class EventDataBatch {
// Warning: (ae-forgotten-export) The symbol "ConnectionContext" needs to be exported by the entry point index.d.ts
//
// @internal
constructor(context: ConnectionContext, maxSizeInBytes: number, partitionKey?: string);
readonly batchMessage: Buffer | undefined;
readonly partitionKey: string | undefined;
readonly size: number | undefined;
tryAdd(eventData: EventData): boolean;
}

// @public
export class EventHubClient {
constructor(connectionString: string, options?: EventHubClientOptions);
Expand Down Expand Up @@ -63,8 +81,6 @@ export interface EventHubClientOptions {

// @public
export class EventHubConsumer {
// Warning: (ae-forgotten-export) The symbol "ConnectionContext" needs to be exported by the entry point index.d.ts
//
// @internal
constructor(context: ConnectionContext, consumerGroup: string, partitionId: string, eventPosition: EventPosition, options?: EventHubConsumerOptions);
close(): Promise<void>;
Expand All @@ -89,8 +105,9 @@ export class EventHubProducer {
// @internal
constructor(context: ConnectionContext, options?: EventHubProducerOptions);
close(): Promise<void>;
createBatch(options?: BatchOptions): Promise<EventDataBatch>;
readonly isClosed: boolean;
send(eventData: EventData | EventData[], options?: SendOptions): Promise<void>;
send(eventData: EventData | EventData[] | EventDataBatch, options?: SendOptions): Promise<void>;
}

// @public
Expand Down
27 changes: 22 additions & 5 deletions sdk/eventhub/event-hubs/samples/sendEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
Licensed under the MIT Licence.
This sample demonstrates how the send() function can be used to send events to Event Hubs.
See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-about to learn about Event Hubs.
Note: If you are using version 2.1.0 or lower of @azure/event-hubs library, then please use the samples at
Expand Down Expand Up @@ -33,7 +32,6 @@ async function main(): Promise<void> {
const client = new EventHubClient(connectionString, eventHubName);
const partitionIds = await client.getPartitionIds();
const producer = client.createProducer({ partitionId: partitionIds[0] });
const events: EventData[] = [];
try {
// NOTE: For receiving events from Azure Stream Analytics, please send Events to an EventHub
// where the body is a JSON object/array.
Expand All @@ -42,18 +40,37 @@ async function main(): Promise<void> {
// { body: { "message": "Hello World 2" } },
// { body: { "message": "Hello World 3" } }
// ];
console.log("Sending single event...");
const scientist = listOfScientists[0];
producer.send({ body: `${scientist.firstName} ${scientist.name}` });

console.log("Sending multiple events...");
const events: EventData[] = [];
for (let index = 0; index < listOfScientists.length; index++) {
const scientist = listOfScientists[index];
events.push({ body: `${scientist.firstName} ${scientist.name}` });
}
console.log("Sending batch events...");

await producer.send(events);

// Below variation of send will be available in the upcoming release
// console.log("Creating and sending a batch of events...");
// const eventDatabatch = await producer.createBatch();
// for (let index = 0; index < listOfScientists.length; index++) {
// const scientist = listOfScientists[index];
// const isAdded = eventDatabatch.tryAdd({ body: `${scientist.firstName} ${scientist.name}` });
// if (!isAdded) {
// console.log(`Unable to add event ${index} to the batch`);
// break;
// }
// }
// await producer.send(eventDatabatch);

await producer.close();
} finally {
await client.close();
}
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
113 changes: 113 additions & 0 deletions sdk/eventhub/event-hubs/src/eventDataBatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { EventData, toAmqpMessage } from "./eventData";
import { ConnectionContext } from "./connectionContext";
import { AmqpMessage } from "@azure/core-amqp";
import { message } from "rhea-promise";

/**
* A class representing a batch of events which can be passed to the `send` method of a `EventConsumer` instance.
* This batch is ensured to be under the maximum message size supported by Azure Event Hubs service.
*
* Use the `tryAdd` function on the EventDataBatch to add events in a batch.
* @class
*/
export class EventDataBatch {
/**
* @property Describes the amqp connection context for the Client.
*/
private _context: ConnectionContext;
/**
* @property A value that is hashed to produce a partition assignment.
* It guarantees that messages with the same partitionKey end up in the same partition.
* Specifying this will throw an error if the producer was created using a `paritionId`.
*/
private _partitionKey?: string;
/**
* @property The maximum size allowed for the batch.
*/
private readonly _maxSizeInBytes: number;
/**
* @property Current size of the batch in bytes.
*/
private _size: number;
/**
* @property Encoded amqp messages.
*/
private _encodedMessages: Buffer[] = [];
/**
* @property Encoded batch message.
*/
private _batchMessage: Buffer | undefined;

/**
* @constructor
* @internal
* @ignore
*/
constructor(context: ConnectionContext, maxSizeInBytes: number, partitionKey?: string) {
this._context = context;
this._maxSizeInBytes = maxSizeInBytes;
this._partitionKey = partitionKey;
this._size = 0;
}

/**
* @property The partitionKey set during `EventDataBatch` creation. This value is hashed to produce a partition assignment when the consumer is created without a `partitionId`
* @readonly
*/
get partitionKey(): string | undefined {
return this._partitionKey;
}

/**
* @property Size of a batch of events.
* @readonly
*/
get size(): number {
return this._size;
}

/**
* @property Encoded batch message.
* @readonly
*/
get batchMessage(): Buffer | undefined {
return this._batchMessage;
}

/**
* Tries to add an event data to the batch if permitted by the batch's size limit.
* @param eventData An individual event data object.
* @returns A boolean value indicating if the event data has been added to the batch or not.
*/
public tryAdd(eventData: EventData): boolean {
// Convert EventData to AmqpMessage.
const amqpMessage = toAmqpMessage(eventData, this._partitionKey);
amqpMessage.body = this._context.dataTransformer.encode(eventData.body);

// Encode every amqp message and then convert every encoded message to amqp data section
this._encodedMessages.push(message.encode(amqpMessage));

const batchMessage: AmqpMessage = {
body: message.data_sections(this._encodedMessages)
};

if (amqpMessage.message_annotations) {
batchMessage.message_annotations = amqpMessage.message_annotations;
}

const encodedBatchMessage = message.encode(batchMessage);
const currentSize = encodedBatchMessage.length;

// this._batchMessage will be used for final send operation
if (currentSize > this._maxSizeInBytes) {
this._encodedMessages.pop();
return false;
}
this._batchMessage = encodedBatchMessage;
this._size = currentSize;
return true;
}
}
18 changes: 18 additions & 0 deletions sdk/eventhub/event-hubs/src/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ export interface SendOptions {
abortSignal?: AbortSignalLike;
}

/**
* The set of options to configure the createBatch operation on the `EventProducer`.
*/
export interface BatchOptions {
/**
* @property
* A value that is hashed to produce a partition assignment.
* It guarantees that messages with the same partitionKey end up in the same partition.
* Specifying this will throw an error if the producer was created using a `paritionId`.
*/
partitionKey?: string;
/**
* @property
* The maximum size allowed for the batch.
*/
maxMessageSizeInBytes?: number;
}

/**
* The set of options to configure the behavior of an `EventHubConsumer`.
* These can be specified when creating the consumer using the `createConsumer` method.
Expand Down

0 comments on commit e1e7386

Please sign in to comment.