diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 458738185355..2aa6e339d978 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -182,6 +182,9 @@
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index c04afccd8aaf..206e6d04a2c2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -39,6 +39,8 @@
* {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}.
*
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to <code>interceptor.classes</code>, and <code>class</code> set to the ConsumerInterceptor class name.
*/
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
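
For illustration, a minimal sketch of an interceptor that opts into metric registration. The class name `CountingInterceptor` is hypothetical, and the `PluginMetrics` signatures (`metricName` taking a `LinkedHashMap` of extra tags, `addMetric` taking a `MetricValueProvider`) are assumed to follow KIP-877:

```java
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical interceptor that counts the records it sees. */
public class CountingInterceptor<K, V> implements ConsumerInterceptor<K, V>, Monitorable {

    private long recordCount = 0;

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // The config=interceptor.classes and class=CountingInterceptor tags are
        // added automatically; only extra tags are supplied here.
        metrics.addMetric(
                metrics.metricName("records-consumed", "Records seen by this interceptor", new LinkedHashMap<>()),
                (Measurable) (config, now) -> recordCount);
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        recordCount += records.count();
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```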
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 7fda9a20c056..849b16f81323 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -303,12 +303,12 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
- this.interceptors = new ConsumerInterceptors<>(interceptorList);
- this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
+ this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
interceptorList,
- Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
+ Arrays.asList(deserializers.keyDeserializer(), deserializers.valueDeserializer()));
this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
@@ -460,13 +460,13 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.fetchBuffer = new FetchBuffer(logContext);
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
- this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
this.time = time;
this.metrics = new Metrics(time);
+ this.interceptors = new ConsumerInterceptors<>(Collections.emptyList(), metrics);
this.metadata = metadata;
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
- this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+ this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
this.clientTelemetryReporter = Optional.empty();
ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 82a9bd2a53bf..e7ea5af09beb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -179,13 +179,13 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
- this.interceptors = new ConsumerInterceptors<>(interceptorList);
- this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
+ this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
interceptorList,
- Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
+ Arrays.asList(this.deserializers.keyDeserializer(), this.deserializers.valueDeserializer()));
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
this.metadata.bootstrap(addresses);
@@ -289,12 +289,12 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.metrics = new Metrics(time);
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
this.groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
- this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+ this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
- this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
+ this.interceptors = new ConsumerInterceptors<>(Collections.emptyList(), metrics);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 2cba76588e5f..a505de2dc120 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -319,13 +319,13 @@ ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
K key;
V value;
try {
- key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+ key = keyBytes == null ? null : deserializers.keyDeserializer().deserialize(partition.topic(), headers, keyBytes);
} catch (RuntimeException e) {
log.error("Key Deserializers with error: {}", deserializers);
throw newRecordDeserializationException(DeserializationExceptionOrigin.KEY, partition, timestampType, record, e, headers);
}
try {
- value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+ value = valueBytes == null ? null : deserializers.valueDeserializer().deserialize(partition.topic(), headers, valueBytes);
} catch (RuntimeException e) {
log.error("Value Deserializers with error: {}", deserializers);
throw newRecordDeserializationException(DeserializationExceptionOrigin.VALUE, partition, timestampType, record, e, headers);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
index c56ea1a03e97..c58b60ba0f25 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
@@ -17,10 +17,13 @@
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,15 +38,15 @@
*/
public class ConsumerInterceptors<K, V> implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
- private final List<ConsumerInterceptor<K, V>> interceptors;
+ private final List<Plugin<ConsumerInterceptor<K, V>>> interceptorPlugins;
- public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
- this.interceptors = interceptors;
+ public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors, Metrics metrics) {
+ this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG);
}
/** Returns true if no interceptors are defined. All other methods will be no-ops in this case. */
public boolean isEmpty() {
- return interceptors.isEmpty();
+ return interceptorPlugins.isEmpty();
}
/**
@@ -62,9 +65,9 @@ public boolean isEmpty() {
*/
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
ConsumerRecords<K, V> interceptRecords = records;
- for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
- interceptRecords = interceptor.onConsume(interceptRecords);
+ interceptRecords = interceptorPlugin.get().onConsume(interceptRecords);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
log.warn("Error executing interceptor onConsume callback", e);
@@ -83,9 +86,9 @@ public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
* @param offsets A map of offsets by partition with associated metadata
*/
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
- for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
- interceptor.onCommit(offsets);
+ interceptorPlugin.get().onCommit(offsets);
} catch (Exception e) {
// do not propagate interceptor exception, just log
log.warn("Error executing interceptor onCommit callback", e);
@@ -98,9 +101,9 @@ public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
*/
@Override
public void close() {
- for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
- interceptor.close();
+ interceptorPlugin.close();
} catch (Exception e) {
log.error("Failed to close consumer interceptor ", e);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
index 5de2a888775a..0926c720c0c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
@@ -19,6 +19,8 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
@@ -28,44 +30,54 @@
public class Deserializers<K, V> implements AutoCloseable {
- public final Deserializer<K> keyDeserializer;
- public final Deserializer<V> valueDeserializer;
+ private final Plugin<Deserializer<K>> keyDeserializerPlugin;
+ private final Plugin<Deserializer<V>> valueDeserializerPlugin;
- public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
- this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Key deserializer provided to Deserializers should not be null");
- this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "Value deserializer provided to Deserializers should not be null");
- }
-
- public Deserializers(ConsumerConfig config) {
- this(config, null, null);
+ public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Metrics metrics) {
+ this.keyDeserializerPlugin = Plugin.wrapInstance(
+ Objects.requireNonNull(keyDeserializer, "Key deserializer provided to Deserializers should not be null"),
+ metrics,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ this.valueDeserializerPlugin = Plugin.wrapInstance(
+ Objects.requireNonNull(valueDeserializer, "Value deserializer provided to Deserializers should not be null"),
+ metrics,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
@SuppressWarnings("unchecked")
- public Deserializers(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+ public Deserializers(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Metrics metrics) {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (keyDeserializer == null) {
- this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
+ keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- this.keyDeserializer = keyDeserializer;
}
+ this.keyDeserializerPlugin = Plugin.wrapInstance(keyDeserializer, metrics, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
if (valueDeserializer == null) {
- this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
+ valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- this.valueDeserializer = valueDeserializer;
}
+ this.valueDeserializerPlugin = Plugin.wrapInstance(valueDeserializer, metrics, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ public Deserializer<K> keyDeserializer() {
+ return keyDeserializerPlugin.get();
+ }
+
+ public Deserializer<V> valueDeserializer() {
+ return valueDeserializerPlugin.get();
}
@Override
public void close() {
AtomicReference<Throwable> firstException = new AtomicReference<>();
- Utils.closeQuietly(keyDeserializer, "key deserializer", firstException);
- Utils.closeQuietly(valueDeserializer, "value deserializer", firstException);
+ Utils.closeQuietly(keyDeserializerPlugin, "key deserializer", firstException);
+ Utils.closeQuietly(valueDeserializerPlugin, "value deserializer", firstException);
Throwable exception = firstException.get();
if (exception != null) {
@@ -79,8 +91,8 @@ public void close() {
@Override
public String toString() {
return "Deserializers{" +
- "keyDeserializer=" + keyDeserializer +
- ", valueDeserializer=" + valueDeserializer +
+ "keyDeserializer=" + keyDeserializerPlugin.get() +
+ ", valueDeserializer=" + valueDeserializerPlugin.get() +
'}';
}
}
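
To show the lifecycle change in isolation: callers now reach the deserializers through accessors that unwrap the `Plugin`, and closing `Deserializers` closes the wrappers rather than the raw deserializers. A hedged sketch (`Deserializers` is internal API, shown here only to illustrate the wiring):

```java
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.nio.charset.StandardCharsets;

public class DeserializersSketch {
    public static void main(String[] args) throws Exception {
        // The Metrics instance is what lets a Monitorable deserializer register metrics.
        Metrics metrics = new Metrics();
        try (Deserializers<String, String> deserializers =
                 new Deserializers<>(new StringDeserializer(), new StringDeserializer(), metrics)) {
            // Field access (deserializers.keyDeserializer) is replaced by an accessor
            // that unwraps the Plugin:
            String key = deserializers.keyDeserializer()
                    .deserialize("topic", "key".getBytes(StandardCharsets.UTF_8));
            System.out.println(key);
        }
        // Deserializers.close() closes the Plugin wrappers, which close the wrapped
        // deserializers and release any metrics they registered.
        metrics.close();
    }
}
```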
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 74760beec6d7..838416b8428b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -296,13 +296,13 @@ ConsumerRecord<K, V> parseRecord(final Deserializers<K, V> deserializers,
K key;
V value;
try {
- key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+ key = keyBytes == null ? null : deserializers.keyDeserializer().deserialize(partition.topic(), headers, keyBytes);
} catch (RuntimeException e) {
log.error("Key Deserializers with error: {}", deserializers);
throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, partition.topicPartition(), timestampType, record, e, headers);
}
try {
- value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+ value = valueBytes == null ? null : deserializers.valueDeserializer().deserialize(partition.topic(), headers, valueBytes);
} catch (RuntimeException e) {
log.error("Value Deserializers with error: {}", deserializers);
throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, partition.topicPartition(), timestampType, record, e, headers);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index e209ec00b0d1..82d92e125cc6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -253,12 +253,12 @@ private enum AcknowledgementMode {
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metrics = createMetrics(config, time, reporters);
- this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+ this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
- Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
+ Arrays.asList(deserializers.keyDeserializer(), deserializers.valueDeserializer()));
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
@@ -355,7 +355,7 @@ private enum AcknowledgementMode {
this.time = time;
this.metrics = new Metrics(time);
this.clientTelemetryReporter = Optional.empty();
- this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+ this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.subscriptions = subscriptions;
this.metadata = metadata;
@@ -451,7 +451,7 @@ private enum AcknowledgementMode {
this.metrics = metrics;
this.metadata = metadata;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
- this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+ this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.applicationEventHandler = applicationEventHandler;
this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 65d6a1e99036..f5042867aaf6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -57,6 +57,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -247,7 +248,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
// Visible for testing
final Metrics metrics;
private final KafkaProducerMetrics producerMetrics;
- private final Partitioner partitioner;
+ private final Plugin<Partitioner> partitionerPlugin;
private final int maxRequestSize;
private final long totalMemorySize;
private final ProducerMetadata metadata;
@@ -257,8 +258,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final Compression compression;
private final Sensor errors;
private final Time time;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
+ private final Plugin<Serializer<K>> keySerializerPlugin;
+ private final Plugin<Serializer<V>> valueSerializerPlugin;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final boolean partitionerIgnoreKeys;
@@ -332,11 +333,11 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
@SuppressWarnings("deprecation")
private void warnIfPartitionerDeprecated() {
// Using DefaultPartitioner and UniformStickyPartitioner is deprecated, see KIP-794.
- if (partitioner instanceof org.apache.kafka.clients.producer.internals.DefaultPartitioner) {
+ if (partitionerPlugin.get() instanceof org.apache.kafka.clients.producer.internals.DefaultPartitioner) {
log.warn("DefaultPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
+ " configuration setting to get the default partitioning behavior");
}
- if (partitioner instanceof org.apache.kafka.clients.producer.UniformStickyPartitioner) {
+ if (partitionerPlugin.get() instanceof org.apache.kafka.clients.producer.UniformStickyPartitioner) {
log.warn("UniformStickyPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
+ " configuration setting and set " + ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG
+ " to 'true' to get the uniform sticky partitioning behavior");
@@ -380,30 +381,33 @@ private void warnIfPartitionerDeprecated() {
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.producerMetrics = new KafkaProducerMetrics(metrics);
- this.partitioner = config.getConfiguredInstance(
- ProducerConfig.PARTITIONER_CLASS_CONFIG,
- Partitioner.class,
- Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
+ this.partitionerPlugin = Plugin.wrapInstance(
+ config.getConfiguredInstance(
+ ProducerConfig.PARTITIONER_CLASS_CONFIG,
+ Partitioner.class,
+ Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
+ metrics,
+ ProducerConfig.PARTITIONER_CLASS_CONFIG);
warnIfPartitionerDeprecated();
this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
if (keySerializer == null) {
- this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- Serializer.class);
- this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
+ keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- this.keySerializer = keySerializer;
}
+ this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+
if (valueSerializer == null) {
- this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- Serializer.class);
- this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
+ valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- this.valueSerializer = valueSerializer;
}
+ this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+
List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.configuredInterceptors(config,
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
@@ -411,11 +415,11 @@ private void warnIfPartitionerDeprecated() {
if (interceptors != null)
this.interceptors = interceptors;
else
- this.interceptors = new ProducerInterceptors<>(interceptorList);
+ this.interceptors = new ProducerInterceptors<>(interceptorList, metrics);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
interceptorList,
reporters,
- Arrays.asList(this.keySerializer, this.valueSerializer));
+ Arrays.asList(this.keySerializerPlugin.get(), this.valueSerializerPlugin.get()));
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compression = configureCompression(config);
@@ -426,7 +430,7 @@ private void warnIfPartitionerDeprecated() {
this.apiVersions = new ApiVersions();
this.transactionManager = configureTransactionState(config, logContext);
// There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
- boolean enableAdaptivePartitioning = partitioner == null &&
+ boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
enableAdaptivePartitioning,
@@ -500,9 +504,9 @@ private void warnIfPartitionerDeprecated() {
this.log = logContext.logger(KafkaProducer.class);
this.metrics = metrics;
this.producerMetrics = new KafkaProducerMetrics(metrics);
- this.partitioner = partitioner;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
+ this.partitionerPlugin = Plugin.wrapInstance(partitioner, metrics, ProducerConfig.PARTITIONER_CLASS_CONFIG);
+ this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.interceptors = interceptors;
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
@@ -1007,8 +1011,8 @@ private void throwIfProducerClosed() {
*/
@SuppressWarnings("deprecation")
private void onNewBatch(String topic, Cluster cluster, int prevPartition) {
- assert partitioner != null;
- partitioner.onNewBatch(topic, cluster, prevPartition);
+ assert partitionerPlugin.get() != null;
+ partitionerPlugin.get().onNewBatch(topic, cluster, prevPartition);
}
/**
@@ -1037,7 +1041,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
- serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
+ serializedKey = keySerializerPlugin.get().serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
@@ -1045,7 +1049,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
}
byte[] serializedValue;
try {
- serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
+ serializedValue = valueSerializerPlugin.get().serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
@@ -1066,7 +1070,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
// A custom partitioner may take advantage on the onNewBatch callback.
- boolean abortOnNewBatch = partitioner != null;
+ boolean abortOnNewBatch = partitionerPlugin.get() != null;
// Append the record to the accumulator. Note, that the actual partition may be
// calculated there and can be accessed via appendCallbacks.topicPartition.
@@ -1480,9 +1484,9 @@ private void close(Duration timeout, boolean swallowException) {
Utils.closeQuietly(interceptors, "producer interceptors", firstException);
Utils.closeQuietly(producerMetrics, "producer metrics wrapper", firstException);
Utils.closeQuietly(metrics, "producer metrics", firstException);
- Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
- Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
- Utils.closeQuietly(partitioner, "producer partitioner", firstException);
+ Utils.closeQuietly(keySerializerPlugin, "producer keySerializer", firstException);
+ Utils.closeQuietly(valueSerializerPlugin, "producer valueSerializer", firstException);
+ Utils.closeQuietly(partitionerPlugin, "producer partitioner", firstException);
clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly(reporter, "producer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
Throwable exception = firstException.get();
@@ -1509,8 +1513,8 @@ private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[]
if (record.partition() != null)
return record.partition();
- if (partitioner != null) {
- int customPartition = partitioner.partition(
+ if (partitionerPlugin.get() != null) {
+ int customPartition = partitionerPlugin.get().partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
index 3db3c3a31eb7..073c1d973bf9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -23,6 +23,9 @@
/**
* Partitioner Interface
+ *
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the partitioner to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to <code>partitioner.class</code>, and <code>class</code> set to the Partitioner class name.
*/
public interface Partitioner extends Configurable, Closeable {
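
A partitioner opts in the same way. A minimal sketch under the same KIP-877 API assumptions (`KeyHashPartitioner` is hypothetical):

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical partitioner that routes by key hash and exposes a call counter. */
public class KeyHashPartitioner implements Partitioner, Monitorable {

    private long partitionCalls = 0;

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // Registered with the automatic tags config=partitioner.class and
        // class=KeyHashPartitioner described above.
        metrics.addMetric(
                metrics.metricName("partition-calls", "Number of partition() invocations", new LinkedHashMap<>()),
                (Measurable) (config, now) -> partitionCalls);
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        partitionCalls++;
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return keyBytes == null ? 0 : Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```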
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
index 48caf98d44a3..5bc4b2c2c852 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -33,6 +33,8 @@
* ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
*
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to <code>interceptor.classes</code>, and <code>class</code> set to the ProducerInterceptor class name.
*/
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index 75bf8485e473..9936eef76094 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -17,10 +17,13 @@
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
@@ -35,10 +38,10 @@
*/
public class ProducerInterceptors<K, V> implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
- private final List<ProducerInterceptor<K, V>> interceptors;
+ private final List<Plugin<ProducerInterceptor<K, V>>> interceptorPlugins;
- public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
- this.interceptors = interceptors;
+ public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors, Metrics metrics) {
+ this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG);
}
/**
@@ -57,9 +60,9 @@ public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
- interceptRecord = interceptor.onSend(interceptRecord);
+ interceptRecord = interceptorPlugin.get().onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
// be careful not to throw exception from here
@@ -84,9 +87,9 @@ public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
- interceptor.onAcknowledgement(metadata, exception);
+ interceptorPlugin.get().onAcknowledgement(metadata, exception);
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
log.warn("Error executing interceptor onAcknowledgement callback", e);
@@ -105,15 +108,15 @@ public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
* @param exception The exception thrown during processing of this record.
*/
public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
if (record == null && interceptTopicPartition == null) {
- interceptor.onAcknowledgement(null, exception);
+ interceptorPlugin.get().onAcknowledgement(null, exception);
} else {
if (interceptTopicPartition == null) {
interceptTopicPartition = extractTopicPartition(record);
}
- interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
+ interceptorPlugin.get().onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, -1, -1), exception);
}
} catch (Exception e) {
@@ -132,9 +135,9 @@ public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> r
*/
@Override
public void close() {
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
- interceptor.close();
+ interceptorPlugin.close();
} catch (Exception e) {
log.error("Failed to close producer interceptor ", e);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
new file mode 100644
index 000000000000..8aba8059b259
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public class Plugin<T> implements Supplier<T>, AutoCloseable {
+
+ private final T instance;
+ private final Optional<PluginMetricsImpl> pluginMetrics;
+
+ private Plugin(T instance, PluginMetricsImpl pluginMetrics) {
+ this.instance = instance;
+ this.pluginMetrics = Optional.ofNullable(pluginMetrics);
+ }
+
+ public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key) {
+ return wrapInstance(instance, metrics, () -> tags(key, instance));
+ }
+
+ private static <T> Map<String, String> tags(String key, T instance) {
+ Map<String, String> tags = new LinkedHashMap<>();
+ tags.put("config", key);
+ tags.put("class", instance.getClass().getSimpleName());
+ return tags;
+ }
+
+ public static <T> List<Plugin<T>> wrapInstances(List<T> instances, Metrics metrics, String key) {
+ List<Plugin<T>> plugins = new ArrayList<>();
+ for (T instance : instances) {
+ plugins.add(wrapInstance(instance, metrics, key));
+ }
+ return plugins;
+ }
+
+ public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) {
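
Putting it together, a hedged sketch of the wrapping lifecycle based on the factory methods above: `wrapInstance` builds the `config`/`class` tags (and, judging by the `Monitorable`/`PluginMetricsImpl` imports, presumably hands metrics to `Monitorable` instances), `get()` unwraps, and `close()` releases the instance together with its metrics:

```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class PluginWrapSketch {
    public static void main(String[] args) throws Exception {
        Metrics metrics = new Metrics();
        Serializer<String> serializer = new StringSerializer();

        // Wrap with the config key so the automatic tags become
        // config=key.serializer, class=StringSerializer.
        Plugin<Serializer<String>> plugin =
                Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);

        // Call sites unwrap through get(), exactly as KafkaProducer does above.
        byte[] bytes = plugin.get().serialize("topic", "value");
        System.out.println(bytes.length);

        // Closing the Plugin closes the wrapped instance, mirroring the
        // Utils.closeQuietly(keySerializerPlugin, ...) call in KafkaProducer.close().
        plugin.close();
        metrics.close();
    }
}
```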