= Kafka Component
:doctitle: Kafka
:shortname: kafka
:artifactid: camel-kafka
:description: Send and receive messages to/from an Apache Kafka broker.
:since: 2.13
:supportlevel: Stable
:tabs-sync-option:
:component-header: Both producer and consumer are supported
//Manually maintained attributes
:camel-spring-boot-name: kafka
*Since Camel {since}*
*{component-header}*
The Kafka component is used for communicating with
http://kafka.apache.org/[Apache Kafka] message broker.
Maven users will need to add the following dependency to their `pom.xml`
for this component.
[source,xml]
------------------------------------------------------------
org.apache.camel
camel-kafka
x.x.x
------------------------------------------------------------
== URI format
---------------------------
kafka:topic[?options]
---------------------------
// component options: START
include::partial$component-configure-options.adoc[]
include::partial$component-endpoint-options.adoc[]
include::partial$component-endpoint-headers.adoc[]
// component options: END
For more information about Producer/Consumer configuration:
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs]
If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as it is used as a one-time header that is not sent along the message, and actually is removed in the producer.
== Usage
=== Consumer error handling
While kafka consumer is polling messages from the kafka broker, then errors can happen. This section describes what happens and what
you can configure.
The consumer may throw exception when invoking the Kafka `poll` API. For example, if the message cannot be deserialized due to invalid data,
and many other kinds of errors. Those errors are in the form of `KafkaException` which are either _retriable_ or not. The exceptions
which can be retried (`RetriableException`) will be retried again (with a poll timeout in between). All other kinds of exceptions are
handled according to the _pollOnError_ configuration. This configuration has the following values:
* DISCARD will discard the message and continue to poll the next message.
* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message.
* RECONNECT will re-connect the consumer and try to poll the message again.
* RETRY will let the consumer retry polling the same message again
* STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).
The default is *ERROR_HANDLER*, which will let Camel's error handler (if any configured) process the caused exception.
Afterwards continue to poll the next message. This behavior is similar to the _bridgeErrorHandler_ option that
Camel components have.
For advanced control a custom implementation of `org.apache.camel.component.kafka.PollExceptionStrategy` can be configured
on the component level, which allows controlling which of the strategies to use for each exception.
=== Consumer error handling (advanced)
By default, Camel will poll using the *ERROR_HANDLER* to process exceptions.
How Camel handles a message that results in an exception can be altered using the `breakOnFirstError` attribute in the configuration.
Instead of continuing to poll the next message, Camel will instead commit the offset so that the message that caused the exception will be retried.
This is similar to the *RETRY* polling strategy above.
[source,java]
----
KafkaComponent kafka = new KafkaComponent();
kafka.setBreakOnFirstError(true);
...
camelContext.addComponent("kafka", kafka);
----
It is recommended that you read the section below "Using manual commit with Kafka consumer" to understand how `breakOnFirstError`
will work based on the `CommitManager` that is configured.
=== The Kafka idempotent repository
The `camel-kafka` library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.
The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions, as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic.
Each repository instance that uses the topic, (e.g., typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic, each will control its own offset.
On startup, the instance subscribes to the topic, rewinds the offset to the beginning and rebuilds the cache to the latest state. The cache will not be considered warmed up until one poll of `pollDurationMs` in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens, the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.
Be mindful of the format of the header used for the uniqueness check. By default, it uses Strings as the data types. When using primitive numeric formats, the header must be deserialized accordingly. Check the samples below for examples.
A `KafkaIdempotentRepository` has the following properties:
[width="100%",cols="2m,2m,5",options="header"]
|===
| Property | Default | Description
| topic | | *Required* The name of the Kafka topic to use to broadcast changes. (required)
| bootstrapServers | | *Required* The `bootstrap.servers` property on the internal Kafka producer and consumer. Use this as shorthand if not setting `consumerConfig` and `producerConfig`. If used, this component will apply sensible default configurations for the producer and consumer.
| groupId | | The groupId to assign to the idempotent consumer.
| startupOnly | false | Whether to sync on startup only, or to continue syncing while Camel is running.
| maxCacheSize | 1000 | How many of the most recently used keys should be stored in memory (default 1000).
| pollDurationMs | 100 | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms.
If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will operate in an inconsistent state relative to its peers until it catches up.
| producerConfig | | Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself
| consumerConfig | | Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself
|===
The repository can be instantiated by defining the `topic` and `bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets can be explicitly defined to enable features such as SSL/SASL.
To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring, as it is `CamelContext` aware.
Sample usage is as follows:
[source,java]
----
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo")
// once-only insert into the database
.end()
----
In XML:
[source,xml]
----
localhost:9091
localhost:9091
----
There are 3 alternatives to choose from when using idempotency with numeric identifiers. The first one is to use the static method `numericHeader` method from `org.apache.camel.component.kafka.serde.KafkaSerdeHelper` to perform the conversion for you:
[source,java]
----
from("direct:performInsert")
.idempotentConsumer(numericHeader("id")).idempotentRepository("insertDbIdemRepo")
// once-only insert into the database
.end()
----
Alternatively, it is possible to use a custom serializer configured via the route URL to perform the conversion:
[source,java]
----
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);
@Override
public Object deserialize(String key, byte[] value) {
if (key.equals("id")) {
BigInteger bi = new BigInteger(value);
return String.valueOf(bi.longValue());
} else {
return super.deserialize(key, value);
}
}
}
----
Lastly, it is also possible to do so in a processor:
[source,java]
----
from(from).routeId("foo")
.process(exchange -> {
byte[] id = exchange.getIn().getHeader("id", byte[].class);
BigInteger bi = new BigInteger(id);
exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
})
.idempotentConsumer(header("id"))
.idempotentRepository("kafkaIdempotentRepository")
.to(to);
----
=== Manual commits with the Kafka consumer
By default, the Kafka consumer will use auto commit, where the offset will be committed automatically in the background using a given interval.
In case you want to force manual commits, you can use `KafkaManualCommit` API from the Camel Exchange, stored on the message header.
This requires turning on manual commits by either setting the option `allowManualCommit` to `true` on the `KafkaComponent`
or on the endpoint, for example:
[source,java]
----
KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
// ...
camelContext.addComponent("kafka", kafka);
----
By default, it uses the `NoopCommitManager` behind the scenes. To commit an offset, you will
require you to use the `KafkaManualCommit` from Java code such as a Camel `Processor`:
[source,java]
----
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
}
----
The `KafkaManualCommit` will force a synchronous commit which will block until the commit is acknowledged on Kafka, or if it fails an exception is thrown.
You can use an asynchronous commit as well by configuring the `KafkaManualCommitFactory` with the `DefaultKafkaManualAsyncCommitFactory` implementation.
Then the commit will be done in the next consumer loop using the kafka asynchronous commit api.
If you want to use a custom implementation of `KafkaManualCommit` then you can configure a custom `KafkaManualCommitFactory`
on the `KafkaComponent` that creates instances of your custom implementation.
When configuring a consumer to use manual commit and a specific `CommitManager` it is important to understand how these influence the behavior
of `breakOnFirstError`
[source,java]
----
KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
kafka.setBreakOnFirstError(true);
kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory());
...
camelContext.addComponent("kafka", kafka);
----
When the `CommitManager` is left to the default `NoopCommitManager` then `breakOnFirstError` will not automatically commit the offset so that the
message with an error is retried. The consumer must manage that in the route using `KafkaManualCommit`.
When the `CommitManager` is changed to either the synchronous or asynchronous manager then `breakOnFirstError` will automatically commit the offset so that the
message with an error is retried. This message will be continually retried until it can be processed without an error.
*Note 1*: records from a partition must be processed and committed by the same thread as the consumer. This means that certain EIPs, async or concurrent operations
in the DSL may cause the commit to fail. In such circumstances, trying to commit the transaction will cause the Kafka client to throw a `java.util.ConcurrentModificationException`
exception with the message `KafkaConsumer is not safe for multi-threaded access`. To prevent this from happening, redesign your route to avoid those operations.
*Note 2: this is mostly useful with aggregation's completion timeout strategies.
=== Pausable Consumers
The Kafka component supports pausable consumers. This type of consumer can pause consuming data based on
conditions external to the component itself, such as an external system being unavailable or other transient conditions.
[source,java]
----
from("kafka:topic")
.pausable(new KafkaConsumerListener(), () -> canContinue()) // the pausable check gets called if the exchange fails to be processed ...
.routeId("pausable-route")
.process(this::process) // Kafka consumer will be paused if this one throws an exception ...
.to("some:destination"); // or this one
----
In this example, consuming messages can pause (by calling the Kafka's Consumer pause method) if the result from `canContinue` is false.
IMPORTANT: The pausable EIP is meant to be used as a support mechanism when *there is an exception* somewhere in the route that prevents the exchange from being processed. More specifically,
the check called by the `pausable` EIP should be used to test for transient conditions preventing the exchange from being processed.
NOTE: most users should prefer using the xref:manual::route-policy.adoc[RoutePolicy], which offers better control of the route.
=== Kafka Headers propagation
When consuming messages from Kafka, headers will be propagated to camel exchange headers automatically.
Producing flow backed by same behaviour - camel headers of particular exchange will be propagated to kafka message headers.
Since kafka headers allow only `byte[]` values, in order camel exchange header to be propagated its value should be serialized to `bytes[]`,
otherwise header will be skipped.
The following header value types are supported: `String`, `Integer`, `Long`, `Double`, `Boolean`, `byte[]`.
Note: all headers propagated *from* kafka *to* camel exchange will contain `byte[]` value by default.
To override default functionality, these uri parameters can be set: `headerDeserializer` for `from` route and `headerSerializer` for `to` route. For example:
[source,java]
----
from("kafka:my_topic?headerDeserializer=#myDeserializer")
...
.to("kafka:my_topic?headerSerializer=#mySerializer")
----
By default, all headers are being filtered by `KafkaHeaderFilterStrategy`.
Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes.
Default strategy can be overridden by using `headerFilterStrategy` uri parameter in both `to` and `from` routes:
[source,java]
----
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
----
`myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring, as it is `CamelContext` aware.
=== Kafka Transaction
You need to add `transactionalId=` or `transacted=true` to enable kafka transaction with the producer.
[source,java]
----
from("direct:transaction")
.to("kafka:my_topic?transacted=true");
----
At the end of exchange routing, the kafka producer would commit the transaction or abort it if there is an Exception throwing or the exchange is `RollbackOnly`. Since Kafka does not support transactions in multi threads, it will throw `ProducerFencedException` if there is another producer with the same `transaction.id` to make the transactional request.
It would work with JTA `camel-jta` by using `transacted()` and if it involves some resources (SQL or JMS), which supports XA, then they would work in tandem, where they both will either commit or rollback at the end of the exchange routing. In some cases, if the JTA transaction manager fails to commit (during the 2PC processing), but kafka transaction has been committed before and there is no chance to roll back the changes since the kafka transaction does not support JTA/XA spec. There is still a risk with the data consistency.
When you use the `transacted=true` parameter, the `transactionalId` is made from the endpoint id and route id, but if you want finer control you can just use the `transactionalId` parameter, however remeber to have a `transactionalId` for each kafka producer endpoint. Note that the old style of using `additionalProperties[transactional.id]` is still valid.
If both `transacted=true` and `transactionalId` are present, the latter takes precedence.
=== Setting Kerberos config file
Configure the 'krb5.conf' file directly through the API:
[source,java]
----
static {
KafkaComponent.setKerberosConfigLocation("path/to/config/file");
}
----
=== Authentication to Kafka
Kafka supports several ways to authenticate the clients to the server, including plain text, PKI (certificates) over TLS, you can refer to the https://kafka.apache.org/documentation/#security_sasl[Kafka documentation] for a detailed view of the supported mechanisms. The kafka authentication and authorization is based on JAAS, so you must use a JAAS Login Module implementation on the client side.
==== Simplified Authentication Configuration
Instead of manually constructing JAAS configuration strings, you can use the `saslAuthType` option along with dedicated credential properties. This approach automatically derives the correct `securityProtocol`, `saslMechanism`, and `saslJaasConfig` values based on the selected authentication type.
The following authentication types are supported:
[width="100%",cols="2m,5",options="header"]
|===
| saslAuthType | Description
| NONE | No authentication (insecure, for development only)
| PLAIN | SASL/PLAIN with username and password
| SCRAM_SHA_256 | SASL/SCRAM-SHA-256 authentication
| SCRAM_SHA_512 | SASL/SCRAM-SHA-512 authentication
| SSL | SSL/TLS with client certificates (mTLS)
| OAUTH | OAuth 2.0 / OIDC authentication
| AWS_MSK_IAM | AWS MSK IAM authentication
| KERBEROS | Kerberos/GSSAPI authentication
|===
*SCRAM Authentication Example*
[source,java]
----
from("kafka:my-topic?brokers=localhost:9092&saslAuthType=SCRAM_SHA_512&saslUsername=myuser&saslPassword=mypassword")
.to("log:received");
----
Or using Spring Boot configuration:
[source,properties]
----
camel.component.kafka.sasl-auth-type=SCRAM_SHA_512
camel.component.kafka.sasl-username=myuser
camel.component.kafka.sasl-password=mypassword
----
*OAuth Authentication Example*
[source,java]
----
from("kafka:my-topic?brokers=localhost:9092" +
"&saslAuthType=OAUTH" +
"&oauthClientId=my-client" +
"&oauthClientSecret=my-secret" +
"&oauthTokenEndpointUri=https://auth.example.com/oauth/token")
.to("log:received");
----
*AWS MSK IAM Authentication*
When using AWS MSK with IAM authentication, ensure the `aws-msk-iam-auth` library is on the classpath:
[source,java]
----
from("kafka:my-topic?brokers=b-1.mycluster.kafka.us-east-1.amazonaws.com:9098&saslAuthType=AWS_MSK_IAM")
.to("log:received");
----
The simplified configuration is optional. If you need more control or are using a custom JAAS login module, you can still use the traditional approach with explicit `securityProtocol`, `saslMechanism`, and `saslJaasConfig` properties as shown below.
==== Traditional JAAS Configuration
This section will outline the main points for the authentication using https://github.com/strimzi/strimzi-kafka-oauth/blob/main/README.md[Strimzi JAAS Login Module] and plain text. The Strimzi OAuth contains several properties to fine tune, the authentication mechanism, so you can set them into the `OAuthBearerLoginModule` section.
The most basic way to authenticate is using the plain text login module with username and password. Beware that this is unsafe and and we suggest the OAuth over TLS for a fully secure mechanism.
*Username and Password over TLS*
[source]
----
camel.component.kafka.security-protocol = SASL_SSL
camel.component.kafka.sasl-mechanism=PLAIN
camel.component.kafka.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="my_username" \
password="my_password";
----
There is the Strimzi OAuth Login Module that supports the more secure OAuth mechanisms, where you can set a refresh token, username/password and client secret. You must understand the Kafka Broker security settings to adequately configure the client security configuration.
*OAuth Bearer Token with client secret*
[source]
----
camel.component.kafka.security-protocol = SASL_PLAINTEXT
camel.component.kafka.sasl-mechanism = OAUTHBEARER
camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.client.id="kafka-producer-client" \
oauth.client.secret="kafka-producer-client-secret" \
oauth.username.claim="preferred_username" \
oauth.ssl.truststore.location="docker/certificates/ca-truststore.p12" \
oauth.ssl.truststore.type="pkcs12" \
oauth.ssl.truststore.password="changeit" \
oauth.token.endpoint.uri="https://keycloak:8443/realms/demo/protocol/openid-connect/token" ;
camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
----
*OAuth Bearer Token with refresh token*
[source]
----
camel.component.kafka.security-protocol = SASL_PLAINTEXT
camel.component.kafka.sasl-mechanism = OAUTHBEARER
camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.client.id="kafka-producer-client" \
oauth.refresh.token="my_refresh_token"
oauth.username.claim="preferred_username" \
oauth.ssl.truststore.location="docker/certificates/ca-truststore.p12" \
oauth.ssl.truststore.type="pkcs12" \
oauth.ssl.truststore.password="changeit" \
oauth.token.endpoint.uri="https://keycloak:8443/realms/demo/protocol/openid-connect/token" ;
camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
----
=== Batching Consumer
To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true`.
The received records are stored in a list in the exchange used in the pipeline. As such, it is possible to commit individually
every record or the whole batch at once by committing the last exchange on the list.
The size of the batch is controlled by the option `maxPollRecords`.
Camel will wait until the number of records has been received per batch size (`maxPollRecords`).
However, this can take a long time, if there are too few records and no new records is being received.
To avoid blocking for too long, waiting for the whole set of records to fill the batch, it is possible to use the `pollTimeoutMs` option
to set a timeout for the polling. The timeout is only triggered if there has not received any new messages for the given timeout period.
So for example if `pollTimeoutMs=10000` then the timeout is 10 seconds, and this will only trigger if Camel did not receive any new messages
from Kafka for 10 seconds. The timeout trigger will reset if a message has been received, so if you continuously and slowly receive new messages,
then this timeout may not trigger for a long time. Therefore, the option `batchingIntervalMs` can therefore be used to specify an interval (in mills)
to trigger the batch completion, to avoid waiting for more messages to be received to reach the batch size.
For example setting `batchingIntervalMs=20000` would let Camel wait at most `pollTimeoutMs + batchingIntervalMs` before
triggering a batch completion. In this example that would be 30 seconds.
Notice the `pollTimeoutMs` should not be set to a high value, as it's used directly by Kafka while
receiving new messages from the broker. Camel is not active during this processing, and if the option
has been configured with a high value, then Camel cannot trigger batch timeout or interval completion ahead
of time. Therefore, it's recommended to keep this value as default.
==== Automatic Commits
By default, Camel uses automatic commits when using batch processing. In this case, Camel automatically commits the records after they have been successfully processed by the application.
In case of failures, the records will not be processed.
The code below provides an example of this approach:
[source,java]
----
public void configure() {
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
// The received records are stored as exchanges in a list. This gets the list of those exchanges
final List> exchanges = e.getMessage().getBody(List.class);
// Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
// The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
for (Object obj : exchanges) {
if (obj instanceof Exchange exchange) {
LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
}
}
}).to(KafkaTestUtil.MOCK_RESULT);
}
----
===== Handling Errors with Automatic Commits
When using automatic commits, Camel will not commit records if there is a failure in processing. Because of this, there is a risk that records could be reprocessed multiple times.
It is recommended to implement appropriate error handling mechanisms and patterns (i.e.; such as dead-letter queues), to prevent failed records from blocking processing progress.
The code below provides an example of handling errors with automatic commits:
[source,java]
----
public void configure() {
/*
We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
production scenario, applications should probably send these records to a separate topic or fix the condition
that lead to the failure
*/
onException(IllegalArgumentException.class).process(exchange -> {
LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
}).continued(true);
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
// The received records are stored as exchanges in a list. This gets the list of those exchanges
final List> exchanges = e.getMessage().getBody(List.class);
// Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
// The records from the batch are stored in a list of exchanges in the original exchange.
int i = 0;
for (Object o : exchanges) {
if (o instanceof Exchange exchange) {
i++;
LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
if (i == 4) {
throw new IllegalArgumentException("Failed to process record");
}
}
}
}).to(KafkaTestUtil.MOCK_RESULT);
}
----
===== Break on First Error in Batching Mode
The `breakOnFirstError` option is also supported in batching mode, providing the same error handling behavior as in streaming mode but applied to batch processing.
When `breakOnFirstError=true` is configured with batching enabled:
* **Auto Commit Mode**: When an error occurs during batch processing, Camel will commit the current batch position and trigger a consumer reconnection. This allows the problematic batch to be retried on the next polling cycle.
* **Manual Commit Mode**: When an error occurs, the route's error handler is responsible for performing the manual commit (if desired) before the consumer reconnects.
* **Reconnection**: After an error triggers `breakOnFirstError`, the consumer automatically reconnects and resumes polling from the committed position.
The behavior is consistent with streaming mode, ensuring that error handling patterns work similarly across both processing modes.
Example with automatic commits:
[tabs]
====
Java DSL::
+
[source,java]
----
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&breakOnFirstError=true&autoCommitEnable=true")
.process(e -> {
final List> exchanges = e.getMessage().getBody(List.class);
// Process batch - any exception will trigger breakOnFirstError behavior
for (Object exchange : exchanges) {
// ... processing logic
}
}).to("mock:result");
----
YAML::
+
[source,yaml]
----
- route:
from:
uri: "kafka:topic"
parameters:
groupId: "myGroup"
pollTimeoutMs: 1000
batching: true
maxPollRecords: 10
breakOnFirstError: true
autoCommitEnable: true
steps:
- process:
ref: "batchProcessor"
- to: "mock:result"
----
Processor Implementation::
+
[source,java]
----
@Component("batchProcessor")
public class BatchProcessor implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(BatchProcessor.class);
@Override
public void process(Exchange exchange) throws Exception {
final List> exchanges = exchange.getMessage().getBody(List.class);
if (exchanges == null || exchanges.isEmpty()) {
return;
}
// Process each exchange in the batch
for (Object obj : exchanges) {
if (obj instanceof Exchange batchExchange) {
String body = batchExchange.getMessage().getBody(String.class);
LOG.info("Processing exchange with body: {}", body);
// Your business logic here
// Any exception thrown will trigger breakOnFirstError behavior
processMessage(body);
}
}
}
private void processMessage(String message) {
// Implement your message processing logic
// Throw exceptions for error conditions that should trigger breakOnFirstError
if (message.contains("error")) {
throw new RuntimeException("Processing failed for message: " + message);
}
}
}
----
====
Example with manual commits:
[tabs]
====
Java DSL::
+
[source,java]
----
// Error handler that performs manual commit before reconnection
onException(Exception.class)
.handled(false)
.process(exchange -> {
// Commit manually when error occurs
KafkaManualCommit manual = exchange.getMessage()
.getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
});
from("kafka:topic?groupId=myGroup&batching=true&breakOnFirstError=true&autoCommitEnable=false&allowManualCommit=true")
.process(e -> {
final List> exchanges = e.getMessage().getBody(List.class);
// Process batch - errors will be handled by the error handler above
for (Object exchange : exchanges) {
// ... processing logic
}
})
.process(exchange -> {
// Manual commit on successful processing
KafkaManualCommit manual = exchange.getMessage()
.getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
})
.to("mock:result");
----
YAML::
+
[source,yaml]
----
- route:
on-exception:
- exception: "java.lang.Exception"
handled: false
steps:
- process:
ref: "errorCommitProcessor"
from:
uri: "kafka:topic"
parameters:
groupId: "myGroup"
batching: true
breakOnFirstError: true
autoCommitEnable: false
allowManualCommit: true
steps:
- process:
ref: "batchProcessorManual"
- process:
ref: "successCommitProcessor"
- to: "mock:result"
----
Processor Implementations::
+
[source,java]
----
@Component("batchProcessorManual")
public class BatchProcessorManual implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(BatchProcessorManual.class);
@Override
public void process(Exchange exchange) throws Exception {
final List> exchanges = exchange.getMessage().getBody(List.class);
if (exchanges == null || exchanges.isEmpty()) {
return;
}
// Process each exchange in the batch
for (Object obj : exchanges) {
if (obj instanceof Exchange batchExchange) {
String body = batchExchange.getMessage().getBody(String.class);
LOG.info("Processing exchange with body: {}", body);
// Your business logic here
// Errors will be handled by the error handler
processMessage(body);
}
}
}
private void processMessage(String message) {
// Implement your message processing logic
if (message.contains("error")) {
throw new RuntimeException("Processing failed for message: " + message);
}
}
}
@Component("errorCommitProcessor")
public class ErrorCommitProcessor implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(ErrorCommitProcessor.class);
@Override
public void process(Exchange exchange) throws Exception {
LOG.warn("Error occurred, performing manual commit before reconnection");
KafkaManualCommit manual = exchange.getMessage()
.getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
manual.commit();
LOG.info("Manual commit completed after error");
}
}
}
@Component("successCommitProcessor")
public class SuccessCommitProcessor implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(SuccessCommitProcessor.class);
@Override
public void process(Exchange exchange) throws Exception {
LOG.debug("Batch processed successfully, performing manual commit");
KafkaManualCommit manual = exchange.getMessage()
.getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
manual.commit();
LOG.debug("Manual commit completed after successful processing");
}
}
}
----
====
==== Manual Commits
When working with batch processing with manual commits, it's up to the application to commit the records, and handle the outcome of potentially invalid records.
The code below provides an example of this approach:
[source,java]
----
public void configure() {
from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
.process(e -> {
// The received records are stored as exchanges in a list. This gets the list of those exchanges
final List> exchanges = e.getMessage().getBody(List.class);
// Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
/*
Every exchange in that list should contain a reference to the manual commit object. We use the reference
for the last exchange in the list to commit the whole batch
*/
final Object tmp = exchanges.getLast();
if (tmp instanceof Exchange exchange) {
KafkaManualCommit manual =
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
LOG.debug("Performing manual commit");
manual.commit();
LOG.debug("Done performing manual commit");
}
});
}
----
==== Dealing with long polling timeouts
In some cases, applications may want the polling process to have a long timeout (see: `pollTimeoutMs`).
To properly do so, first make sure to have a max polling interval that is higher than the polling timeout (see: `maxPollIntervalMs`).
Then, increase the shutdown timeout to ensure that committing, closing and other Kafka operations are not abruptly aborted. For instance:
[source,java]
----
public void configure() {
// Note that this can be configured in other ways
getCamelContext().getShutdownStrategy().setTimeout(10000);
// route setup ...
}
----
=== Custom Subscription Adapters
Applications with complex subscription logic may provide a custom bean to handle the subscription process. To so, it is
necessary to implement the interface `SubscribeAdapter`.
[source,java]
.Example subscriber adapter that subscribes to a set of Kafka topics or patterns
----
public class CustomSubscribeAdapter implements SubscribeAdapter {
@Override
public void subscribe(Consumer, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) {
if (topicInfo.isPattern()) {
consumer.subscribe(topicInfo.getPattern(), reBalanceListener);
} else {
consumer.subscribe(topicInfo.getTopics(), reBalanceListener);
}
}
}
----
Then, it is necessary to add it as named bean instance to the registry:
[source,java]
.Add to registry example
----
context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());
----
=== Interoperability
==== JMS
When interoperating Kafka and JMS, it may be necessary to coerce the JMS headers into their expected type.
For instance, when consuming messages from Kafka carrying JMS headers and then sending them to a JMS broker, those headers are
first deserialized into a byte array. Then, the `camel-jms` component tries to coerce this byte array into the
specific type used by.
However, both the origin endpoint as well as how this was setup on the code itself may affect how the data is serialized and
deserialized. As such, it is not feasible to naively assume the data type of the byte array.
To address this issue, we provide a custom header deserializer to force Kafka to de-serialize the JMS data according to
the JMS specification. This approach ensures that the headers are properly interpreted and processed by the camel-jms component.
Due to the inherent complexity of each possible system and endpoint, it may not be possible for this deserializer to cover all
possible scenarios. As such, it is provided as model that can be modified and/or adapted for the specific needs of each application.
To utilize this solution, you need to modify the route URI on the consumer end of the pipeline by including the
`headerDeserializer` option.
For example:
[source,java]
.Route snippet
----
from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
.to("...");
----
=== Producer Performance
If the producer is performing too slowly for your needs, you may want to aggregate the exchanges before sending.
[source,java]
.Route snippet
----
from("source")
// .other route stuff
.aggregate(constant(true), new GroupedExchangeAggregationStrategy())
.to("kafka:topic");
----
The reason for this is related to how the producer handles the two different cases:
* with the `aggregrate` it should process the messages in a "batch-sized chunk" semi-asynchronously (that is, send all messages in the batch and then wait for their acknowledgements)
* without that, it sends synchronously, eventually blocking on the record metadata fetch per exchange.
NOTE: the downside of this approach is an increased number of in-flight exchanges and the potential risks (even though small and rare) associated with that.
== Examples
=== Consuming messages from Kafka
Here is the minimal route you need to read messages from Kafka.
[source,java]
----
from("kafka:test?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
----
If you need to consume messages from multiple topics, you can use a comma separated list of topic names.
[source,java]
----
from("kafka:test,test1,test2?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
----
It's also possible to subscribe to multiple topics giving a pattern as the topic name and using the `topicIsPattern` option.
[source,java]
----
from("kafka:test.*?brokers=localhost:9092&topicIsPattern=true")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
----
When consuming messages from Kafka, you can use your own offset management and not delegate this management to Kafka.
To keep the offsets, the component needs a `StateRepository` implementation such as `FileStateRepository`.
This bean should be available in the registry.
Here how to use it :
[source,java]
----
// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
// Bind this repository into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);
// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
fromF("kafka:%s?brokers=localhost:{{kafkaPort}}" +
// Set up the topic and broker address
"&groupId=A" +
// The consumer processor group ID
"&autoOffsetReset=earliest" +
// Ask to start from the beginning if we have unknown offset
"&offsetRepository=#offsetRepo", TOPIC)
// Keep the offsets in the previously configured repository
.to("mock:result");
}
});
----
=== Producing messages to Kafka
Here is the minimal route you need to produce messages to Kafka.
[source,java]
----
from("direct:start")
.setBody(constant("Message from Camel")) // Message to send
.setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
.to("kafka:test?brokers=localhost:9092");
----
=== SSL configuration
You have two different ways to configure the SSL communication on the Kafka component.
The first way is through the many SSL endpoint parameters:
[source,java]
----
from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
"&groupId=A" +
"&sslKeystoreLocation=/path/to/keystore.jks" +
"&sslKeystorePassword=changeit" +
"&sslKeyPassword=changeit" +
"&securityProtocol=SSL")
.to("mock:result");
----
The second way is to use the `sslContextParameters` endpoint parameter:
[source,java]
----
// Configure the SSLContextParameters object
KeyStoreParameters ksp = new KeyStoreParameters();
ksp.setResource("/path/to/keystore.jks");
ksp.setPassword("changeit");
KeyManagersParameters kmp = new KeyManagersParameters();
kmp.setKeyStore(ksp);
kmp.setKeyPassword("changeit");
SSLContextParameters scp = new SSLContextParameters();
scp.setKeyManagers(kmp);
// Bind this SSLContextParameters into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("ssl", scp);
// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
// Set up the topic and broker address
"&groupId=A" +
// The consumer processor group ID
"&sslContextParameters=#ssl" +
// The security protocol
"&securityProtocol=SSL)
// Reference the SSL configuration
.to("mock:result");
}
});
----
include::spring-boot:partial$starter.adoc[]