Improve batch append throughput at high loads #234

Merged 1 commit into master from batch-throughput on Nov 9, 2021

Conversation

George-Payne (Member)

The current implementation of batch append drops off when more than 4000 batches are queued up, because each batch attaches its own event listener to the stream. This can be mitigated by using a single event listener and a Map of promises, looking up the promise to resolve or reject in the Map (this PR); a minimal sketch of the pattern follows.
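
Below is an illustrative sketch of the single-listener pattern, not the client's actual internals; all names here (BatchAppendSketch, the "response" event, the id field) are assumptions:

import { EventEmitter } from "events";

interface Pending {
  resolve: (value: unknown) => void;
  reject: (reason: Error) => void;
}

class BatchAppendSketch {
  private pending = new Map<string, Pending>();

  constructor(private stream: EventEmitter) {
    // A single listener for the lifetime of the stream: the listener count
    // no longer grows with the number of queued batches.
    this.stream.on(
      "response",
      (msg: { id: string; error?: Error; result?: unknown }) => {
        const entry = this.pending.get(msg.id);
        if (!entry) return;
        this.pending.delete(msg.id);
        if (msg.error) entry.reject(msg.error);
        else entry.resolve(msg.result);
      }
    );
  }

  append(id: string, send: () => void): Promise<unknown> {
    return new Promise((resolve, reject) => {
      // Register the promise before sending; the single "response" listener
      // looks it up by id when the server replies.
      this.pending.set(id, { resolve, reject });
      send();
    });
  }
}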

Performance comparison:
[chart: events per second across burst sizes, comparing batch append before and after this change]
(Please note, classic append begins to fail at around 8000 events due to #167, so those counts are omitted.)

Testing notes / explanation:

  • An array of random binary events (of length events) is created.
  • The timer is started.
  • Each event is sent individually to a random stream.
  • All events are awaited until acknowledged by the server.
  • The timer is stopped.
  • Events per second (eps) is calculated as (1000 * events) / duration (worked example below).
  • Each burst size is run three times.
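
For example, 10,000 events acknowledged in 2,500 ms gives (1000 * 10000) / 2500 = 4,000 eps.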
Code

throughput.ts:

import { writeFile } from "fs/promises";
import { join } from "path";
import { burst, Result } from "./burst";

export interface Init {
  connectionString: string;
  batch: boolean;
}

// Pause between runs to let the server settle.
const settle = () => new Promise<void>((r) => setTimeout(() => r(), 5000));

export async function throughputHandler({ connectionString, batch }: Init) {
  const results: Result[] = [];

  // Burst sizes from 1,000 to 20,000 events, in steps of 1,000.
  for (let n = 1; n <= 20; n++) {
    const count = n * 1000;

    // Each burst size is run three times.
    for (let a = 0; a < 3; a++) {
      try {
        await settle();
        const result = await burst({
          connectionString,
          debug: false,
          log: false,
          events: count,
          batch,
        });

        console.log(
          `${count} events: ${result.eps}/s ${
            result.failure > 0 ? `FAILED: ${result.failures.join(" ")}` : ""
          }`
        );

        results.push(result);
      } catch (error) {
        console.log(`${count} events: ${error}`);
      }
    }
  }

  await writeFile(
    join(process.cwd(), `throughput-${Date.now()}.json`),
    JSON.stringify(results)
  );

  console.log("DONE");
  process.exit();
}
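
For reference, a hypothetical entry point (not part of this PR; the connection string is an assumed local insecure node):

run.ts:

import { throughputHandler } from "./throughput";

throughputHandler({
  connectionString: "esdb://localhost:2113?tls=false",
  batch: true,
}).catch((error) => {
  console.error(error);
  process.exit(1);
});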

burst.ts:

import { performance, PerformanceObserver } from "perf_hooks";
import { randomBytes } from "crypto";

import {
  binaryEvent,
  EventData,
  EventStoreDBClient,
  jsonEvent,
} from "@eventstore/db-client";
import * as debug from "debug";

export interface Init {
  connectionString: string;
  events: number;
  debug: boolean;
  log: boolean;
  batch: boolean;
}

export interface Result {
  events: number;
  duration: number;
  eps: number;
  success: number;
  failure: number;
  failures: string[];
}

export function burst({
  connectionString,
  events,
  debug: d,
  log,
  batch,
}: Init) {
  return new Promise<Result>(async (resolve) => {
    let success = 0;
    let failure = 0;
    const failures = new Map<string, number>();

    const perfObserver = new PerformanceObserver((items) => {
      items.getEntries().forEach(({ name, duration }) => {
        const result = Object.freeze({
          events,
          duration,
          eps: Math.round((1000.0 * events) / duration),
          success,
          failure,
          failures: Array.from(failures.keys()),
        });

        if (log) {
          console.log(
            `${name} ${events} WRITES IN ${duration}ms (${
              (1000.0 * events) / duration
            }/s)`
          );

          console.log(`SUCCESS: ${success} FAILURE: ${failure}`);
          for (const [err, count] of failures) {
            console.log(`${count}: ${err}`);
          }
        }

        resolve(result);
      });
    });
    perfObserver.observe({ entryTypes: ["measure"], buffered: true });

    if (d) debug.enable("esdb:*");
    if (log) console.log(`Writing ${events} events to random streams`);

    const eventsToWrite = eventGen(events);

    const client = EventStoreDBClient.connectionString(connectionString);
    // Initial append to establish the connection before timing starts.
    await client.appendToStream(
      "a stream",
      jsonEvent({
        type: "connect",
        data: "hi",
      })
    );

    if (d) debug.disable();
    if (log) console.log(`Connected and starting`);

    performance.mark("writes-start");

    const promises = [];

    // Tuple is [streamName, event, shouldLog]; shouldLog is true for every
    // 500th event (see eventGen below).
    for (const [streamName, event, l] of eventsToWrite) {
      promises.push(
        client
          .appendToStream(streamName, event, {
            credentials: batch
              ? undefined
              : { username: "admin", password: "changeit" },
          })
          .then(() => {
            success++;
            if (d && l) console.log(`Reached event ${success + failure}`);
          })
          .catch((error) => {
            failure++;
            const err = error.message ?? error.toString();
            // Tally occurrences of each distinct error message.
            failures.set(err, (failures.get(err) ?? 0) + 1);
          })
      );
    }

    await Promise.allSettled(promises);

    performance.mark("writes-end");
    performance.measure(`TOTALS: `, "writes-start", "writes-end");
  });
}

const type = "event-test";

type EventTuple = [string, EventData, boolean];
// Generates `count` events, each targeting a random hex-named stream; the
// boolean marks every 500th event for progress logging.
function eventGen(count: number) {
  const events: EventTuple[] = [];
  for (let i = 0; i < count; i++) {
    events.push([
      randomBytes(20).toString("hex"),
      binaryEvent({
        type,
        data: randomBytes(256),
        metadata: randomBytes(256),
      }),
      i % 500 === 0,
    ]);
  }
  return events;
}

YoEight merged commit a30679b into master on Nov 9, 2021
YoEight deleted the batch-throughput branch on Nov 9, 2021 (13:44)