-
Notifications
You must be signed in to change notification settings - Fork 6
/
CommandGateway.kt
46 lines (40 loc) · 2.32 KB
/
CommandGateway.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.cultureamp.eventsourcing
interface CommandGateway<M : EventMetadata> {
companion object {
operator fun <M : EventMetadata> invoke(eventStore: EventStore<M>, vararg routes: Route<*, *, M>) = EventStoreCommandGateway(eventStore, *routes)
}
fun dispatch(command: Command, metadata: M, retries: Int = 5): Either<CommandError, SuccessStatus>
}
class EventStoreCommandGateway<M : EventMetadata>(private val eventStore: EventStore<M>, private vararg val routes: Route<*, *, M>) : CommandGateway<M> {
override tailrec fun dispatch(command: Command, metadata: M, retries: Int): Either<CommandError, SuccessStatus> {
val result = createOrUpdate(command, metadata)
return if (result is Left && result.error is RetriableError && retries > 0) {
Thread.sleep(500L)
dispatch(command, metadata, retries - 1)
} else {
result
}
}
private fun createOrUpdate(command: Command, metadata: M): Either<CommandError, SuccessStatus> {
val constructor = constructorFor(command) ?: return Left(NoConstructorForCommand)
val events = eventStore.eventsFor(command.aggregateId)
return if (events.isEmpty()) when (command) {
is CreationCommand -> constructor.create(command, metadata, eventStore).map { Created }
else -> Left(AggregateNotFound)
} else when (command) {
is UpdateCommand -> constructor.update(command, metadata, events, eventStore).map { Updated }
else -> Left(AggregateAlreadyExists)
}
}
@Suppress("UNCHECKED_CAST")
private fun constructorFor(command: Command): AggregateConstructor<CreationCommand, CreationEvent, DomainError, UpdateCommand, UpdateEvent, M, Aggregate<UpdateCommand, UpdateEvent, DomainError, M, *>>? {
val route = routes.find { it.creationCommandClass.isInstance(command) || it.updateCommandClass.isInstance(command) }
return route?.aggregateConstructor as AggregateConstructor<CreationCommand, CreationEvent, DomainError, UpdateCommand, UpdateEvent, M, Aggregate<UpdateCommand, UpdateEvent, DomainError, M, *>>?
}
}
sealed class SuccessStatus
object Created : SuccessStatus()
object Updated : SuccessStatus()
object NoConstructorForCommand : CommandError
object AggregateAlreadyExists : CommandError
object AggregateNotFound : CommandError