Skip to content

Commit

Permalink
Merge pull request #202 from tomorrow-one/backport_PR183_to_main_1.x
Browse files Browse the repository at this point in the history
Backport PR #183 to main_1.x
  • Loading branch information
magro committed Sep 20, 2023
2 parents 4cf8a7f + 1aaa698 commit ea2adde
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 154 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/gradle-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ jobs:
- name: Validate gradle wrapper
uses: gradle/wrapper-validation-action@v1
- name: Build
uses: gradle/gradle-build-action@v2.3.3
uses: gradle/gradle-build-action@v2.7.1
with:
arguments: build
arguments: build -Porg.gradle.jvmargs=-Xmx2048m

publish:
needs: [ build ]
Expand All @@ -49,7 +49,7 @@ jobs:
distribution: 'temurin'
cache: 'gradle'
- name: Publish
uses: gradle/gradle-build-action@v2.3.3
uses: gradle/gradle-build-action@v2.7.1
env:
# variables used by build.gradle.kts for signing / publishing (without 'ORG_GRADLE_PROJECT_' prefix)
ORG_GRADLE_PROJECT_signingKeyId: ${{ secrets.OSSRH_GPG_SECRET_KEY_ID }}
Expand Down
2 changes: 1 addition & 1 deletion LICENSE-header.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright 2022 Tomorrow GmbH @ https://tomorrow.one
Copyright ${year} Tomorrow GmbH @ https://tomorrow.one

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
import org.gradle.api.tasks.testing.logging.TestLogEvent.*
import java.util.*

project(":commons").version = "1.0.3-SNAPSHOT"
project(":outbox-kafka-spring").version = "1.1.7-SNAPSHOT"
Expand Down Expand Up @@ -51,6 +52,8 @@ subprojects {
"one/tomorrow/transactionaloutbox/reactive/test/Sample.java"
)) // java sources generated from proto messages
include("**/*.java")
ext["year"] = Calendar.getInstance().get(Calendar.YEAR)
skipExistingHeaders = true
}

val subproject = this
Expand Down
4 changes: 3 additions & 1 deletion outbox-kafka-spring/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
val kafkaVersion = "3.3.1"
val springKafkaVersion = "2.9.2"
val log4jVersion = "2.19.0"
val testcontainersVersion = "1.18.3"

implementation("org.springframework:spring-context:$springVersion")
implementation("org.springframework:spring-orm:$springVersion")
Expand All @@ -29,7 +30,8 @@ dependencies {
testImplementation("junit:junit:4.13.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.9.1")
testImplementation("org.springframework:spring-test:$springVersion")
testImplementation("org.testcontainers:postgresql:1.17.6")
testImplementation("org.testcontainers:postgresql:$testcontainersVersion")
testImplementation("org.testcontainers:toxiproxy:$testcontainersVersion")
testImplementation("org.postgresql:postgresql:42.5.1")
testImplementation("org.flywaydb:flyway-core:9.15.2")
testImplementation("org.flywaydb.flyway-test-extensions:flyway-spring-test:7.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,153 +43,163 @@

public class OutboxProcessor {

@FunctionalInterface
public interface KafkaProducerFactory {
KafkaProducer<String, byte[]> createKafkaProducer();
}

private static final int BATCH_SIZE = 100;

private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);

private final OutboxLockService lockService;
private final String lockOwnerId;
private final OutboxRepository repository;
private final KafkaProducerFactory producerFactory;
private final Duration processingInterval;
private final ScheduledExecutorService executor;
private final byte[] eventSource;
private KafkaProducer<String, byte[]> producer;
private boolean active;
private Instant lastLockAckquisitionAttempt;

private ScheduledFuture<?> schedule;

public OutboxProcessor(
OutboxRepository repository,
KafkaProducerFactory producerFactory,
Duration processingInterval,
Duration lockTimeout,
String lockOwnerId,
String eventSource,
AutowireCapableBeanFactory beanFactory) {
logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}",
lockOwnerId, eventSource, processingInterval.toMillis(), producerFactory);
this.repository = repository;
this.processingInterval = processingInterval;
OutboxLockRepository lockRepository = beanFactory.getBean(OutboxLockRepository.class);
OutboxLockService rawLockService = new OutboxLockService(lockRepository, lockTimeout);
this.lockService = (OutboxLockService) beanFactory.applyBeanPostProcessorsAfterInitialization(rawLockService, "OutboxLockService");
this.lockOwnerId = lockOwnerId;
this.eventSource = eventSource.getBytes();
this.producerFactory = producerFactory;
producer = producerFactory.createKafkaProducer();

executor = Executors.newSingleThreadScheduledExecutor();

tryLockAcquisition();
}

