Kafka Inbound Endpoint Reference

Mandatory parameters for Kafka Inbound Endpoint

The following parameters are required when configuring the Kafka Inbound Endpoint.

| Parameter Name | Description |
|---|---|
| bootstrap.servers | A comma-separated list of Kafka brokers in the form host1:port1,host2:port2. |
| key.deserializer | The deserializer class for keys. It must implement the org.apache.kafka.common.serialization.Deserializer interface. |
| value.deserializer | The deserializer class for values. It must implement the org.apache.kafka.common.serialization.Deserializer interface. |
| group.id | The consumer group ID. |
| poll.timeout | The maximum time to block in the consumer while waiting for records. |
| topic.name | The name of the topic. |
| topic.pattern | The name pattern of the topic. |
| contentType | The content type of the message. |
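
As a rough illustration of the table above, the following Python sketch checks a parameter map for the mandatory names and splits a bootstrap.servers value into host and port pairs. It is not part of the endpoint: the function names and the example values (broker addresses, group ID, topic) are hypothetical, and treating topic.name and topic.pattern as alternatives is an assumption, since the table lists both.

```python
# Mandatory parameter names, copied from the table above.
MANDATORY = (
    "bootstrap.servers",
    "key.deserializer",
    "value.deserializer",
    "group.id",
    "poll.timeout",
    "contentType",
)
# Assumption: either one of these is enough to select the topics to consume.
TOPIC_SELECTORS = ("topic.name", "topic.pattern")


def _is_set(params, name):
    """True if the parameter is present and not blank."""
    value = params.get(name)
    return value is not None and str(value).strip() != ""


def missing_mandatory(params):
    """Return the mandatory parameter names that are absent or blank."""
    missing = [name for name in MANDATORY if not _is_set(params, name)]
    if not any(_is_set(params, name) for name in TOPIC_SELECTORS):
        missing.append(" or ".join(TOPIC_SELECTORS))
    return missing


def parse_bootstrap_servers(value):
    """Split 'host1:port1,host2:port2' into [(host, port), ...]."""
    brokers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


# Hypothetical values, for illustration only.
example = {
    "bootstrap.servers": "localhost:9092,localhost:9093",
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "group.id": "example-group",
    "poll.timeout": "100",
    "topic.name": "example-topic",
    "contentType": "application/json",
}
```

Calling `missing_mandatory(example)` returns an empty list, while an empty map returns every mandatory name plus the combined topic entry.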

Optional parameters for Kafka Inbound Endpoint

| Parameter Name | Description | Default Value |
|---|---|---|
| enable.auto.commit | Whether the consumer automatically commits offsets periodically, at the interval set by auto.commit.interval.ms. | true |
| auto.commit.interval.ms | The frequency, in milliseconds, at which consumer offsets are committed automatically when enable.auto.commit is true. | 5000 |
| session.timeout.ms | The timeout, in milliseconds, used to detect client failures when using Kafka’s group management facility. | 10000 |
| fetch.min.bytes | The minimum amount of data, in bytes, that the server should return for a fetch request. | 1 |
| heartbeat.interval.ms | The expected time, in milliseconds, between heartbeats to the consumer coordinator when using Kafka’s group management facilities. | 3000 |
| max.partition.fetch.bytes | The maximum amount of data, in bytes, that the server will return per partition. Records are fetched in batches by the consumer. | 1048576 |
| key.delegate.deserializer | Property name for the delegate key deserializer. | |
| value.delegate.deserializer | Property name for the delegate value deserializer. | |
| schema.registry.url | Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas. | |
| basic.auth.credentials.source | Specifies how to pick the credentials for the Basic authentication header. | |
| basic.auth.user.info | Specifies the user info for Basic authentication in the form {username}:{password}. | |
| ssl.key.password | The password of the private key in the key store file or the PEM key specified in `ssl.keystore.key`. | |
| ssl.keystore.location | The location of the key store file. This is optional for the client and can be used for two-way authentication of the client. | |
| ssl.keystore.password | The store password for the key store file. This is optional for the client and only needed if ‘ssl.keystore.location’ is configured. | |
| ssl.truststore.location | The location of the trust store file. | |
| ssl.truststore.password | The password for the trust store file. | |
| auto.offset.reset | Defines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. | latest |
| connections.max.idle.ms | Close idle connections after the number of milliseconds specified by this config. | 540000 |
| exclude.internal.topics | Whether internal topics matching a subscribed pattern should be excluded from the subscription. | true |
| fetch.max.bytes | The maximum amount of data, in bytes, that the server should return for a fetch request. | 52428800 |
| max.poll.interval.ms | The maximum delay, in milliseconds, between invocations of poll() when using consumer group management. | 300000 |
| max.poll.records | The maximum number of records returned in a single call to poll(). | 500 |
| partition.assignment.strategy | A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. | org.apache.kafka.clients.consumer.RangeAssignor |
| receive.buffer.bytes | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. | 65536 |
| request.timeout.ms | The maximum amount of time, in milliseconds, that the client will wait for the response to a request. | 305000 |
| sasl.jaas.config | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. | |
| sasl.client.callback.handler.class | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. | |
| sasl.login.class | The fully qualified name of a class that implements the Login interface. | |
| sasl.kerberos.service.name | The Kerberos principal name that Kafka runs as. | |
| sasl.mechanism | SASL mechanism used for client connections. | |
| security.protocol | Protocol used to communicate with brokers. | |
| send.buffer.bytes | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. | 131072 |
| ssl.enabled.protocols | The list of protocols enabled for SSL connections. | |
| ssl.keystore.type | The file format of the key store file. | |
| ssl.protocol | The SSL protocol used to generate the SSLContext. | |
| ssl.provider | The name of the security provider used for SSL connections. | |
| ssl.truststore.type | The file format of the trust store file. | |
| check.crcs | Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption of the messages occurred. | true |
| client.id | An ID string to pass to the server when making requests. | |
| fetch.max.wait.ms | The maximum amount of time, in milliseconds, that the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes. | 500 |
| interceptor.classes | A list of classes to use as interceptors. | |
| metadata.max.age.ms | The period of time, in milliseconds, after which a metadata refresh is forced, even if no partition leadership changes have been seen, to proactively discover any new brokers or partitions. | 300000 |
| metric.reporters | A list of classes to use as metrics reporters. | |
| metrics.num.samples | The number of samples maintained to compute metrics. | 2 |
| metrics.recording.level | The highest recording level for metrics. | INFO |
| metrics.sample.window.ms | The window of time, in milliseconds, over which a metrics sample is computed. | 30000 |
| reconnect.backoff.ms | The base amount of time, in milliseconds, to wait before attempting to reconnect to a given host. | 50 |
| retry.backoff.ms | The amount of time, in milliseconds, to wait before attempting to retry a failed request to a given topic partition. | 100 |
| sasl.kerberos.kinit.cmd | Kerberos kinit command path. | |
| sasl.kerberos.min.time.before.relogin | Login thread sleep time between refresh attempts. | |
| sasl.kerberos.ticket.renew.jitter | Percentage of random jitter added to the renewal time. | |
| sasl.kerberos.ticket.renew.window.factor | The login thread sleeps until the specified window factor of time from the last refresh to the ticket’s expiry has been reached, at which time it tries to renew the ticket. | |
| ssl.cipher.suites | A list of cipher suites. | |
| ssl.endpoint.identification.algorithm | The endpoint identification algorithm used to validate the server hostname against the server certificate. | |
| ssl.keymanager.algorithm | The algorithm used by the key manager factory for SSL connections. | |
| ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | |
| ssl.trustmanager.algorithm | The algorithm used by the trust manager factory for SSL connections. | |
| sasl.oauthbearer.token.endpoint.url | The URL for the OAuth/OIDC identity provider. | |
| sasl.oauthbearer.scope.claim.name | The OAuth claim for the scope is often named “scope”, but this (optional) setting can provide a different name to use for the scope included in the JWT payload’s claims if the OAuth/OIDC provider uses a different name for that claim. | |
| sasl.login.callback.handler.class | The fully qualified name of a SASL login callback handler class that implements the AuthenticateCallbackHandler interface. | |
| sasl.login.connect.timeout.ms | The (optional) value in milliseconds for the external authentication provider connection timeout. | |
| sasl.login.read.timeout.ms | The (optional) value in milliseconds for the external authentication provider read timeout. | |
| sasl.login.retry.backoff.ms | The (optional) value in milliseconds for the initial wait between login attempts to the external authentication provider. | |
| sasl.login.retry.backoff.max.ms | The (optional) value in milliseconds for the maximum wait between login attempts to the external authentication provider. | |
| kafka.header.prefix | The prefix for Kafka headers. | |
| avro.use.logical.type.converters | Whether to enable the use of logical type converters in Avro. This parameter is available only with Kafka Inbound Endpoint v1.2.2 and above. | false |
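
Several of the timing-related parameters above interact with each other. The Python sketch below, purely illustrative, overlays user overrides on a subset of the defaults from the table and flags one relationship documented for the Kafka consumer: heartbeat.interval.ms must be lower than session.timeout.ms and is typically set to no more than one third of it. The function names are hypothetical, and nothing here is part of the endpoint itself.

```python
# A subset of the defaults from the table above, kept as strings
# because parameter values are supplied as text.
DEFAULTS = {
    "enable.auto.commit": "true",
    "auto.commit.interval.ms": "5000",
    "session.timeout.ms": "10000",
    "heartbeat.interval.ms": "3000",
    "max.poll.interval.ms": "300000",
    "max.poll.records": "500",
    "auto.offset.reset": "latest",
}


def effective_settings(overrides):
    """Return the defaults with the given overrides applied on top."""
    settings = dict(DEFAULTS)
    settings.update(overrides)
    return settings


def timing_warnings(settings):
    """Flag heartbeat/session timeout combinations that look suspicious."""
    warnings = []
    session = int(settings["session.timeout.ms"])
    heartbeat = int(settings["heartbeat.interval.ms"])
    if heartbeat >= session:
        warnings.append(
            "heartbeat.interval.ms must be lower than session.timeout.ms"
        )
    elif heartbeat * 3 > session:
        warnings.append(
            "heartbeat.interval.ms is usually at most one third of session.timeout.ms"
        )
    return warnings


def per_record_budget_ms(settings):
    """Average processing time available per record between two poll() calls."""
    return int(settings["max.poll.interval.ms"]) / int(settings["max.poll.records"])
```

With the defaults, `timing_warnings(effective_settings({}))` returns no warnings, and `per_record_budget_ms` gives 600.0, meaning a full batch of 500 records leaves on average 600 ms per record before max.poll.interval.ms is exceeded.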