Skip to content

Commit

Permalink
fixup! feat: initial projection implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mkazlauskas committed Nov 14, 2022
1 parent 1f9fa7b commit 03cfe94
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 33 deletions.
3 changes: 2 additions & 1 deletion packages/projection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
"dependencies": {
"@cardano-sdk/core": "^0.6.0",
"@cardano-sdk/util-rxjs": "^0.4.2",
"rxjs": "^7.4.0"
"rxjs": "^7.4.0",
"ts-custom-error": "^3.2.0"
},
"devDependencies": {
"@cardano-sdk/ogmios": "^0.6.0",
Expand Down
73 changes: 45 additions & 28 deletions packages/projection/src/operators/withRolledBackEvents.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ChainSyncEventType, ProjectorOperator, RollBackwardEvent, RollForwardEvent } from '../types';
import { CustomError } from 'ts-custom-error';
import { EMPTY, Observable, map, scan, toArray } from 'rxjs';
import { WithStabilityWindow } from './withStabilityWindow';
import { blockingWithLatestFrom } from '@cardano-sdk/util-rxjs';
Expand All @@ -15,6 +16,46 @@ type WithRolledBackEventsScan<TRollForwardEvent> = {
evt: TRollForwardEvent | (RollBackwardEvent & WithRolledBackEvents<TRollForwardEvent>);
};

export class InsufficientEventCacheError extends CustomError {}

const rollForward = <ExtraRollForwardProps extends WithStabilityWindow>(
evt: RollForwardEvent<ExtraRollForwardProps>,
eventCache: RollForwardEvent<ExtraRollForwardProps>[]
) => {
// clear blocks that are past stability window
const slotThreshold = evt.tip.slot - evt.stabilityWindowSlotsCount;
while (eventCache.length > 0 && eventCache[0].block.header.slot < slotThreshold) eventCache.shift();
// add current block to cache and return the event unchanged
eventCache.push(evt);
return { eventCache, evt };
};

const rollBackward = <ExtraRollForwardProps, ExtraRollBackwardProps>(
evt: RollBackwardEvent<ExtraRollBackwardProps>,
eventCache: RollForwardEvent<ExtraRollForwardProps>[]
) => {
const rollbackTo = evt.tip;
if (rollbackTo === 'origin') {
return {
eventCache: [],
evt: {
...evt,
rolledBackEvents: eventCache.reverse()
}
};
}
const rolledBackEvents = [] as RollForwardEvent<ExtraRollForwardProps>[];
while (eventCache.length > 0 && eventCache[eventCache.length - 1].block.header.hash !== rollbackTo.hash)
rolledBackEvents.push(eventCache.pop()!);
if (
rolledBackEvents.length > 0 &&
rolledBackEvents.length < rolledBackEvents[0].block.header.blockNo - rollbackTo.blockNo
) {
throw new InsufficientEventCacheError();
}
return { eventCache, evt: { ...evt, rolledBackEvents } };
};

/**
* Adds `rolledBackEvents` to RollBackward events.
* `rolledBackEvents` are in descending order (starting from tip going down to origin).
Expand Down Expand Up @@ -43,34 +84,10 @@ export const withRolledBackEvents =
): WithRolledBackEventsScan<RollForwardEvent<ExtraRollForwardPropsIn>> => {
eventCache ||= initialEvtCache;
switch (evt.eventType) {
case ChainSyncEventType.RollForward: {
// clear blocks that are past stability window
const slotThreshold = evt.tip.slot - evt.stabilityWindowSlotsCount;
while (eventCache.length > 0 && eventCache[0].block.header.slot < slotThreshold) eventCache.shift();
// add current block to cache and return the event unchanged
eventCache.push(evt);
return { eventCache, evt };
}
case ChainSyncEventType.RollBackward: {
const rollbackTo = evt.tip;
if (rollbackTo === 'origin') {
// Review: consider checking if `eventCache` isn't missing some blocks due to
// not syncing from genesis AND not providing sufficient evtCache$.
// Could be nice to have, but also could be redundant
// if we test our implementation of creating evtCache$
return {
eventCache: [],
evt: {
...evt,
rolledBackEvents: eventCache.reverse()
}
};
}
const rolledBackEvents = [] as RollForwardEvent<ExtraRollForwardPropsIn>[];
while (eventCache.length > 0 && eventCache[eventCache.length - 1].block.header.hash !== rollbackTo.hash)
rolledBackEvents.push(eventCache.pop()!);
return { eventCache, evt: { ...evt, rolledBackEvents } };
}
case ChainSyncEventType.RollForward:
return rollForward(evt, eventCache);
case ChainSyncEventType.RollBackward:
return rollBackward(evt, eventCache);
}
},
{
Expand Down
30 changes: 26 additions & 4 deletions packages/projection/test/operators/withRolledBackEvents.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Cardano } from '@cardano-sdk/core';
import {
ChainSyncEventType,
InsufficientEventCacheError,
RollBackwardEvent,
RollForwardEvent,
WithStabilityWindow,
Expand All @@ -11,18 +12,18 @@ import { createTestScheduler } from '@cardano-sdk/util-dev';
describe('withRolledBackEvents', () => {
const stabilityWindowSlotsCount = 2;
const blockId = Cardano.BlockId('0000000000000000000000000000000000000000000000000000000000000000');
const rollForwardEvent = (slot: number, hash?: Cardano.BlockId) =>
const rollForwardEvent = (slot: number, hash?: Cardano.BlockId, blockNo = slot) =>
({
block: { header: { hash } },
block: { header: { blockNo, hash, slot } },
eventType: ChainSyncEventType.RollForward,
stabilityWindowSlotsCount,
tip: { slot }
} as RollForwardEvent<WithStabilityWindow>);
const rollBackwardEvent = (slot: number, hash: Cardano.BlockId) =>
const rollBackwardEvent = (slot: number, hash: Cardano.BlockId, blockNo = slot) =>
({
eventType: ChainSyncEventType.RollBackward,
stabilityWindowSlotsCount,
tip: { hash, slot }
tip: { blockNo, hash, slot }
} as RollBackwardEvent<WithStabilityWindow>);

describe('without evtCache$', () => {
Expand Down Expand Up @@ -72,5 +73,26 @@ describe('withRolledBackEvents', () => {
expectSubscriptions(source$.subscriptions).toBe('^');
});
});

it('errors if evtCache$ doesnt have events within the rollback', () => {
createTestScheduler().run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const evtCache$ = cold('a-b|', {
a: rollForwardEvent(0),
b: rollForwardEvent(1, blockId)
});
const source$ = hot('de', {
d: rollForwardEvent(3),
e: rollBackwardEvent(1, blockId)
});
expectObservable(source$.pipe(withRolledBackEvents(evtCache$))).toBe(
'----(d#)',
{
d: rollForwardEvent(3)
},
new InsufficientEventCacheError()
);
expectSubscriptions(source$.subscriptions).toBe('^---!');
});
});
});
});
1 change: 1 addition & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,7 @@ __metadata:
npm-run-all: ^4.1.5
rxjs: ^7.4.0
shx: ^0.3.3
ts-custom-error: ^3.2.0
ts-jest: ^28.0.7
typescript: ^4.7.4
languageName: unknown
Expand Down

0 comments on commit 03cfe94

Please sign in to comment.