diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 490f930b8..a9096ea02 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -48,7 +48,7 @@ should_include_file() { base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.6 + SCALA_VERSION=2.13.8 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index 26ef84a4f..df1e20ba1 100644 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.6 + set SCALA_VERSION=2.13.8 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( diff --git a/build.gradle b/build.gradle index e1f773075..dd3f9cb2c 100644 --- a/build.gradle +++ b/build.gradle @@ -1619,6 +1619,8 @@ project(':storage') { testImplementation project(':clients').sourceSets.test.output testImplementation project(':core') testImplementation project(':core').sourceSets.test.output + testImplementation project(':server-common') + testImplementation project(':server-common').sourceSets.test.output testImplementation libs.junitJupiter testImplementation libs.mockitoCore testImplementation libs.bcpkix diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index bd80981d8..bc8f32491 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -107,7 +107,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe static final int PARTITION_LEADER_EPOCH_LENGTH = 4; static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH; static final int MAGIC_LENGTH = 1; - static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH; + public static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH; static final int CRC_LENGTH = 4; static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH; static final int ATTRIBUTE_LENGTH = 2; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index f7c18d74a..e716efc09 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -26,28 +26,36 @@ import org.apache.kafka.connect.source.SourceConnectorContext; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; -import org.easymock.Capture; import org.apache.kafka.connect.util.Callback; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.EasyMockSupport; -import org.easymock.Mock; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import java.util.HashMap; import java.util.Map; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - -@RunWith(EasyMockRunner.class) -public class WorkerConnectorTest extends EasyMockSupport { +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class WorkerConnectorTest { private static final String VERSION = "1.1"; public static final String CONNECTOR = "connector"; @@ -60,15 +68,15 @@ public class WorkerConnectorTest extends EasyMockSupport { public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; - @Mock Plugins plugins; - @Mock SourceConnector sourceConnector; - @Mock SinkConnector sinkConnector; - @Mock Connector connector; - @Mock CloseableConnectorContext ctx; - @Mock ConnectorStatus.Listener listener; - @Mock CloseableOffsetStorageReader offsetStorageReader; - @Mock ConnectorOffsetBackingStore offsetStore; - @Mock ClassLoader classLoader; + @Mock private Plugins plugins; + @Mock private SourceConnector sourceConnector; + @Mock private SinkConnector sinkConnector; + @Mock private CloseableConnectorContext ctx; + @Mock private ConnectorStatus.Listener listener; + @Mock private CloseableOffsetStorageReader offsetStorageReader; + @Mock private ConnectorOffsetBackingStore offsetStore; + @Mock private ClassLoader classLoader; + private Connector connector; @Before public void setup() { @@ -86,31 +94,8 @@ public void testInitializeFailure() { RuntimeException exception = new RuntimeException(); connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall().andThrow(exception); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).initialize(any()); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -120,7 +105,9 @@ public void testInitializeFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(listener).onFailure(CONNECTOR, exception); + verifyCleanShutdown(false); } @Test @@ -128,36 +115,11 @@ public void testFailureIsFinalState() { RuntimeException exception = new RuntimeException(); connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall().andThrow(exception); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - // expect no call to onStartup() after failure - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).initialize(any()); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull()); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertFailedMetric(workerConnector); @@ -167,48 +129,22 @@ public void testFailureIsFinalState() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(listener).onFailure(CONNECTOR, exception); + // expect no call to onStartup() after failure + verifyCleanShutdown(false); + + verify(onStateChange).onCompletion(any(Exception.class), isNull()); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupAndShutdown() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -219,54 +155,26 @@ public void testStartupAndShutdown() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verifyCleanShutdown(true); + + verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupAndPause() { connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); + workerConnector.doTransitionTo(TargetState.STARTED, onStateChange); assertRunningMetric(workerConnector); workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange); @@ -275,52 +183,25 @@ public void testStartupAndPause() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(listener).onPause(CONNECTOR); + verifyCleanShutdown(true); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testOnResume() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - offsetStore.start(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - listener.onPause(CONNECTOR); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onResume(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - replayAll(); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -334,42 +215,25 @@ public void testOnResume() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(listener).onPause(CONNECTOR); + verify(connector).start(CONFIG); + verify(listener).onResume(CONNECTOR); + verifyCleanShutdown(true); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupPaused() { connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - - // connector never gets started - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); @@ -379,45 +243,25 @@ public void testStartupPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + // connector never gets started + verify(listener).onPause(CONNECTOR); + verifyCleanShutdown(false); + + verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupFailure() { RuntimeException exception = new RuntimeException(); - connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - connector.start(CONFIG); - expectLastCall().andThrow(exception); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).start(CONFIG); - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull()); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); @@ -427,7 +271,13 @@ public void testStartupFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onFailure(CONNECTOR, exception); + verifyCleanShutdown(false); + + verify(onStateChange).onCompletion(any(Exception.class), isNull()); + verifyNoMoreInteractions(onStateChange); } @Test @@ -435,42 +285,11 @@ public void testShutdownFailure() { RuntimeException exception = new RuntimeException(); connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall().andThrow(exception); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - replayAll(); + doThrow(exception).when(connector).stop(); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -481,48 +300,22 @@ public void testShutdownFailure() { workerConnector.doShutdown(); assertFailedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); + verify(listener).onFailure(CONNECTOR, exception); + verifyShutdown(false, true); } @Test public void testTransitionStartedToStarted() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - // expect only one call to onStartup() - listener.onStartup(CONNECTOR); - expectLastCall(); - connector.stop(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall().times(2); - - replayAll(); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -536,53 +329,21 @@ public void testTransitionStartedToStarted() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + // expect only one call to onStartup() + verify(listener).onStartup(CONNECTOR); + verifyCleanShutdown(true); + verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testTransitionPausedToPaused() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall().times(2); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -597,28 +358,32 @@ public void testTransitionPausedToPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(listener).onPause(CONNECTOR); + verifyCleanShutdown(true); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testFailConnectorThatIsNeitherSourceNorSink() { - connector.version(); - expectLastCall().andReturn(VERSION); - - Capture exceptionCapture = Capture.newInstance(); - listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture)); - expectLastCall(); - - replayAll(); - + connector = mock(Connector.class); + when(connector.version()).thenReturn(VERSION); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); + + verify(connector).version(); + ArgumentCaptor exceptionCapture = ArgumentCaptor.forClass(Throwable.class); + verify(listener).onFailure(eq(CONNECTOR), exceptionCapture.capture()); Throwable e = exceptionCapture.getValue(); assertTrue(e instanceof ConnectException); assertTrue(e.getMessage().contains("must be a subclass of")); - - verifyAll(); } protected void assertFailedMetric(WorkerConnector workerConnector) { @@ -672,6 +437,39 @@ protected void assertInitializedMetric(WorkerConnector workerConnector, String e assertEquals(VERSION, version); } + @SuppressWarnings("unchecked") + private Callback mockCallback() { + return mock(Callback.class); + } + + private void verifyInitialize() { + verify(connector).version(); + if (connector instanceof SourceConnector) { + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + } else { + verify(connector).initialize(any(SinkConnectorContext.class)); + } + } + + private void verifyCleanShutdown(boolean started) { + verifyShutdown(true, started); + } + + private void verifyShutdown(boolean clean, boolean started) { + verify(ctx).close(); + if (connector instanceof SourceConnector) { + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + } + if (clean) { + verify(listener).onShutdown(CONNECTOR); + } + if (started) { + verify(connector).stop(); + } + } + private static abstract class TestConnector extends Connector { } } diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala index 503ce7d2b..f8dccd17d 100644 --- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala +++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala @@ -78,7 +78,8 @@ class RaftClusterSnapshotTest { raftManager.replicatedLog.latestSnapshot.get(), new MetadataRecordSerde(), BufferSupplier.create(), - 1 + 1, + true ) ) { snapshot => // Check that the snapshot is non-empty diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 0cd41b3dc..6c6b975ee 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -70,7 +70,7 @@ versions += [ jacksonDatabind: "2.13.3", jacoco: "0.8.7", javassist: "3.27.0-GA", - jetty: "9.4.44.v20210927", + jetty: "9.4.48.v20220622", jersey: "2.34", jline: "3.21.0", jmh: "1.35", diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 5e395cebc..e83928956 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -739,7 +739,8 @@ private SnapshotReader createSnapshotReader(RawSnapshotRea reader, new MetadataRecordSerde(), BufferSupplier.create(), - Integer.MAX_VALUE + Integer.MAX_VALUE, + true ); } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index c8e39ae32..e24d86bd8 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -496,7 +496,8 @@ private void scheduleLogCheck() { snapshot.get(), new MetadataRecordSerde(), BufferSupplier.create(), - Integer.MAX_VALUE + Integer.MAX_VALUE, + true ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 53372728a..cac7a8a3c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -333,7 +333,12 @@ private void updateListenersProgress(long highWatermark) { private Optional> latestSnapshot() { return log.latestSnapshot().map(reader -> - RecordsSnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES) + RecordsSnapshotReader.of(reader, + serde, + BufferSupplier.create(), + MAX_BATCH_SIZE_BYTES, + true /* Validate batch CRC*/ + ) ); } @@ -2519,7 +2524,8 @@ private void fireHandleCommit(long baseOffset, Records records) { serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES, - this + this, + true /* Validate batch CRC*/ ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java index e95206100..61819a9dc 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java @@ -100,11 +100,12 @@ public static RecordsBatchReader of( RecordSerde serde, BufferSupplier bufferSupplier, int maxBatchSize, - CloseListener> closeListener + CloseListener> closeListener, + boolean doCrcValidation ) { return new RecordsBatchReader<>( baseOffset, - new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize), + new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize, doCrcValidation), closeListener ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java index 866f541fb..ff415aa72 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java @@ -41,6 +41,9 @@ public final class RecordsIterator implements Iterator>, AutoCloseab private final RecordSerde serde; private final BufferSupplier bufferSupplier; private final int batchSize; + // Setting to true will make the RecordsIterator perform a CRC Validation + // on the batch header when iterating over them + private final boolean doCrcValidation; private Iterator nextBatches = Collections.emptyIterator(); private Optional> nextBatch = Optional.empty(); @@ -54,12 +57,14 @@ public RecordsIterator( Records records, RecordSerde serde, BufferSupplier bufferSupplier, - int batchSize + int batchSize, + boolean doCrcValidation ) { this.records = records; this.serde = serde; this.bufferSupplier = bufferSupplier; this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC); + this.doCrcValidation = doCrcValidation; } @Override @@ -163,7 +168,6 @@ private Optional> nextBatch() { if (nextBatches.hasNext()) { MutableRecordBatch nextBatch = nextBatches.next(); - // Update the buffer position to reflect the read batch allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + nextBatch.sizeInBytes())); @@ -180,6 +184,11 @@ private Optional> nextBatch() { } private Batch readBatch(DefaultRecordBatch batch) { + if (doCrcValidation) { + // Perform a CRC validity check on this batch + batch.ensureValid(); + } + final Batch result; if (batch.isControlBatch()) { result = Batch.control( diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java index 89ad26322..92b695146 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java @@ -104,11 +104,12 @@ public static RecordsSnapshotReader of( RawSnapshotReader snapshot, RecordSerde serde, BufferSupplier bufferSupplier, - int maxBatchSize + int maxBatchSize, + boolean doCrcValidation ) { return new RecordsSnapshotReader<>( snapshot.snapshotId(), - new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize) + new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize, doCrcValidation) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 4f79dc18c..a6117a33c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -1112,7 +1112,7 @@ private void assertCommittedData(RaftNode node) { startOffset.set(snapshotId.offset); try (SnapshotReader snapshot = - RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) { + RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE, true)) { // Expect only one batch with only one record assertTrue(snapshot.hasNext()); Batch batch = snapshot.next(); diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java index 6fe540711..ae8b1dfb8 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java @@ -100,7 +100,8 @@ private void testBatchReader( serde, bufferSupplier, MAX_BATCH_BYTES, - closeListener + closeListener, + true ); for (TestBatch batch : expectedBatches) { diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index 7d9848931..9dfbfd62f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -30,7 +30,9 @@ import java.util.stream.Stream; import net.jqwik.api.ForAll; import net.jqwik.api.Property; +import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; @@ -42,6 +44,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -60,7 +63,7 @@ private static Stream emptyRecords() throws IOException { @ParameterizedTest @MethodSource("emptyRecords") void testEmptyRecords(Records records) { - testIterator(Collections.emptyList(), records); + testIterator(Collections.emptyList(), records, true); } @Property @@ -71,7 +74,7 @@ public void testMemoryRecords( List> batches = createBatches(seed); MemoryRecords memRecords = buildRecords(compressionType, batches); - testIterator(batches, memRecords); + testIterator(batches, memRecords, true); } @Property @@ -85,18 +88,58 @@ public void testFileRecords( FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); fileRecords.append(memRecords); - testIterator(batches, fileRecords); + testIterator(batches, fileRecords, true); + fileRecords.close(); + } + + @Property + public void testCrcValidation( + @ForAll CompressionType compressionType, + @ForAll long seed + ) throws IOException { + List> batches = createBatches(seed); + MemoryRecords memRecords = buildRecords(compressionType, batches); + // Read the Batch CRC for the first batch from the buffer + ByteBuffer readBuf = memRecords.buffer(); + readBuf.position(DefaultRecordBatch.CRC_OFFSET); + int actualCrc = readBuf.getInt(); + // Corrupt the CRC on the first batch + memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1); + + assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true)); + + FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); + fileRecords.append(memRecords); + assertThrows(CorruptRecordException.class, () -> testIterator(batches, fileRecords, true)); + + // Verify check does not trigger when doCrcValidation is false + assertDoesNotThrow(() -> testIterator(batches, memRecords, false)); + assertDoesNotThrow(() -> testIterator(batches, fileRecords, false)); + + // Fix the corruption + memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc); + + // Verify check does not trigger when the corruption is fixed + assertDoesNotThrow(() -> testIterator(batches, memRecords, true)); + FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile()); + moreFileRecords.append(memRecords); + assertDoesNotThrow(() -> testIterator(batches, moreFileRecords, true)); + + fileRecords.close(); + moreFileRecords.close(); } private void testIterator( List> expectedBatches, - Records records + Records records, + boolean validateCrc ) { Set allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>()); RecordsIterator iterator = createIterator( records, - mockBufferSupplier(allocatedBuffers) + mockBufferSupplier(allocatedBuffers), + validateCrc ); for (TestBatch batch : expectedBatches) { @@ -111,8 +154,12 @@ private void testIterator( assertEquals(Collections.emptySet(), allocatedBuffers); } - static RecordsIterator createIterator(Records records, BufferSupplier bufferSupplier) { - return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC); + static RecordsIterator createIterator( + Records records, + BufferSupplier bufferSupplier, + boolean validateCrc + ) { + return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC, validateCrc); } static BufferSupplier mockBufferSupplier(Set buffers) { diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java index 05d1929f2..cd86c709f 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java @@ -192,7 +192,8 @@ private SnapshotReader readSnapshot( context.log.readSnapshot(snapshotId).get(), context.serde, BufferSupplier.create(), - maxBatchSize + maxBatchSize, + true ); } @@ -246,7 +247,7 @@ record = records.next(); public static void assertSnapshot(List> batches, RawSnapshotReader reader) { assertSnapshot( batches, - RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE) + RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true) ); }