private void scheduleProcessing() {
if (executor.isShutdown())
logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", lockOwnerId);
else
schedule = executor.schedule(this::processOutboxWithLock, processingInterval.toMillis(), MILLISECONDS);
}

private void scheduleTryLockAcquisition() {
if (executor.isShutdown())
logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", lockOwnerId);
else
schedule = executor.schedule(this::tryLockAcquisition, lockService.getLockTimeout().toMillis(), MILLISECONDS);
}

@PreDestroy
public void close() {
logger.info("Stopping OutboxProcessor.");
if (schedule != null)
schedule.cancel(false);
executor.shutdown();
producer.close();
if (active)
lockService.releaseLock(lockOwnerId);
}

private void tryLockAcquisition() {
try {
boolean originalActive = active;
logger.debug("{} trying to acquire outbox lock", lockOwnerId);
active = lockService.acquireOrRefreshLock(lockOwnerId);
lastLockAckquisitionAttempt = now();
if (active) {
if (originalActive)
logger.debug("{} acquired outbox lock, starting to process outbox", lockOwnerId);
else
logger.info("{} acquired outbox lock, starting to process outbox", lockOwnerId);

processOutboxWithLock();
}
else
scheduleTryLockAcquisition();
} catch (Exception e) {
logger.warn("Failed trying lock acquisition or processing the outbox, trying again in {}", lockService.getLockTimeout(), e);
scheduleTryLockAcquisition();
}
}

private void processOutboxWithLock() {
if (!active)
throw new IllegalStateException("processOutbox must only be run when in active state");

if (now().isAfter(lastLockAckquisitionAttempt.plus(lockService.getLockTimeout().dividedBy(2)))) {
tryLockAcquisition();
return;
}

boolean couldRunWithLock = lockService.runWithLock(lockOwnerId, () -> {
try {
processOutbox();
} catch (Throwable e) {
logger.warn("Recreating producer, due to failure while processing outbox.", e);
producer.close();
producer = producerFactory.createKafkaProducer();
}
});
if (couldRunWithLock) {
scheduleProcessing();
} else {
logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", lockService.getLockTimeout().toMillis());
active = false;
scheduleTryLockAcquisition();
}

}

private void processOutbox() throws ExecutionException, InterruptedException {
List<OutboxRecord> records = repository.getUnprocessedRecords(BATCH_SIZE);
for (OutboxRecord outboxRecord : records) {
ProducerRecord<String, byte[]> producerRecord = toProducerRecord(outboxRecord);
Future<RecordMetadata> result = producer.send(producerRecord);
result.get();
logger.info("Sent record to kafka: {}", outboxRecord);
outboxRecord.setProcessed(now());
repository.update(outboxRecord);
}
}

private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(
outboxRecord.getTopic(),
outboxRecord.getKey(),
outboxRecord.getValue()
);
if (outboxRecord.getHeaders() != null) {
outboxRecord.getHeaders().forEach((k, v) -> producerRecord.headers().add(k, v.getBytes()));
}
producerRecord.headers().add(HEADERS_SEQUENCE_NAME, Longs.toByteArray(outboxRecord.getId()));
producerRecord.headers().add(HEADERS_SOURCE_NAME, eventSource);
return producerRecord;
}
@FunctionalInterface
public interface KafkaProducerFactory {
KafkaProducer<String, byte[]> createKafkaProducer();
}

private static final int BATCH_SIZE = 100;

private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);

private final OutboxLockService lockService;
private final String lockOwnerId;
private final OutboxRepository repository;
private final KafkaProducerFactory producerFactory;
private final Duration processingInterval;
private final ScheduledExecutorService executor;
private final byte[] eventSource;
private KafkaProducer<String, byte[]> producer;
private boolean active;
private Instant lastLockAckquisitionAttempt;

private ScheduledFuture<?> schedule;

public OutboxProcessor(
OutboxRepository repository,
KafkaProducerFactory producerFactory,
Duration processingInterval,
Duration lockTimeout,
String lockOwnerId,
String eventSource,
AutowireCapableBeanFactory beanFactory) {
logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}",
lockOwnerId, eventSource, processingInterval.toMillis(), producerFactory);
this.repository = repository;
this.processingInterval = processingInterval;
OutboxLockRepository lockRepository = beanFactory.getBean(OutboxLockRepository.class);
OutboxLockService rawLockService = new OutboxLockService(lockRepository, lockTimeout);
this.lockService = (OutboxLockService) beanFactory.applyBeanPostProcessorsAfterInitialization(rawLockService, "OutboxLockService");
this.lockOwnerId = lockOwnerId;
this.eventSource = eventSource.getBytes();
this.producerFactory = producerFactory;
producer = producerFactory.createKafkaProducer();

