From 0fcbb30b0cddb86ce19bd96bd301e1813636cd86 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 14 Nov 2024 12:44:49 +0000 Subject: [PATCH 1/4] KAFKA-17990: Flaky test improvements --- .../kafka/test/api/ShareConsumerTest.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index b7d127eb429a..33b0f3ae18b5 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -244,6 +244,7 @@ public void testSubscriptionAndPoll(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -259,6 +260,7 @@ public void testSubscriptionAndPollMultiple(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -285,6 +287,7 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws producer.send(record); ProducerRecord record2 = new ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record2).get(); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); @@ -322,6 +325,7 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -351,6 +355,7 @@ public void testAcknowledgementCommitCallbackOnClose(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, 
partitionExceptionMap)); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -380,6 +385,7 @@ public void testAcknowledgementCommitCallbackInvalidRecordStateException(String ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -442,6 +448,7 @@ public void testHeaders(String persister) { record.headers().add("headerKey", "headerValue".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -465,6 +472,7 @@ private void testHeadersSerializeDeserialize(Serializer serializer, Dese KafkaProducer producer = createProducer(new ByteArraySerializer(), serializer); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -567,6 +575,7 @@ public void testExplicitAcknowledgeSuccess(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -586,6 +595,7 @@ public void testExplicitAcknowledgeCommitSuccess(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -611,6 +621,7 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte producer.send(record1); producer.send(record2); producer.send(record3); + producer.flush(); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -665,6 +676,7 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) producer.send(record1); producer.send(record2); producer.send(record3); + producer.flush(); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); 
shareConsumer1.subscribe(Collections.singleton(tp.topic())); @@ -722,6 +734,7 @@ public void testExplicitAcknowledgeReleasePollAccept(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -743,6 +756,7 @@ public void testExplicitAcknowledgeReleaseAccept(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -762,6 +776,7 @@ public void testExplicitAcknowledgeReleaseClose(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -778,6 +793,7 @@ public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -798,6 +814,7 @@ public void testImplicitAcknowledgeFailsExplicit(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -817,6 +834,7 @@ public void testImplicitAcknowledgeCommitSync(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -842,6 +860,7 @@ 
public void testImplicitAcknowledgementCommitAsync(String persister) throws Inte producer.send(record1); producer.send(record2); producer.send(record3); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -910,6 +929,8 @@ public void testMultipleConsumersWithDifferentGroupIds(String persister) throws producer.send(record); producer.send(record); producer.send(record); + producer.flush(); + // Both the consumers should read all the messages, because they are part of different share groups (both have different group IDs) AtomicInteger shareConsumer1Records = new AtomicInteger(); AtomicInteger shareConsumer2Records = new AtomicInteger(); @@ -958,6 +979,7 @@ public void testMultipleConsumersInGroupSequentialConsumption(String persister) for (int i = 0; i < totalMessages; i++) { producer.send(record); } + producer.flush(); int consumer1MessageCount = 0; int consumer2MessageCount = 0; @@ -1280,6 +1302,7 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr alterShareAutoOffsetReset("group1", "earliest"); producer.send(producerRecord1); + producer.flush(); // Poll two times to receive records. The first poll puts the acquisition lock and fetches the record. // Since, we are only sending one record and acquisition lock hasn't timed out, the second poll only acknowledges the @@ -1325,6 +1348,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String per ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); alterShareAutoOffsetReset("group1", "earliest"); @@ -1333,7 +1357,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String per // The acknowledgment commit callback will try to call a method of KafkaShareConsumer shareConsumer.poll(Duration.ofMillis(5000)); - // The second poll sends the acknowledgments implicitly. + // The second poll sends the acknowledgements implicitly. // The acknowledgement commit callback will be called and the exception is thrown. // This is verified inside the onComplete() method implementation. 
shareConsumer.poll(Duration.ofMillis(500)); @@ -1367,6 +1391,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persist ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); alterShareAutoOffsetReset("group1", "earliest"); @@ -1407,6 +1432,7 @@ public void testAcknowledgeCommitCallbackThrowsException(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); alterShareAutoOffsetReset("group1", "earliest"); @@ -1483,6 +1509,7 @@ public void testWakeupWithFetchedRecordsAvailable(String persister) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); + producer.flush(); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); @@ -1511,6 +1538,7 @@ public void testSubscriptionFollowedByTopicCreation(String persister) throws Int ProducerRecord record = new ProducerRecord<>(topic, 0, null, "key".getBytes(), "value".getBytes()); producer.send(record); + producer.flush(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer, metadata sync failed"); @@ -1650,6 +1678,7 @@ public void testShareAutoOffsetResetDefaultValue(String persister) { KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); // Producing a record. producer.send(record); + producer.flush(); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); // No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record // was produced before share partition was initialized (which happens after the first share fetch request @@ -1676,6 +1705,7 @@ public void testShareAutoOffsetResetEarliest(String persister) { KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); // Producing a record. producer.send(record); + producer.flush(); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume // all messages present on the partition @@ -1737,6 +1767,7 @@ public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String pers KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); // Producing a record. 
producer.send(record); + producer.flush(); ConsumerRecords records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000)); // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume // all messages present on the partition From 82f78d8da15ebccccd66ea31a427bc964adec5f4 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 14 Nov 2024 14:26:21 +0000 Subject: [PATCH 2/4] Make record production more dependable --- .../kafka/test/api/ShareConsumerTest.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 33b0f3ae18b5..c5601abec1f3 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -650,11 +650,12 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte // The 3rd record should be reassigned to 2nd consumer when it polls, kept higher wait time // as time out for locks is 15 secs. TestUtils.waitForCondition(() -> { - ConsumerRecords records2 = shareConsumer2.poll(Duration.ofMillis(200)); + ConsumerRecords records2 = shareConsumer2.poll(Duration.ofMillis(1000)); return records2.count() == 1 && records2.iterator().next().offset() == 2L; }, 30000, 100L, () -> "Didn't receive timed out record"); assertFalse(partitionExceptionMap1.containsKey(tp)); + // The callback will receive the acknowledgement responses asynchronously after the next poll. shareConsumer1.poll(Duration.ofMillis(500)); @@ -1387,7 +1388,7 @@ public void onComplete(Map> offsetsMap, Exception ex */ @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persister) { + public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persister) throws InterruptedException { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); @@ -1399,9 +1400,12 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persist shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer)); shareConsumer.subscribe(Collections.singleton(tp.topic())); - shareConsumer.poll(Duration.ofMillis(5000)); + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); + // The second poll sends the acknowledgments implicitly. - shareConsumer.poll(Duration.ofMillis(1000)); + shareConsumer.poll(Duration.ofMillis(2000)); + // Till now acknowledgement commit callback has not been called, so no exception thrown yet. // On 3rd poll, the acknowledgement commit callback will be called and the exception is thrown. 
assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); @@ -1428,7 +1432,7 @@ public void onComplete(Map> offsetsMap, Exception ex */ @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgeCommitCallbackThrowsException(String persister) { + public void testAcknowledgeCommitCallbackThrowsException(String persister) throws InterruptedException { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); @@ -1439,10 +1443,11 @@ public void testAcknowledgeCommitCallbackThrowsException(String persister) { shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackThrows<>()); shareConsumer.subscribe(Collections.singleton(tp.topic())); - shareConsumer.poll(Duration.ofMillis(5000)); + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); // The second poll sends the acknowledgments implicitly. - shareConsumer.poll(Duration.ofMillis(1000)); + shareConsumer.poll(Duration.ofMillis(2000)); // On the third poll, the acknowledgement commit callback will be called and the exception is thrown. assertThrows(org.apache.kafka.common.errors.OutOfOrderSequenceException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); @@ -1574,7 +1579,7 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); - producer.send(recordTopic2); + producer.send(recordTopic2).get(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); @@ -1583,12 +1588,12 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - producer.send(recordTopic2); + producer.send(recordTopic2).get(); // Poll should give the record from the non-deleted topic baz. 
TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); - producer.send(recordTopic2); + producer.send(recordTopic2).get(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); shareConsumer.close(); From bf129d0b7ad6c3f4b55d876499fe8b95a3a840c8 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 14 Nov 2024 18:32:19 +0000 Subject: [PATCH 3/4] Use try-with-resources --- .../kafka/test/api/ShareConsumerTest.java | 1863 ++++++++--------- 1 file changed, 923 insertions(+), 940 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index c5601abec1f3..cd9d2c30f407 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -152,266 +152,266 @@ public void destroyCluster() throws Exception { @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testPollNoSubscribeFails(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - assertEquals(Collections.emptySet(), shareConsumer.subscription()); - // "Consumer is not subscribed to any topics." - assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); - shareConsumer.close(); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." 
+ assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscribeAndPollNoRecords(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - Set subscription = Collections.singleton(tp.topic()); - shareConsumer.subscribe(subscription); - assertEquals(subscription, shareConsumer.subscription()); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); - shareConsumer.close(); - assertEquals(0, records.count()); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + Set subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscribePollUnsubscribe(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - Set subscription = Collections.singleton(tp.topic()); - shareConsumer.subscribe(subscription); - assertEquals(subscription, shareConsumer.subscription()); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); - shareConsumer.unsubscribe(); - assertEquals(Collections.emptySet(), shareConsumer.subscription()); - shareConsumer.close(); - assertEquals(0, records.count()); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + Set subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscribePollSubscribe(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - Set subscription = Collections.singleton(tp.topic()); - shareConsumer.subscribe(subscription); - assertEquals(subscription, shareConsumer.subscription()); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); - shareConsumer.subscribe(subscription); - assertEquals(subscription, shareConsumer.subscription()); - records = shareConsumer.poll(Duration.ofMillis(500)); - shareConsumer.close(); - assertEquals(0, records.count()); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + Set subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + 
ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscribeUnsubscribePollFails(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - Set subscription = Collections.singleton(tp.topic()); - shareConsumer.subscribe(subscription); - assertEquals(subscription, shareConsumer.subscription()); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); - shareConsumer.unsubscribe(); - assertEquals(Collections.emptySet(), shareConsumer.subscription()); - // "Consumer is not subscribed to any topics." - assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); - shareConsumer.close(); - assertEquals(0, records.count()); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + Set subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.unsubscribe(); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscribeSubscribeEmptyPollFails(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - Set subscription = Collections.singleton(tp.topic()); - shareConsumer.subscribe(subscription); - assertEquals(subscription, shareConsumer.subscription()); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); - shareConsumer.subscribe(Collections.emptySet()); - assertEquals(Collections.emptySet(), shareConsumer.subscription()); - // "Consumer is not subscribed to any topics." - assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); - shareConsumer.close(); - assertEquals(0, records.count()); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + Set subscription = Collections.singleton(tp.topic()); + shareConsumer.subscribe(subscription); + assertEquals(subscription, shareConsumer.subscription()); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); + shareConsumer.subscribe(Collections.emptySet()); + assertEquals(Collections.emptySet(), shareConsumer.subscription()); + // "Consumer is not subscribed to any topics." 
+ assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscriptionAndPoll(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscriptionAndPollMultiple(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - producer.send(record); - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - producer.send(record); - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) 
public void testAcknowledgementSentOnSubscriptionChange(String persister) throws ExecutionException, InterruptedException { - Map> partitionOffsetsMap = new HashMap<>(); - Map partitionExceptionMap = new HashMap<>(); + alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - ProducerRecord record2 = new ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), "value".getBytes()); - producer.send(record2).get(); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + Map> partitionOffsetsMap = new HashMap<>(); + Map partitionExceptionMap = new HashMap<>(); - shareConsumer.subscribe(Collections.singleton(tp.topic())); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + ProducerRecord record2 = new ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record2).get(); + producer.flush(); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); - alterShareAutoOffsetReset("group1", "earliest"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); - shareConsumer.subscribe(Collections.singletonList(tp2.topic())); + shareConsumer.subscribe(Collections.singletonList(tp2.topic())); - // Waiting for heartbeat to propagate the subscription change. - TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + // Waiting for heartbeat to propagate the subscription change. + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records from the updated subscription"); - producer.send(record2).get(); - - //Starting the 3rd poll to invoke the callback - shareConsumer.poll(Duration.ofMillis(500)); + producer.send(record2).get(); - // Verifying if the callback was invoked for the partitions in the old subscription. - assertTrue(partitionExceptionMap.containsKey(tp)); - assertNull(partitionExceptionMap.get(tp)); + //Starting the 3rd poll to invoke the callback + shareConsumer.poll(Duration.ofMillis(500)); - producer.close(); - shareConsumer.close(); + // Verifying if the callback was invoked for the partitions in the old subscription. 
+ assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) { - Map> partitionOffsetsMap = new HashMap<>(); - Map partitionExceptionMap = new HashMap<>(); - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - // Now in the second poll, we implicitly acknowledge the record received in the first poll. - // We get back the acknowledgment error code after the second poll. - // When we start the 3rd poll, the acknowledgment commit callback is invoked. - shareConsumer.poll(Duration.ofMillis(1000)); - shareConsumer.poll(Duration.ofMillis(1000)); + Map> partitionOffsetsMap = new HashMap<>(); + Map partitionExceptionMap = new HashMap<>(); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - // We expect null exception as the acknowledgment error code is null. - assertTrue(partitionExceptionMap.containsKey(tp)); - assertNull(partitionExceptionMap.get(tp)); - shareConsumer.close(); - producer.close(); + producer.send(record); + producer.flush(); + + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgment error code after the second poll. + // When we start the 3rd poll, the acknowledgment commit callback is invoked. + shareConsumer.poll(Duration.ofMillis(1000)); + shareConsumer.poll(Duration.ofMillis(1000)); + + // We expect null exception as the acknowledgment error code is null. 
+ assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testAcknowledgementCommitCallbackOnClose(String persister) { - Map> partitionOffsetsMap = new HashMap<>(); - Map partitionExceptionMap = new HashMap<>(); - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); + Map> partitionOffsetsMap = new HashMap<>(); + Map partitionExceptionMap = new HashMap<>(); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - // Now in the second poll, we implicitly acknowledge the record received in the first poll. - // We get back the acknowledgement error code asynchronously after the second poll. - // The acknowledgement commit callback is invoked in close. - shareConsumer.poll(Duration.ofMillis(1000)); - shareConsumer.close(); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgement error code asynchronously after the second poll. + // The acknowledgement commit callback is invoked in close. + shareConsumer.poll(Duration.ofMillis(1000)); + shareConsumer.close(); - // We expect null exception as the acknowledgment error code is null. - assertTrue(partitionExceptionMap.containsKey(tp)); - assertNull(partitionExceptionMap.get(tp)); - producer.close(); + // We expect null exception as the acknowledgment error code is null. 
+ assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testAcknowledgementCommitCallbackInvalidRecordStateException(String persister) throws Exception { - Map> partitionOffsetsMap = new HashMap<>(); - Map partitionExceptionMap = new HashMap<>(); - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + Map> partitionOffsetsMap = new HashMap<>(); + Map partitionExceptionMap = new HashMap<>(); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); - // Waiting until the acquisition lock expires. - Thread.sleep(20000); + // Waiting until the acquisition lock expires. + Thread.sleep(20000); - // Now in the second poll, we implicitly acknowledge the record received in the first poll. - // We get back the acknowledgment error code after the second poll. - // When we start the 3rd poll, the acknowledgment commit callback is invoked. - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); + // Now in the second poll, we implicitly acknowledge the record received in the first poll. + // We get back the acknowledgment error code after the second poll. + // When we start the 3rd poll, the acknowledgment commit callback is invoked. + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); - records = shareConsumer.poll(Duration.ofMillis(200)); - assertEquals(0, records.count()); + records = shareConsumer.poll(Duration.ofMillis(200)); + assertEquals(0, records.count()); - // As we tried to acknowledge a record after the acquisition lock expired, - // we wil get an InvalidRecordStateException. - assertInstanceOf(InvalidRecordStateException.class, partitionExceptionMap.get(tp)); - shareConsumer.close(); - producer.close(); + // As we tried to acknowledge a record after the acquisition lock expired, + // we wil get an InvalidRecordStateException. 
+ assertTrue(partitionExceptionMap.containsKey(tp)); + assertInstanceOf(InvalidRecordStateException.class, partitionExceptionMap.get(tp)); + } } private static class TestableAcknowledgeCommitCallback implements AcknowledgementCommitCallback { @@ -443,45 +443,44 @@ public void onComplete(Map> offsetsMap, Exception ex @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testHeaders(String persister) { - int numRecords = 1; - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - record.headers().add("headerKey", "headerValue".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + int numRecords = 1; + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + record.headers().add("headerKey", "headerValue".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); - List> records = consumeRecords(shareConsumer, numRecords); - assertEquals(numRecords, records.size()); + List> records = consumeRecords(shareConsumer, numRecords); + assertEquals(numRecords, records.size()); - for (ConsumerRecord consumerRecord : records) { - Header header = consumerRecord.headers().lastHeader("headerKey"); - if (header != null) - assertEquals("headerValue", new String(header.value())); + for (ConsumerRecord consumerRecord : records) { + Header header = consumerRecord.headers().lastHeader("headerKey"); + if (header != null) + assertEquals("headerValue", new String(header.value())); + } } - shareConsumer.close(); - producer.close(); } private void testHeadersSerializeDeserialize(Serializer serializer, Deserializer deserializer) { - int numRecords = 1; - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), serializer); + KafkaShareConsumer shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1")) { - KafkaProducer producer = createProducer(new ByteArraySerializer(), serializer); - producer.send(record); - producer.flush(); + int numRecords = 1; + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - alterShareAutoOffsetReset("group1", "earliest"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - List> records = consumeRecords(shareConsumer, numRecords); - assertEquals(numRecords, records.size()); - shareConsumer.close(); - producer.close(); + List> records = 
consumeRecords(shareConsumer, numRecords); + assertEquals(numRecords, records.size()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -493,514 +492,517 @@ public void testHeadersSerializerDeserializer(String persister) { @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testMaxPollRecords(String persister) { - int maxPollRecords = 2; int numRecords = 10000; + int maxPollRecords = 2; - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - long startingTimestamp = System.currentTimeMillis(); - produceMessagesWithTimestamp(numRecords, startingTimestamp); - - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), - "group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords))); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - - List> records = consumeRecords(shareConsumer, numRecords); - long i = 0L; - for (ConsumerRecord record : records) { - assertEquals(tp.topic(), record.topic()); - assertEquals(tp.partition(), record.partition()); - assertEquals(TimestampType.CREATE_TIME, record.timestampType()); - assertEquals(startingTimestamp + i, record.timestamp()); - assertEquals("key " + i, new String(record.key())); - assertEquals("value " + i, new String(record.value())); - // this is true only because K and V are byte arrays - assertEquals(("key " + i).length(), record.serializedKeySize()); - assertEquals(("value " + i).length(), record.serializedValueSize()); - - i++; + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), + "group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords)))) { + + long startingTimestamp = System.currentTimeMillis(); + produceMessagesWithTimestamp(numRecords, startingTimestamp); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + List> records = consumeRecords(shareConsumer, numRecords); + long i = 0L; + for (ConsumerRecord record : records) { + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(startingTimestamp + i, record.timestamp()); + assertEquals("key " + i, new String(record.key())); + assertEquals("value " + i, new String(record.value())); + // this is true only because K and V are byte arrays + assertEquals(("key " + i).length(), record.serializedKeySize()); + assertEquals(("value " + i).length(), record.serializedValueSize()); + + i++; + } } - shareConsumer.close(); - producer.close(); } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testControlRecordsSkipped(String persister) throws Exception { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer transactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1"); + KafkaProducer nonTransactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer 
shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - KafkaProducer transactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1"); - transactionalProducer.initTransactions(); - transactionalProducer.beginTransaction(); - RecordMetadata transactional1 = transactionalProducer.send(record).get(); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer nonTransactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - RecordMetadata nonTransactional1 = nonTransactionalProducer.send(record).get(); + transactionalProducer.initTransactions(); + transactionalProducer.beginTransaction(); + RecordMetadata transactional1 = transactionalProducer.send(record).get(); - transactionalProducer.commitTransaction(); + RecordMetadata nonTransactional1 = nonTransactionalProducer.send(record).get(); - transactionalProducer.beginTransaction(); - RecordMetadata transactional2 = transactionalProducer.send(record).get(); - transactionalProducer.abortTransaction(); + transactionalProducer.commitTransaction(); - RecordMetadata nonTransactional2 = nonTransactionalProducer.send(record).get(); + transactionalProducer.beginTransaction(); + RecordMetadata transactional2 = transactionalProducer.send(record).get(); + transactionalProducer.abortTransaction(); - transactionalProducer.close(); - nonTransactionalProducer.close(); + RecordMetadata nonTransactional2 = nonTransactionalProducer.send(record).get(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - alterShareAutoOffsetReset("group1", "earliest"); + transactionalProducer.close(); + nonTransactionalProducer.close(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(4, records.count()); - assertEquals(transactional1.offset(), records.records(tp).get(0).offset()); - assertEquals(nonTransactional1.offset(), records.records(tp).get(1).offset()); - assertEquals(transactional2.offset(), records.records(tp).get(2).offset()); - assertEquals(nonTransactional2.offset(), records.records(tp).get(3).offset()); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(4, records.count()); + assertEquals(transactional1.offset(), records.records(tp).get(0).offset()); + assertEquals(nonTransactional1.offset(), records.records(tp).get(1).offset()); + assertEquals(transactional2.offset(), records.records(tp).get(2).offset()); + assertEquals(nonTransactional2.offset(), records.records(tp).get(3).offset()); - // There will be control records on the topic-partition, so the offsets of the non-control records - // are not 0, 1, 2, 3. Just assert that the offset of the final one is not 3. - assertNotEquals(3, nonTransactional2.offset()); + // There will be control records on the topic-partition, so the offsets of the non-control records + // are not 0, 1, 2, 3. Just assert that the offset of the final one is not 3. 
+ assertNotEquals(3, nonTransactional2.offset()); - records = shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); - shareConsumer.close(); - transactionalProducer.close(); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgeSuccess(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - records.forEach(shareConsumer::acknowledge); - producer.send(record); - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(shareConsumer::acknowledge); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgeCommitSuccess(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - records.forEach(shareConsumer::acknowledge); - producer.send(record); - Map> result = shareConsumer.commitSync(); - assertEquals(1, result.size()); - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + 
shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(shareConsumer::acknowledge); + producer.send(record); + Map> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgementCommitAsync(String persister) throws InterruptedException { - ProducerRecord record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - ProducerRecord record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - ProducerRecord record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record1); - producer.send(record2); - producer.send(record3); - producer.flush(); - - KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer1.subscribe(Collections.singleton(tp.topic())); - shareConsumer2.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + producer.flush(); - Map> partitionOffsetsMap1 = new HashMap<>(); - Map partitionExceptionMap1 = new HashMap<>(); - shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1)); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + shareConsumer2.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records = shareConsumer1.poll(Duration.ofMillis(5000)); - assertEquals(3, records.count()); - Iterator> iterator = records.iterator(); + Map> partitionOffsetsMap1 = new HashMap<>(); + Map partitionExceptionMap1 = new HashMap<>(); + shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1)); - // Acknowledging 2 out of the 3 records received via commitAsync. 
-        ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
-        ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
-        assertEquals(0L, firstRecord.offset());
-        assertEquals(1L, secondRecord.offset());
+            ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
+            assertEquals(3, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

-        shareConsumer1.acknowledge(firstRecord);
-        shareConsumer1.acknowledge(secondRecord);
-        shareConsumer1.commitAsync();
+            // Acknowledging 2 out of the 3 records received via commitAsync.
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            assertEquals(1L, secondRecord.offset());

-        // The 3rd record should be reassigned to 2nd consumer when it polls, kept higher wait time
-        // as time out for locks is 15 secs.
-        TestUtils.waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(1000));
-            return records2.count() == 1 && records2.iterator().next().offset() == 2L;
-        }, 30000, 100L, () -> "Didn't receive timed out record");
+            shareConsumer1.acknowledge(firstRecord);
+            shareConsumer1.acknowledge(secondRecord);
+            shareConsumer1.commitAsync();

-        assertFalse(partitionExceptionMap1.containsKey(tp));
+            // The 3rd record should be reassigned to the 2nd consumer when it polls. A longer wait time is
+            // used because the acquisition lock timeout is 15 secs.
+            TestUtils.waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(1000));
+                return records2.count() == 1 && records2.iterator().next().offset() == 2L;
+            }, 30000, 100L, () -> "Didn't receive timed out record");

-        // The callback will receive the acknowledgement responses asynchronously after the next poll.
-        shareConsumer1.poll(Duration.ofMillis(500));
+            assertFalse(partitionExceptionMap1.containsKey(tp));

-        shareConsumer1.close();
-        shareConsumer2.close();
-        producer.close();
+            // The callback will receive the acknowledgement responses asynchronously after the next poll.
+            shareConsumer1.poll(Duration.ofMillis(500));

-        assertTrue(partitionExceptionMap1.containsKey(tp));
-        assertNull(partitionExceptionMap1.get(tp));
+            assertTrue(partitionExceptionMap1.containsKey(tp));
+            assertNull(partitionExceptionMap1.get(tp));
+        }
     }

     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
     public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) {
-        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-        ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
-        producer.send(record1);
-        producer.send(record2);
-        producer.send(record3);
-        producer.flush();
-
-        KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
-        shareConsumer1.subscribe(Collections.singleton(tp.topic()));
         alterShareAutoOffsetReset("group1", "earliest");
+        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.send(record3);
+            producer.flush();

-        Map<TopicIdPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
-        Map<TopicIdPartition, Exception> partitionExceptionMap = new HashMap<>();
-        shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+            shareConsumer1.subscribe(Collections.singleton(tp.topic()));

-        ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
-        assertEquals(3, records.count());
-        Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+            Map<TopicIdPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+            Map<TopicIdPartition, Exception> partitionExceptionMap = new HashMap<>();
+            shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));

-        // Acknowledging 2 out of the 3 records received via commitAsync.
-        ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
-        ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
-        assertEquals(0L, firstRecord.offset());
-        assertEquals(1L, secondRecord.offset());
+            ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
+            assertEquals(3, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

-        shareConsumer1.acknowledge(firstRecord);
-        shareConsumer1.acknowledge(secondRecord);
-        shareConsumer1.commitAsync();
+            // Acknowledging 2 out of the 3 records received via commitAsync.
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            assertEquals(1L, secondRecord.offset());

-        // The 3rd record should be re-presented to the consumer when it polls again.
-        records = shareConsumer1.poll(Duration.ofMillis(5000));
-        assertEquals(1, records.count());
-        iterator = records.iterator();
-        firstRecord = iterator.next();
-        assertEquals(2L, firstRecord.offset());
+            shareConsumer1.acknowledge(firstRecord);
+            shareConsumer1.acknowledge(secondRecord);
+            shareConsumer1.commitAsync();

-        // And poll again without acknowledging - the callback will receive the acknowledgement responses too
-        records = shareConsumer1.poll(Duration.ofMillis(5000));
-        assertEquals(1, records.count());
-        iterator = records.iterator();
-        firstRecord = iterator.next();
-        assertEquals(2L, firstRecord.offset());
+            // The 3rd record should be re-presented to the consumer when it polls again.
+            records = shareConsumer1.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            iterator = records.iterator();
+            firstRecord = iterator.next();
+            assertEquals(2L, firstRecord.offset());

-        shareConsumer1.acknowledge(firstRecord);
+            // And poll again without acknowledging - the callback will receive the acknowledgement responses too
+            records = shareConsumer1.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            iterator = records.iterator();
+            firstRecord = iterator.next();
+            assertEquals(2L, firstRecord.offset());

-        // The callback will receive the acknowledgement responses after polling. The callback is
-        // called on entry to the poll method or during close. The commit is being performed asynchronously, so
-        // we can only rely on the completion once the consumer has closed because that waits for the response.
+            shareConsumer1.acknowledge(firstRecord);

-        shareConsumer1.poll(Duration.ofMillis(500));
-        shareConsumer1.close();
-        producer.close();
+            // The callback will receive the acknowledgement responses after polling. The callback is
+            // called on entry to the poll method or during close. The commit is being performed asynchronously, so
+            // we can only rely on the completion once the consumer has closed because that waits for the response.
+ shareConsumer1.poll(Duration.ofMillis(500)); - assertTrue(partitionExceptionMap.containsKey(tp)); - assertNull(partitionExceptionMap.get(tp)); + shareConsumer1.close(); + + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgeReleasePollAccept(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); - records = shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgeReleaseAccept(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); - records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); - records = 
shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgeReleaseClose(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - ConsumerRecord consumedRecord = records.records(tp).get(0); - shareConsumer.acknowledge(consumedRecord); - records = 
shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); - assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + ConsumerRecord consumedRecord = records.records(tp).get(0); + shareConsumer.acknowledge(consumedRecord); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testImplicitAcknowledgeFailsExplicit(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - ConsumerRecord consumedRecord = records.records(tp).get(0); - records = shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); - assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + ConsumerRecord consumedRecord = records.records(tp).get(0); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testImplicitAcknowledgeCommitSync(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - Map> result = shareConsumer.commitSync(); - assertEquals(1, result.size()); - result = shareConsumer.commitSync(); - assertEquals(0, result.size()); - records = shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + Map> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + result = shareConsumer.commitSync(); + assertEquals(0, result.size()); + records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testImplicitAcknowledgementCommitAsync(String persister) throws InterruptedException { - ProducerRecord record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - ProducerRecord record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - ProducerRecord record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record1); - producer.send(record2); - producer.send(record3); - producer.flush(); - - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + producer.flush(); - Map> partitionOffsetsMap1 = new HashMap<>(); - Map partitionExceptionMap1 = new HashMap<>(); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1)); + Map> partitionOffsetsMap1 = new HashMap<>(); + Map partitionExceptionMap1 = new HashMap<>(); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(3, 
records.count()); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1)); - // Implicitly acknowledging all the records received. - shareConsumer.commitAsync(); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(3, records.count()); - assertFalse(partitionExceptionMap1.containsKey(tp)); - // The callback will receive the acknowledgement responses after the next poll. - TestUtils.waitForCondition(() -> { - shareConsumer.poll(Duration.ofMillis(1000)); - return partitionExceptionMap1.containsKey(tp); - }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit callback did not receive the response yet"); + // Implicitly acknowledging all the records received. + shareConsumer.commitAsync(); - assertNull(partitionExceptionMap1.get(tp)); + assertFalse(partitionExceptionMap1.containsKey(tp)); + // The callback will receive the acknowledgement responses after the next poll. + TestUtils.waitForCondition(() -> { + shareConsumer.poll(Duration.ofMillis(1000)); + return partitionExceptionMap1.containsKey(tp); + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit callback did not receive the response yet"); - shareConsumer.close(); - producer.close(); + assertNull(partitionExceptionMap1.get(tp)); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) throws Exception { int maxPartitionFetchBytes = 10000; - ProducerRecord smallRecord = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - ProducerRecord bigRecord = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), new byte[maxPartitionFetchBytes]); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(smallRecord).get(); - producer.send(bigRecord).get(); - - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), - "group1", Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxPartitionFetchBytes))); - shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), + "group1", Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxPartitionFetchBytes)))) { + + ProducerRecord smallRecord = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord bigRecord = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), new byte[maxPartitionFetchBytes]); + producer.send(smallRecord).get(); + producer.send(bigRecord).get(); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void 
testMultipleConsumersWithDifferentGroupIds(String persister) throws InterruptedException { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - - KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer1.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); - - KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2"); - shareConsumer2.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group2", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + + shareConsumer2.subscribe(Collections.singleton(tp.topic())); + + // producing 3 records to the topic + producer.send(record); + producer.send(record); + producer.send(record); + producer.flush(); + + // Both the consumers should read all the messages, because they are part of different share groups (both have different group IDs) + AtomicInteger shareConsumer1Records = new AtomicInteger(); + AtomicInteger shareConsumer2Records = new AtomicInteger(); + TestUtils.waitForCondition(() -> { + int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()); + int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); + return records1 == 3 && records2 == 3; + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers"); + + producer.send(record); + producer.send(record); - // producing 3 records to the topic - producer.send(record); - producer.send(record); - producer.send(record); - producer.flush(); - - // Both the consumers should read all the messages, because they are part of different share groups (both have different group IDs) - AtomicInteger shareConsumer1Records = new AtomicInteger(); - AtomicInteger shareConsumer2Records = new AtomicInteger(); - TestUtils.waitForCondition(() -> { - int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()); - int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); - return records1 == 3 && records2 == 3; - }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers"); - - producer.send(record); - producer.send(record); - - shareConsumer1Records.set(0); - TestUtils.waitForCondition(() -> shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()) == 2, - DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer 1"); - - producer.send(record); - producer.send(record); - producer.send(record); - - shareConsumer1Records.set(0); - shareConsumer2Records.set(0); - TestUtils.waitForCondition(() -> { - int records1 = 
shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()); - int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); - return records1 == 3 && records2 == 5; - }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers for the last batch"); - - shareConsumer1.close(); - shareConsumer2.close(); - producer.close(); + shareConsumer1Records.set(0); + TestUtils.waitForCondition(() -> shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()) == 2, + DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer 1"); + + producer.send(record); + producer.send(record); + producer.send(record); + + shareConsumer1Records.set(0); + shareConsumer2Records.set(0); + TestUtils.waitForCondition(() -> { + int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()); + int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); + return records1 == 3 && records2 == 5; + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers for the last batch"); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testMultipleConsumersInGroupSequentialConsumption(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer1.subscribe(Collections.singleton(tp.topic())); - KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer2.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - int totalMessages = 2000; - for (int i = 0; i < totalMessages; i++) { - producer.send(record); - } - producer.flush(); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + shareConsumer2.subscribe(Collections.singleton(tp.topic())); - int consumer1MessageCount = 0; - int consumer2MessageCount = 0; + int totalMessages = 2000; + for (int i = 0; i < totalMessages; i++) { + producer.send(record); + } + producer.flush(); - int maxRetries = 10; - int retries = 0; - while (retries < maxRetries) { - ConsumerRecords records1 = shareConsumer1.poll(Duration.ofMillis(2000)); - consumer1MessageCount += records1.count(); - ConsumerRecords records2 = shareConsumer2.poll(Duration.ofMillis(2000)); - consumer2MessageCount += records2.count(); - if (records1.count() + records2.count() == 0) - break; - retries++; - } + int consumer1MessageCount = 0; + int consumer2MessageCount = 0; + + int maxRetries = 10; + int retries = 0; + while (retries < maxRetries) { + ConsumerRecords records1 = 
shareConsumer1.poll(Duration.ofMillis(2000)); + consumer1MessageCount += records1.count(); + ConsumerRecords records2 = shareConsumer2.poll(Duration.ofMillis(2000)); + consumer2MessageCount += records2.count(); + if (records1.count() + records2.count() == 0) + break; + retries++; + } - assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); - shareConsumer1.close(); - shareConsumer2.close(); - producer.close(); + assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -1017,9 +1019,6 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount); ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount); - // This consumer is created to register the share group id with the groupCoordinator - // so that the config share.auto.offset.reset can be altered for this group - createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId); alterShareAutoOffsetReset(groupId, "earliest"); for (int i = 0; i < producerCount; i++) { @@ -1056,6 +1055,7 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) + @SuppressWarnings("NPathComplexity") public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister) { AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0); AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0); @@ -1070,13 +1070,8 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe String groupId2 = "group2"; String groupId3 = "group3"; - // These consumers are created to register the share group ids with the groupCoordinator - // so that the config share.auto.offset.reset can be altered for these groups - createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId1); alterShareAutoOffsetReset(groupId1, "earliest"); - createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId2); alterShareAutoOffsetReset(groupId2, "earliest"); - createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId3); alterShareAutoOffsetReset(groupId3, "earliest"); ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount); @@ -1178,46 +1173,48 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testConsumerCloseInGroupSequential(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer1.subscribe(Collections.singleton(tp.topic())); - KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer2.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new 
ByteArraySerializer()); + KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - int totalMessages = 1500; - for (int i = 0; i < totalMessages; i++) { - producer.send(record); - } - producer.close(); - - int consumer1MessageCount = 0; - int consumer2MessageCount = 0; - - // Poll three times to receive records. The second poll acknowledges the records - // from the first poll, and so on. The third poll's records are not acknowledged - // because the consumer is closed, which makes the broker release the records fetched. - ConsumerRecords records1 = shareConsumer1.poll(Duration.ofMillis(5000)); - consumer1MessageCount += records1.count(); - int consumer1MessageCountA = records1.count(); - records1 = shareConsumer1.poll(Duration.ofMillis(5000)); - consumer1MessageCount += records1.count(); - int consumer1MessageCountB = records1.count(); - records1 = shareConsumer1.poll(Duration.ofMillis(5000)); - int consumer1MessageCountC = records1.count(); - assertEquals(totalMessages, consumer1MessageCountA + consumer1MessageCountB + consumer1MessageCountC); - shareConsumer1.close(); - - int maxRetries = 10; - int retries = 0; - while (consumer1MessageCount + consumer2MessageCount < totalMessages && retries < maxRetries) { - ConsumerRecords records2 = shareConsumer2.poll(Duration.ofMillis(5000)); - consumer2MessageCount += records2.count(); - retries++; + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); + shareConsumer2.subscribe(Collections.singleton(tp.topic())); + + int totalMessages = 1500; + for (int i = 0; i < totalMessages; i++) { + producer.send(record); + } + producer.close(); + + int consumer1MessageCount = 0; + int consumer2MessageCount = 0; + + // Poll three times to receive records. The second poll acknowledges the records + // from the first poll, and so on. The third poll's records are not acknowledged + // because the consumer is closed, which makes the broker release the records fetched. 
+ ConsumerRecords records1 = shareConsumer1.poll(Duration.ofMillis(5000)); + consumer1MessageCount += records1.count(); + int consumer1MessageCountA = records1.count(); + records1 = shareConsumer1.poll(Duration.ofMillis(5000)); + consumer1MessageCount += records1.count(); + int consumer1MessageCountB = records1.count(); + records1 = shareConsumer1.poll(Duration.ofMillis(5000)); + int consumer1MessageCountC = records1.count(); + assertEquals(totalMessages, consumer1MessageCountA + consumer1MessageCountB + consumer1MessageCountC); + shareConsumer1.close(); + + int maxRetries = 10; + int retries = 0; + while (consumer1MessageCount + consumer2MessageCount < totalMessages && retries < maxRetries) { + ConsumerRecords records2 = shareConsumer2.poll(Duration.ofMillis(5000)); + consumer2MessageCount += records2.count(); + retries++; + } + shareConsumer2.close(); + assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); } - shareConsumer2.close(); - assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -1231,9 +1228,6 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers String groupId = "group1"; - // This consumer is created to register the share group id with the groupCoordinator - // so that the config share.auto.offset.reset can be altered for this group - createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId); alterShareAutoOffsetReset(groupId, "earliest"); ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount); @@ -1293,50 +1287,49 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testAcquisitionLockTimeoutOnConsumer(String persister) throws InterruptedException { - ProducerRecord producerRecord1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, - "key_1".getBytes(), "value_1".getBytes()); - ProducerRecord producerRecord2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, - "key_2".getBytes(), "value_2".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer1.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - producer.send(producerRecord1); - producer.flush(); - - // Poll two times to receive records. The first poll puts the acquisition lock and fetches the record. - // Since, we are only sending one record and acquisition lock hasn't timed out, the second poll only acknowledges the - // record from the first poll and no more fetch. - ConsumerRecords records1 = shareConsumer1.poll(Duration.ofMillis(5000)); - assertEquals(1, records1.count()); - assertEquals("key_1", new String(records1.iterator().next().key())); - assertEquals("value_1", new String(records1.iterator().next().value())); - ConsumerRecords records2 = shareConsumer1.poll(Duration.ofMillis(500)); - assertEquals(0, records2.count()); - - producer.send(producerRecord2); - - // Poll three times. 
The first poll puts the acquisition lock and fetches the record. Before the second poll, - // acquisition lock times out and hence the consumer needs to fetch the record again. Since, the acquisition lock - // hasn't timed out before the third poll, the third poll only acknowledges the record from the second poll and no more fetch. - records1 = shareConsumer1.poll(Duration.ofMillis(5000)); - assertEquals(1, records1.count()); - assertEquals("key_2", new String(records1.iterator().next().key())); - assertEquals("value_2", new String(records1.iterator().next().value())); + ProducerRecord producerRecord1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, + "key_1".getBytes(), "value_1".getBytes()); + ProducerRecord producerRecord2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, + "key_2".getBytes(), "value_2".getBytes()); + shareConsumer1.subscribe(Collections.singleton(tp.topic())); - // Allowing acquisition lock to expire. - Thread.sleep(20000); - - records2 = shareConsumer1.poll(Duration.ofMillis(5000)); - assertEquals(1, records2.count()); - assertEquals("key_2", new String(records2.iterator().next().key())); - assertEquals("value_2", new String(records2.iterator().next().value())); - ConsumerRecords records3 = shareConsumer1.poll(Duration.ofMillis(500)); - assertEquals(0, records3.count()); + producer.send(producerRecord1); + producer.flush(); - producer.close(); - shareConsumer1.close(); + // Poll two times to receive records. The first poll puts the acquisition lock and fetches the record. + // Since, we are only sending one record and acquisition lock hasn't timed out, the second poll only acknowledges the + // record from the first poll and no more fetch. + ConsumerRecords records1 = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(1, records1.count()); + assertEquals("key_1", new String(records1.iterator().next().key())); + assertEquals("value_1", new String(records1.iterator().next().value())); + ConsumerRecords records2 = shareConsumer1.poll(Duration.ofMillis(500)); + assertEquals(0, records2.count()); + + producer.send(producerRecord2); + + // Poll three times. The first poll puts the acquisition lock and fetches the record. Before the second poll, + // acquisition lock times out and hence the consumer needs to fetch the record again. Since, the acquisition lock + // hasn't timed out before the third poll, the third poll only acknowledges the record from the second poll and no more fetch. + records1 = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(1, records1.count()); + assertEquals("key_2", new String(records1.iterator().next().key())); + assertEquals("value_2", new String(records1.iterator().next().value())); + + // Allowing acquisition lock to expire. 
+ Thread.sleep(20000); + + records2 = shareConsumer1.poll(Duration.ofMillis(5000)); + assertEquals(1, records2.count()); + assertEquals("key_2", new String(records2.iterator().next().key())); + assertEquals("value_2", new String(records2.iterator().next().value())); + ConsumerRecords records3 = shareConsumer1.poll(Duration.ofMillis(500)); + assertEquals(0, records3.count()); + } } /** @@ -1346,24 +1339,24 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer)); - shareConsumer.subscribe(Collections.singleton(tp.topic())); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - // The acknowledgment commit callback will try to call a method of KafkaShareConsumer - shareConsumer.poll(Duration.ofMillis(5000)); - // The second poll sends the acknowledgements implicitly. - // The acknowledgement commit callback will be called and the exception is thrown. - // This is verified inside the onComplete() method implementation. - shareConsumer.poll(Duration.ofMillis(500)); - shareConsumer.close(); - producer.close(); + // The acknowledgment commit callback will try to call a method of KafkaShareConsumer + shareConsumer.poll(Duration.ofMillis(5000)); + // The second poll sends the acknowledgements implicitly. + // The acknowledgement commit callback will be called and the exception is thrown. + // This is verified inside the onComplete() method implementation. 
+            shareConsumer.poll(Duration.ofMillis(500));
+        }
     }

     private class TestableAcknowledgeCommitCallbackWithShareConsumer<K, V> implements AcknowledgementCommitCallback {
@@ -1389,28 +1382,28 @@ public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception ex
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
     public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persister) throws InterruptedException {
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
-        producer.send(record);
-        producer.flush();
-        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
         alterShareAutoOffsetReset("group1", "earliest");
+        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

-        // The acknowledgment commit callback will try to call a method of KafkaShareConsumer
-        shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
-        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            // The acknowledgment commit callback will try to call a method of KafkaShareConsumer
+            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
+            shareConsumer.subscribe(Collections.singleton(tp.topic()));

-        TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
-            DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
+            TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
+                DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");

-        // The second poll sends the acknowledgments implicitly.
-        shareConsumer.poll(Duration.ofMillis(2000));
+            // The second poll sends the acknowledgments implicitly.
+            shareConsumer.poll(Duration.ofMillis(2000));

-        // Till now acknowledgement commit callback has not been called, so no exception thrown yet.
-        // On 3rd poll, the acknowledgement commit callback will be called and the exception is thrown.
-        assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ofMillis(500)));
-        shareConsumer.close();
-        producer.close();
+            // Until now, the acknowledgement commit callback has not been called, so no exception has been thrown yet.
+            // On the 3rd poll, the acknowledgement commit callback will be called and the exception is thrown.
+            assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ofMillis(500)));
+        }
     }

     private static class TestableAcknowledgeCommitCallbackWakeup<K, V> implements AcknowledgementCommitCallback {
@@ -1433,27 +1426,26 @@ public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception ex
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
     public void testAcknowledgeCommitCallbackThrowsException(String persister) throws InterruptedException {
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
-        producer.send(record);
-        producer.flush();
-        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
         alterShareAutoOffsetReset("group1", "earliest");
+        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

-        shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackThrows<>());
-        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();

-        TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
-            DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
+            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackThrows<>());
+            shareConsumer.subscribe(Collections.singleton(tp.topic()));

-        // The second poll sends the acknowledgments implicitly.
-        shareConsumer.poll(Duration.ofMillis(2000));
+            TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
+                DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");

-        // On the third poll, the acknowledgement commit callback will be called and the exception is thrown.
-        assertThrows(org.apache.kafka.common.errors.OutOfOrderSequenceException.class, () -> shareConsumer.poll(Duration.ofMillis(500)));
+            // The second poll sends the acknowledgments implicitly.
+            shareConsumer.poll(Duration.ofMillis(2000));

-        shareConsumer.close();
-        producer.close();
+            // On the third poll, the acknowledgement commit callback will be called and the exception is thrown.
+ assertThrows(org.apache.kafka.common.errors.OutOfOrderSequenceException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + } } private static class TestableAcknowledgeCommitCallbackThrows implements AcknowledgementCommitCallback { @@ -1470,21 +1462,22 @@ public void onComplete(Map> offsetsMap, Exception ex @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - // interrupt the thread and call poll - try { - Thread.currentThread().interrupt(); - assertThrows(InterruptException.class, () -> shareConsumer.poll(Duration.ZERO)); - } finally { - // clear interrupted state again since this thread may be reused by JUnit - Thread.interrupted(); - } + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + // interrupt the thread and call poll + try { + Thread.currentThread().interrupt(); + assertThrows(InterruptException.class, () -> shareConsumer.poll(Duration.ZERO)); + } finally { + // clear interrupted state again since this thread may be reused by JUnit + Thread.interrupted(); + } - assertDoesNotThrow(() -> shareConsumer.poll(Duration.ZERO)); - shareConsumer.close(); + assertDoesNotThrow(() -> shareConsumer.poll(Duration.ZERO)); + } } /** @@ -1494,14 +1487,15 @@ public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton("topic abc")); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - // The exception depends upon a metadata response which arrives asynchronously. If the delay is - // too short, the poll might return before the error is known. - assertThrows(InvalidTopicException.class, () -> shareConsumer.poll(Duration.ofMillis(10000))); - shareConsumer.close(); + shareConsumer.subscribe(Collections.singleton("topic abc")); + + // The exception depends upon a metadata response which arrives asynchronously. If the delay is + // too short, the poll might return before the error is known. 
+ assertThrows(InvalidTopicException.class, () -> shareConsumer.poll(Duration.ofMillis(10000))); + } } /** @@ -1511,51 +1505,51 @@ public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persis @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testWakeupWithFetchedRecordsAvailable(String persister) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - producer.send(record); - producer.flush(); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); - shareConsumer.wakeup(); - assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO)); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); + shareConsumer.wakeup(); + assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO)); - shareConsumer.close(); - producer.close(); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testSubscriptionFollowedByTopicCreation(String persister) throws InterruptedException { - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - String topic = "foo"; - shareConsumer.subscribe(Collections.singleton(topic)); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - // Topic is created post creation of share consumer and subscription - createTopic(topic); + String topic = "foo"; + shareConsumer.subscribe(Collections.singleton(topic)); - ProducerRecord record = new ProducerRecord<>(topic, 0, null, "key".getBytes(), "value".getBytes()); - producer.send(record); - producer.flush(); + // Topic is created post creation of share consumer and subscription + createTopic(topic); - TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, - DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer, metadata sync failed"); + ProducerRecord record = new ProducerRecord<>(topic, 0, null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); - producer.send(record); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - 
producer.send(record); - records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer, metadata sync failed"); + + producer.send(record); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -1566,237 +1560,229 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr createTopic(topic1); createTopic(topic2); - ProducerRecord recordTopic1 = new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()); - ProducerRecord recordTopic2 = new ProducerRecord<>(topic2, 0, null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - // Consumer subscribes to the topics -> bar and baz. - shareConsumer.subscribe(Arrays.asList(topic1, topic2)); alterShareAutoOffsetReset("group1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { - producer.send(recordTopic1).get(); - TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, - DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); + ProducerRecord recordTopic1 = new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()); + ProducerRecord recordTopic2 = new ProducerRecord<>(topic2, 0, null, "key".getBytes(), "value".getBytes()); - producer.send(recordTopic2).get(); - TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, - DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); + // Consumer subscribes to the topics -> bar and baz. + shareConsumer.subscribe(Arrays.asList(topic1, topic2)); - // Topic bar is deleted, hence poll should not give any results. - deleteTopic(topic1); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); - assertEquals(0, records.count()); + producer.send(recordTopic1).get(); + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); - producer.send(recordTopic2).get(); - // Poll should give the record from the non-deleted topic baz. - TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, - DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); + producer.send(recordTopic2).get(); + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); - producer.send(recordTopic2).get(); - TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, - DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); - shareConsumer.close(); - producer.close(); + // Topic bar is deleted, hence poll should not give any results. 
+ deleteTopic(topic1); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); + assertEquals(0, records.count()); + + producer.send(recordTopic2).get(); + // Poll should give the record from the non-deleted topic baz. + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); + + producer.send(recordTopic2).get(); + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testLsoMovementByRecordsDeletion(String persister) { - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - ProducerRecord record = new ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes()); String groupId = "group1"; - // This consumer is created to register the share group id with the groupCoordinator - // so that the config share.auto.offset.reset can be altered for this group - createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId); alterShareAutoOffsetReset(groupId, "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { - // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic. - try { - for (int i = 0; i < 10; i++) { - producer.send(record).get(); + ProducerRecord record = new ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes()); + + // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic. + try { + for (int i = 0; i < 10; i++) { + producer.send(record).get(); + } + } catch (Exception e) { + fail("Failed to send records: " + e); } - } catch (Exception e) { - fail("Failed to send records: " + e); - } - // We delete records before offset 5, so the LSO should move to 5. - adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L))); + // We delete records before offset 5, so the LSO should move to 5. + adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L))); - AtomicInteger totalMessagesConsumed = new AtomicInteger(0); - CompletableFuture future = new CompletableFuture<>(); - consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true, future); - // The records returned belong to offsets 5-9. - assertEquals(5, totalMessagesConsumed.get()); - try { - assertEquals(5, future.get()); - } catch (Exception e) { - fail("Exception occurred : " + e.getMessage()); - } + AtomicInteger totalMessagesConsumed = new AtomicInteger(0); + CompletableFuture future = new CompletableFuture<>(); + consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true, future); + // The records returned belong to offsets 5-9. + assertEquals(5, totalMessagesConsumed.get()); + try { + assertEquals(5, future.get()); + } catch (Exception e) { + fail("Exception occurred : " + e.getMessage()); + } - // We write 5 records to the topic, so they would be written from offsets 10-14 on the topic.
+ try { + for (int i = 0; i < 5; i++) { + producer.send(record).get(); + } + } catch (Exception e) { + fail("Failed to send records: " + e); } - } catch (Exception e) { - fail("Failed to send records: " + e); - } - // We delete records before offset 14, so the LSO should move to 14. - adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(14L))); + // We delete records before offset 14, so the LSO should move to 14. + adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(14L))); - totalMessagesConsumed = new AtomicInteger(0); - future = new CompletableFuture<>(); - consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true, future); - // The record returned belong to offset 14. - assertEquals(1, totalMessagesConsumed.get()); - try { - assertEquals(1, future.get()); - } catch (Exception e) { - fail("Exception occurred : " + e.getMessage()); - } + totalMessagesConsumed = new AtomicInteger(0); + future = new CompletableFuture<>(); + consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true, future); + // The record returned belongs to offset 14. + assertEquals(1, totalMessagesConsumed.get()); + try { + assertEquals(1, future.get()); + } catch (Exception e) { + fail("Exception occurred : " + e.getMessage()); + } - // We delete records before offset 15, so the LSO should move to 15 and now no records should be returned. - adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(15L))); + // We delete records before offset 15, so the LSO should move to 15 and now no records should be returned. + adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(15L))); - totalMessagesConsumed = new AtomicInteger(0); - future = new CompletableFuture<>(); - consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future); - assertEquals(0, totalMessagesConsumed.get()); - try { - assertEquals(0, future.get()); - } catch (Exception e) { - fail("Exception occurred : " + e.getMessage()); + totalMessagesConsumed = new AtomicInteger(0); + future = new CompletableFuture<>(); + consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future); + assertEquals(0, totalMessagesConsumed.get()); + try { + assertEquals(0, future.get()); + } catch (Exception e) { + fail("Exception occurred : " + e.getMessage()); + } } - producer.close(); } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testShareAutoOffsetResetDefaultValue(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - // Producing a record. - producer.send(record); - producer.flush(); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - // No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record - // was produced before share partition was initialized (which happens after the first share fetch request - // in the poll method), the start offset would be the latest offset, i.e. 1 (the next offset after the already - // present 0th record) - assertEquals(0, records.count()); - // Producing another record.
- producer.send(record); - records = shareConsumer.poll(Duration.ofMillis(5000)); - // Now the next record should be consumed successfully - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + // Producing a record. + producer.send(record); + producer.flush(); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + // No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record + // was produced before share partition was initialized (which happens after the first share fetch request + // in the poll method), the start offset would be the latest offset, i.e. 1 (the next offset after the already + // present 0th record) + assertEquals(0, records.count()); + // Producing another record. + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + // Now the next record should be consumed successfully + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testShareAutoOffsetResetEarliest(String persister) { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - // Changing the value of share.auto.offset.reset value to "earliest" alterShareAutoOffsetReset("group1", "earliest"); - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - // Producing a record. - producer.send(record); - producer.flush(); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume - // all messages present on the partition - assertEquals(1, records.count()); - // Producing another record. - producer.send(record); - records = shareConsumer.poll(Duration.ofMillis(5000)); - // The next records should also be consumed successfully - assertEquals(1, records.count()); - shareConsumer.close(); - producer.close(); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + // Producing a record. + producer.send(record); + producer.flush(); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume + // all messages present on the partition + assertEquals(1, records.count()); + // Producing another record. 
+ producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + // The next records should also be consumed successfully + assertEquals(1, records.count()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) throws Exception { - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - // Changing the value of share.auto.offset.reset value to "earliest" alterShareAutoOffsetReset("group1", "earliest"); - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic. - try { - for (int i = 0; i < 10; i++) { - producer.send(record).get(); - } - } catch (Exception e) { - fail("Failed to send records: " + e); - } + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { - // We delete records before offset 5, so the LSO should move to 5. - adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L))); + shareConsumer.subscribe(Collections.singleton(tp.topic())); - AtomicInteger totalMessagesConsumed = new AtomicInteger(0); - CompletableFuture future = new CompletableFuture<>(); - consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future); - // The records returned belong to offsets 5-9. - assertEquals(5, totalMessagesConsumed.get()); - assertEquals(5, future.get()); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic. + try { + for (int i = 0; i < 10; i++) { + producer.send(record).get(); + } + } catch (Exception e) { + fail("Failed to send records: " + e); + } + + // We delete records before offset 5, so the LSO should move to 5. + adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L))); - shareConsumer.close(); - producer.close(); + AtomicInteger totalMessagesConsumed = new AtomicInteger(0); + CompletableFuture future = new CompletableFuture<>(); + consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future); + // The records returned belong to offsets 5-9. 
+ assertEquals(5, totalMessagesConsumed.get()); + assertEquals(5, future.get()); + } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) { - KafkaShareConsumer shareConsumerEarliest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumerEarliest.subscribe(Collections.singleton(tp.topic())); - // Changing the value of share.auto.offset.reset value to "earliest" for group1 alterShareAutoOffsetReset("group1", "earliest"); - - KafkaShareConsumer shareConsumerLatest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2"); - shareConsumerLatest.subscribe(Collections.singleton(tp.topic())); - // Changing the value of share.auto.offset.reset value to "latest" for group2 alterShareAutoOffsetReset("group2", "latest"); - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - // Producing a record. - producer.send(record); - producer.flush(); - ConsumerRecords records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000)); - // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume - // all messages present on the partition - assertEquals(1, records1.count()); + try (KafkaShareConsumer shareConsumerEarliest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaShareConsumer shareConsumerLatest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2"); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { + + shareConsumerEarliest.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); - // Since the value for share.auto.offset.reset has been altered to "latest", the consumer should not consume - // any message - assertEquals(0, records2.count()); + shareConsumerLatest.subscribe(Collections.singleton(tp.topic())); - // Producing another record. - producer.send(record); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + // Producing a record. + producer.send(record); + producer.flush(); + ConsumerRecords records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000)); + // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume + // all messages present on the partition + assertEquals(1, records1.count()); - records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000)); - // The next record should also be consumed successfully by group1 - assertEquals(1, records1.count()); + ConsumerRecords records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); + // Since the value for share.auto.offset.reset has been altered to "latest", the consumer should not consume + // any message + assertEquals(0, records2.count()); - records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); - // The next record should also be consumed successfully by group2 - assertEquals(1, records2.count()); + // Producing another record. 
+ producer.send(record); + + records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000)); + // The next record should also be consumed successfully by group1 + assertEquals(1, records1.count()); - shareConsumerEarliest.close(); - shareConsumerLatest.close(); - producer.close(); + records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); + // The next record should also be consumed successfully by group2 + assertEquals(1, records2.count()); + } } private CompletableFuture produceMessages(int messageCount) { @@ -1969,18 +1955,15 @@ private void warmup() throws InterruptedException, ExecutionException, TimeoutEx !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); ProducerRecord record = new ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null, "key".getBytes(), "value".getBytes()); - KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "warmupgroup1"); Set subscription = Collections.singleton(warmupTp.topic()); - try { + alterShareAutoOffsetReset("warmupgroup1", "earliest"); + try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "warmupgroup1")) { + producer.send(record).get(15000, TimeUnit.MILLISECONDS); shareConsumer.subscribe(subscription); - alterShareAutoOffsetReset("warmupgroup1", "earliest"); TestUtils.waitForCondition( () -> shareConsumer.poll(Duration.ofMillis(5000)).count() == 1, 30000, 200L, () -> "warmup record not received"); - } finally { - producer.close(); - shareConsumer.close(); } } From ec9eab83d86a93f6e5bf8a4d50889b0aec5d9647 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Fri, 15 Nov 2024 15:33:41 +0000 Subject: [PATCH 4/4] Improve waiting in tests --- .../kafka/test/api/ShareConsumerTest.java | 145 ++++++++++-------- 1 file changed, 77 insertions(+), 68 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index cd9d2c30f407..ed8fee0a01f2 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -84,13 +84,13 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -295,29 +295,26 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws shareConsumer.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); + TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); shareConsumer.subscribe(Collections.singletonList(tp2.topic())); // Waiting for heartbeat to propagate the subscription change. - TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, - DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records from the updated subscription"); - - producer.send(record2).get(); - - //Starting the 3rd poll to invoke the callback - shareConsumer.poll(Duration.ofMillis(500)); + TestUtils.waitForCondition(() -> { + shareConsumer.poll(Duration.ofMillis(500)); + return partitionExceptionMap.containsKey(tp) && partitionExceptionMap.containsKey(tp2); + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records from the updated subscription"); - // Verifying if the callback was invoked for the partitions in the old subscription. - assertTrue(partitionExceptionMap.containsKey(tp)); + // Verifying if the callback was invoked without exceptions for the partitions for both topics. assertNull(partitionExceptionMap.get(tp)); + assertNull(partitionExceptionMap.get(tp2)); } } @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) { + public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) throws Exception { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -332,16 +329,15 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); shareConsumer.subscribe(Collections.singleton(tp.topic())); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - // Now in the second poll, we implicitly acknowledge the record received in the first poll. - // We get back the acknowledgment error code after the second poll. - // When we start the 3rd poll, the acknowledgment commit callback is invoked. - shareConsumer.poll(Duration.ofMillis(1000)); - shareConsumer.poll(Duration.ofMillis(1000)); + TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, + DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); + + TestUtils.waitForCondition(() -> { + shareConsumer.poll(Duration.ofMillis(500)); + return partitionExceptionMap.containsKey(tp); + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to callback"); // We expect null exception as the acknowledgment error code is null. - assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); } } @@ -398,19 +394,10 @@ public void testAcknowledgementCommitCallbackInvalidRecordStateException(String // Waiting until the acquisition lock expires. Thread.sleep(20000); - // Now in the second poll, we implicitly acknowledge the record received in the first poll. - // We get back the acknowledgment error code after the second poll. - // When we start the 3rd poll, the acknowledgment commit callback is invoked. 
- records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - - records = shareConsumer.poll(Duration.ofMillis(200)); - assertEquals(0, records.count()); - - // As we tried to acknowledge a record after the acquisition lock expired, - // we wil get an InvalidRecordStateException. - assertTrue(partitionExceptionMap.containsKey(tp)); - assertInstanceOf(InvalidRecordStateException.class, partitionExceptionMap.get(tp)); + TestUtils.waitForCondition(() -> { + shareConsumer.poll(Duration.ofMillis(500)); + return partitionExceptionMap.containsKey(tp) && partitionExceptionMap.get(tp) instanceof InvalidRecordStateException; + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to be notified by InvalidRecordStateException"); } } @@ -496,8 +483,7 @@ public void testMaxPollRecords(String persister) { int maxPollRecords = 2; alterShareAutoOffsetReset("group1", "earliest"); - try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords)))) { long startingTimestamp = System.currentTimeMillis(); @@ -659,9 +645,11 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte assertFalse(partitionExceptionMap1.containsKey(tp)); // The callback will receive the acknowledgement responses asynchronously after the next poll. - shareConsumer1.poll(Duration.ofMillis(500)); + TestUtils.waitForCondition(() -> { + shareConsumer1.poll(Duration.ofMillis(1000)); + return partitionExceptionMap1.containsKey(tp); + }, 30000, 100L, () -> "Didn't receive call to callback"); - assertTrue(partitionExceptionMap1.containsKey(tp)); assertNull(partitionExceptionMap1.get(tp)); } } @@ -1015,12 +1003,11 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) int messagesPerProducer = 5000; String groupId = "group1"; + alterShareAutoOffsetReset(groupId, "earliest"); ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount); ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount); - alterShareAutoOffsetReset(groupId, "earliest"); - for (int i = 0; i < producerCount; i++) { producerExecutorService.submit(() -> produceMessages(messagesPerProducer)); } @@ -1032,7 +1019,7 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) consumerExecutorService.submit(() -> { CompletableFuture future = new CompletableFuture<>(); futures.add(future); - consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 30, true, future, Optional.of(maxBytes)); + consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 30, true, future, maxBytes); }); } @@ -1110,17 +1097,17 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe shareGroupExecutorService1.submit(() -> { CompletableFuture future = new CompletableFuture<>(); futures1.add(future); - consumeMessages(totalMessagesConsumedGroup1, totalMessagesSent, "group1", consumerNumber, 100, true, future, Optional.of(maxBytes)); + consumeMessages(totalMessagesConsumedGroup1, totalMessagesSent, "group1", consumerNumber, 100, true, future, maxBytes); }); 
shareGroupExecutorService2.submit(() -> { CompletableFuture future = new CompletableFuture<>(); futures2.add(future); - consumeMessages(totalMessagesConsumedGroup2, totalMessagesSent, "group2", consumerNumber, 100, true, future, Optional.of(maxBytes)); + consumeMessages(totalMessagesConsumedGroup2, totalMessagesSent, "group2", consumerNumber, 100, true, future, maxBytes); }); shareGroupExecutorService3.submit(() -> { CompletableFuture future = new CompletableFuture<>(); futures3.add(future); - consumeMessages(totalMessagesConsumedGroup3, totalMessagesSent, "group3", consumerNumber, 100, true, future, Optional.of(maxBytes)); + consumeMessages(totalMessagesConsumedGroup3, totalMessagesSent, "group3", consumerNumber, 100, true, future, maxBytes); }); } producerExecutorService.shutdown(); @@ -1265,7 +1252,7 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers consumerExecutorService.submit(() -> { CompletableFuture future = new CompletableFuture<>(); futuresSuccess.add(future); - consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 40, true, future, Optional.of(maxBytes)); + consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 40, true, future, maxBytes); }); } producerExecutorService.shutdown(); @@ -1402,7 +1389,15 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persist // Till now acknowledgement commit callback has not been called, so no exception thrown yet. // On 3rd poll, the acknowledgement commit callback will be called and the exception is thrown. - assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + AtomicBoolean exceptionThrown = new AtomicBoolean(false); + TestUtils.waitForCondition(() -> { + try { + shareConsumer.poll(Duration.ofMillis(500)); + } catch (org.apache.kafka.common.errors.WakeupException e) { + exceptionThrown.set(true); + } + return exceptionThrown.get(); + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); } } @@ -1440,11 +1435,15 @@ public void testAcknowledgeCommitCallbackThrowsException(String persister) throw TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer"); - // The second poll sends the acknowledgments implicitly. - shareConsumer.poll(Duration.ofMillis(2000)); - - // On the third poll, the acknowledgement commit callback will be called and the exception is thrown. - assertThrows(org.apache.kafka.common.errors.OutOfOrderSequenceException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); + AtomicBoolean exceptionThrown = new AtomicBoolean(false); + TestUtils.waitForCondition(() -> { + try { + shareConsumer.poll(Duration.ofMillis(500)); + } catch (org.apache.kafka.common.errors.OutOfOrderSequenceException e) { + exceptionThrown.set(true); + } + return exceptionThrown.get(); + }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); } } @@ -1684,6 +1683,7 @@ public void testShareAutoOffsetResetDefaultValue(String persister) { assertEquals(0, records.count()); // Producing another record. 
producer.send(record); + producer.flush(); records = shareConsumer.poll(Duration.ofMillis(5000)); // Now the next record should be consumed successfully assertEquals(1, records.count()); @@ -1708,6 +1708,7 @@ public void testShareAutoOffsetResetEarliest(String persister) { assertEquals(1, records.count()); // Producing another record. producer.send(record); + producer.flush(); records = shareConsumer.poll(Duration.ofMillis(5000)); // The next records should also be consumed successfully assertEquals(1, records.count()); @@ -1826,7 +1827,12 @@ private void consumeMessages(AtomicInteger totalMessagesConsumed, int maxPolls, boolean commit, CompletableFuture future) { - consumeMessages(totalMessagesConsumed, totalMessages, groupId, consumerNumber, maxPolls, commit, future, Optional.empty()); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId)) { + shareConsumer.subscribe(Collections.singleton(tp.topic())); + consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit, future); + } catch (Exception e) { + fail("Consumer " + consumerNumber + " failed with exception: " + e); + } } private void consumeMessages(AtomicInteger totalMessagesConsumed, @@ -1836,31 +1842,35 @@ private void consumeMessages(AtomicInteger totalMessagesConsumed, int maxPolls, boolean commit, CompletableFuture future, - Optional maxFetchBytes) { - KafkaShareConsumer shareConsumer; - Map> partitionOffsetsMap = new HashMap<>(); - Map partitionExceptionMap = new HashMap<>(); - if (maxFetchBytes.isPresent()) { - shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId, - Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxFetchBytes.get()))); - } else { - shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId); - } - shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); - shareConsumer.subscribe(Collections.singleton(tp.topic())); + int maxFetchBytes) { + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId, Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes))) { + shareConsumer.subscribe(Collections.singleton(tp.topic())); + consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit, future); + } catch (Exception e) { + fail("Consumer " + consumerNumber + " failed with exception: " + e); + } + } + + private void consumeMessages(KafkaShareConsumer consumer, + AtomicInteger totalMessagesConsumed, + int totalMessages, + int consumerNumber, + int maxPolls, + boolean commit, + CompletableFuture future) { int messagesConsumed = 0; int retries = 0; try { if (totalMessages > 0) { while (totalMessagesConsumed.get() < totalMessages && retries < maxPolls) { - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(2000)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(2000)); messagesConsumed += records.count(); totalMessagesConsumed.addAndGet(records.count()); retries++; } } else { while (retries < maxPolls) { - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(2000)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(2000)); messagesConsumed += records.count(); totalMessagesConsumed.addAndGet(records.count()); retries++; @@ -1869,12 +1879,11 
@@ private void consumeMessages(AtomicInteger totalMessagesConsumed, if (commit) { // Complete acknowledgement of the records - shareConsumer.commitSync(Duration.ofMillis(10000)); + consumer.commitSync(Duration.ofMillis(10000)); } } catch (Exception e) { fail("Consumer " + consumerNumber + " failed with exception: " + e); } finally { - shareConsumer.close(); future.complete(messagesConsumed); } }
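
A note on the pattern applied throughout this series: rather than asserting on the result of a single poll, the reworked tests poll inside TestUtils.waitForCondition until the expected state is observed, which tolerates slow metadata propagation and delayed acknowledgement callbacks. The sketch below is illustrative only and is not part of the patch; the class, topic name, and timeout values are hypothetical placeholders.

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaShareConsumer;
    import org.apache.kafka.test.TestUtils;

    final class WaitForRecordSketch {
        // Polls repeatedly until exactly one record is returned, or fails after the max wait.
        // The 15000L max wait and 100L retry interval are illustrative, not values from the patch.
        static void waitForOneRecord(KafkaShareConsumer<byte[], byte[]> consumer) throws InterruptedException {
            consumer.subscribe(Collections.singleton("example-topic")); // hypothetical topic name
            TestUtils.waitForCondition(
                () -> consumer.poll(Duration.ofMillis(2000)).count() == 1,
                15000L, 100L,
                () -> "record was not consumed in time");
        }
    }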