executor = Executors.newSingleThreadScheduledExecutor();

tryLockAcquisition();
}

private void scheduleProcessing() {
if (executor.isShutdown())
logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", lockOwnerId);
else
schedule = executor.schedule(this::processOutboxWithLock, processingInterval.toMillis(), MILLISECONDS);
}

private void scheduleTryLockAcquisition() {
if (executor.isShutdown())
logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", lockOwnerId);
else
schedule = executor.schedule(this::tryLockAcquisition, lockService.getLockTimeout().toMillis(), MILLISECONDS);
}

@PreDestroy
public void close() {
logger.info("Stopping OutboxProcessor.");
if (schedule != null)
schedule.cancel(false);
executor.shutdown();
producer.close();
if (active)
lockService.releaseLock(lockOwnerId);
}

private void tryLockAcquisition() {
try {
boolean originalActive = active;
logger.debug("{} trying to acquire outbox lock", lockOwnerId);
active = lockService.acquireOrRefreshLock(lockOwnerId);
lastLockAckquisitionAttempt = now();
if (active) {
if (originalActive)
logger.debug("{} acquired outbox lock, starting to process outbox", lockOwnerId);
else
logger.info("{} acquired outbox lock, starting to process outbox", lockOwnerId);

processOutboxWithLock();
}
else
scheduleTryLockAcquisition();
} catch (Exception e) {
logger.warn("Failed trying lock acquisition or processing the outbox, trying again in {}", lockService.getLockTimeout(), e);
scheduleTryLockAcquisition();
}
}

private void processOutboxWithLock() {
if (!active)
throw new IllegalStateException("processOutbox must only be run when in active state");

if (now().isAfter(lastLockAckquisitionAttempt.plus(lockService.getLockTimeout().dividedBy(2)))) {
tryLockAcquisition();
return;
}

boolean couldRunWithLock = tryProcessOutbox();
if (couldRunWithLock) {
scheduleProcessing();
} else {
logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", lockService.getLockTimeout().toMillis());
active = false;
scheduleTryLockAcquisition();
}

}

private boolean tryProcessOutbox() {
boolean couldRunWithLock = false;
try {
couldRunWithLock = lockService.runWithLock(lockOwnerId, () -> {
try {
processOutbox();
} catch (Throwable e) {
logger.warn("Recreating producer, due to failure while processing outbox.", e);
producer.close();
producer = producerFactory.createKafkaProducer();
}
});
} catch (Exception e) {
logger.warn("Caught exception when trying to run with lock.", e);
}
return couldRunWithLock;
}

private void processOutbox() throws ExecutionException, InterruptedException {
List<OutboxRecord> records = repository.getUnprocessedRecords(BATCH_SIZE);
for (OutboxRecord outboxRecord : records) {
ProducerRecord<String, byte[]> producerRecord = toProducerRecord(outboxRecord);
Future<RecordMetadata> result = producer.send(producerRecord);
result.get();
logger.info("Sent record to kafka: {}", outboxRecord);
outboxRecord.setProcessed(now());
repository.update(outboxRecord);
}
}

private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(
outboxRecord.getTopic(),
outboxRecord.getKey(),
outboxRecord.getValue()
);
if (outboxRecord.getHeaders() != null) {
outboxRecord.getHeaders().forEach((k, v) -> producerRecord.headers().add(k, v.getBytes()));
}
producerRecord.headers().add(HEADERS_SEQUENCE_NAME, Longs.toByteArray(outboxRecord.getId()));
producerRecord.headers().add(HEADERS_SOURCE_NAME, eventSource);
return producerRecord;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class IntegrationTestConfig {

public static final Duration DEFAULT_OUTBOX_LOCK_TIMEOUT = Duration.ofMillis(200);
public static ProxiedPostgreSQLContainer postgresqlContainer = ProxiedPostgreSQLContainer.startProxiedPostgres();

@Bean
public LocalSessionFactoryBean sessionFactory(DataSource dataSource) {
Expand Down Expand Up @@ -64,8 +65,10 @@ private Properties getHibernateProperties() {
public DataSource dataSource() {
BasicDataSource dataSource = new BasicDataSource();

dataSource.setDriverClassName("org.testcontainers.jdbc.ContainerDatabaseDriver");
dataSource.setUrl("jdbc:tc:postgresql:13.7://localhost/test");
dataSource.setDriverClassName(postgresqlContainer.getDriverClassName());
dataSource.setUrl(postgresqlContainer.getJdbcUrl());
dataSource.setUsername(postgresqlContainer.getUsername());
dataSource.setPassword(postgresqlContainer.getPassword());

return dataSource;
}
Expand Down

0 comments on commit ea2adde

Please sign in to comment.