Kafka Connector Reference
The following operations allow you to work with the Kafka Connector. Click an operation name to see parameter details and samples on how to use it.
To use the Kafka connector, add the <kafkaTransport.init> element to your configuration before carrying out any other Kafka operations. You can configure it with or without security, depending on your requirements.
kafkaTransport.init
You can configure the kafkaTransport.init operation to set up your Kafka producer with or without security.
Parameter Name | Description | Required |
---|---|---|
name | Unique name to identify the connection. | Yes |
bootstrapServers | A comma-separated list of Kafka brokers in host:port form, for example host1:port1,host2:port2. | Yes |
keySerializerClass | The serializer class for the key that implements the serializer interface. | Yes |
valueSerializerClass | The serializer class for the value that implements the serializer interface. | Yes |
schemaRegistryUrl | The URL of the Confluent Schema Registry. Applicable only when using the Apache Avro serializer class. | Optional |
basicAuthCredentialsSource | The source of the basic auth credentials (e.g. USER_INFO, URL) when the schema registry is secured with basic auth. | Optional |
basicAuthUserInfo | The relevant basic auth credentials (should be used with basicAuthCredentialsSource). | Optional |
acks | The number of acknowledgments that the producer requires the leader to have received before considering a request complete. | Optional |
bufferMemory | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. | Optional |
compressionType | The compression type for the data generated by the producer. | Optional |
retries | Set a value greater than zero if you want the client to resend any records automatically when a request fails. | Optional |
sslKeyPassword | The password of the private key in the keystore file. Setting this for the client is optional. | Optional |
sslKeystoreLocation | The location of the key store file. Setting this for the client is optional. Set this when you want to have two-way authentication for the client. | Optional |
sslKeystorePassword | The store password for the keystore file. Setting this for the client is optional. Set it only if ssl.keystore.location is configured. | Optional |
sslTruststoreLocation | The location of the trust store file. | Optional |
sslTruststorePassword | The password for the trust store file. | Optional |
batchSize | Specify how many records the producer should batch together when multiple records are sent to the same partition. | Optional |
clientId | The client identifier that you pass to the server when making requests. | Optional |
connectionsMaxIdleTime | The duration in milliseconds after which idle connections should be closed. | Optional |
lingerTime | The time, in milliseconds, to wait before sending a record. Set this property when you want the client to reduce the number of requests sent when the load is moderate. Instead of sending a record immediately, the producer waits up to the configured time so that other records can be batched into the same request. | Optional |
maxBlockTime | The maximum time in milliseconds that the KafkaProducer.send() and the KafkaProducer.partitionsFor() methods can be blocked. | Optional |
maxRequestSize | The maximum size of a request in bytes. | Optional |
partitionerClass | The partitioner class that implements the partitioner interface. | Optional |
receiveBufferBytes | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. | Optional |
requestTimeout | The maximum amount of time, in milliseconds, that a client waits for the server to respond. | Optional |
saslJaasConfig | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. | Optional |
saslKerberosServiceName | The Kerberos principal name that Kafka runs as. | Optional |
saslMechanism | The mechanism used for SASL. | Optional |
securityProtocol | The protocol used to communicate with brokers. | Optional |
sendBufferBytes | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. | Optional |
sslEnabledProtocols | The list of protocols enabled for SSL connections. | Optional |
sslKeystoreType | The format of the keystore file. Setting this for the client is optional. | Optional |
sslProtocol | The SSL protocol used to generate the SSLContext. | Optional |
sslProvider | The name of the security provider used for SSL connections. The default value is the default security provider of the JVM. | Optional |
sslTruststoreType | The format of the trust store file. | Optional |
timeout | The maximum amount of time, in milliseconds, that the server waits for the acknowledgments from followers to meet the acknowledgment requirements that the producer has specified with acks configuration. | Optional |
blockOnBufferFull | Set to true to stop accepting new records when the memory buffer is full. When blocking is not desirable, set this property to false, which causes the producer to throw an exception if a record is sent to the memory buffer when it is full. | Optional |
maxInFlightRequestsPerConnection | The maximum number of unacknowledged requests that the client can send via a single connection before blocking. | Optional |
metadataFetchTimeout | The maximum amount of time, in milliseconds, to block and wait for the metadata fetch to succeed before throwing an exception to the client. | Optional |
metadataMaxAge | The period of time, in milliseconds, after which metadata is refreshed even if there were no partition leadership changes, to proactively discover any new brokers or partitions. | Optional |
metricReporters | A list of classes to use as metrics reporters. | Optional |
metricsNumSamples | The number of samples maintained to compute metrics. | Optional |
metricsSampleWindow | The window of time, in milliseconds, that a metrics sample is computed over. | Optional |
reconnectBackoff | The amount of time to wait before attempting to reconnect to a given host. | Optional |
retryBackoff | The amount of time, in milliseconds, to wait before attempting to retry a failed request to a given topic partition. | Optional |
saslKerberosKinitCmd | The kerberos kinit command path. | Optional |
saslKerberosMinTimeBeforeRelogin | Login thread's sleep time, in milliseconds, between refresh attempts. | Optional |
saslKerberosTicketRenewJitter | Percentage of random jitter added to the renewal time. | Optional |
saslKerberosTicketRenewWindowFactor | The login thread sleeps until the specified window factor of time from the last refresh to the ticket's expiry is reached, after which it will try to renew the ticket. | Optional |
sslCipherSuites | A list of cipher suites. | Optional |
sslEndpointIdentificationAlgorithm | The endpoint identification algorithm to validate the server hostname using a server certificate. | Optional |
sslKeymanagerAlgorithm | The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine. | Optional |
sslSecureRandomImplementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | Optional |
sslTrustmanagerAlgorithm | The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine. | Optional |
poolingEnabled | Indicates whether or not connection pooling is enabled. Set to 'true' if pooling is enabled and 'false' otherwise. | Optional |
maxActiveConnections | Maximum number of active connections in the pool. | Optional |
maxIdleConnections | Maximum number of idle connections in the pool. | Optional |
maxWaitTime | Maximum time to wait for a pooled connection to become available. | Optional |
minEvictionTime | The minimum amount of time an object may remain idle in the pool before it is eligible for eviction. | Optional |
evictionCheckInterval | The number of milliseconds between runs of the object evictor. | Optional |
exhaustedAction | The behavior of the pool when the pool is exhausted (WHEN_EXHAUSTED_FAIL/WHEN_EXHAUSTED_BLOCK/WHEN_EXHAUSTED_GROW). | Optional |
Performance Tuning Tip: For better throughput, set the maxPoolSize parameter in the configuration as follows:
<maxPoolSize>20</maxPoolSize>
If you do not specify the maxPoolSize parameter in the configuration, a Kafka connection is created for each message request.
Sample configuration
Given below is a sample configuration to create a producer without security.
<kafkaTransport.init>
<name>Sample_Kafka</name>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
</kafkaTransport.init>
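The connection pooling parameters listed in the table above could be added to the same init element. The following is a minimal sketch, not a tested configuration: the parameter names come from the table, while the connection name and the values (10, 5, 30000) are illustrative assumptions to tune for your own load.

```xml
<kafkaTransport.init>
    <name>Sample_Kafka_Pooled</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <!-- Pooling parameters from the table above; all values are assumptions -->
    <poolingEnabled>true</poolingEnabled>
    <maxActiveConnections>10</maxActiveConnections>
    <maxIdleConnections>5</maxIdleConnections>
    <!-- Run the evictor every 30 seconds (value is in milliseconds) -->
    <evictionCheckInterval>30000</evictionCheckInterval>
    <!-- Make callers wait instead of failing when the pool is exhausted -->
    <exhaustedAction>WHEN_EXHAUSTED_BLOCK</exhaustedAction>
</kafkaTransport.init>
```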
Kafka version 0.9.0.0 and above provides an additional security feature. You can configure it using the kafkaTransport.init element as follows:
<kafkaTransport.init>
<name>Sample_Kafka</name>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<securityProtocol>SSL</securityProtocol>
<sslTruststoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.truststore.jks</sslTruststoreLocation>
<sslTruststorePassword>test1234</sslTruststorePassword>
<sslKeystoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.keystore.jks</sslKeystoreLocation>
<sslKeystorePassword>test1234</sslKeystorePassword>
<sslKeyPassword>test1234</sslKeyPassword>
</kafkaTransport.init>
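The parameter table also lists SASL settings (securityProtocol, saslMechanism, saslJaasConfig). As a hedged sketch, a producer that authenticates with SASL/PLAIN over TLS might look like the following. SASL_SSL, PLAIN, and PlainLoginModule are standard Kafka client settings; the connection name, truststore path, user name, and passwords are placeholders, and this combination has not been verified against a specific connector version.

```xml
<kafkaTransport.init>
    <name>Sample_Kafka_SASL</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <!-- Encrypt the connection with TLS and authenticate with SASL -->
    <securityProtocol>SASL_SSL</securityProtocol>
    <saslMechanism>PLAIN</saslMechanism>
    <!-- JAAS login context in the JAAS configuration file format; credentials are placeholders -->
    <saslJaasConfig>org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user" password="kafka-password";</saslJaasConfig>
    <!-- Truststore used to verify the broker certificate; path and password are placeholders -->
    <sslTruststoreLocation>/path/to/kafka.client.truststore.jks</sslTruststoreLocation>
    <sslTruststorePassword>changeit</sslTruststorePassword>
</kafkaTransport.init>
```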
Given below is a sample configuration to create a producer for Kafka Avro serialization:
<kafkaTransport.init>
<name>Sample_Kafka</name>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
<valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
</kafkaTransport.init>
Given below is a sample init configuration for when the Confluent Schema Registry is secured with basic auth:
<kafkaTransport.init>
<name>Sample_Kafka</name>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
<valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
<basicAuthCredentialsSource>USER_INFO</basicAuthCredentialsSource>
<basicAuthUserInfo>admin:admin</basicAuthUserInfo>
</kafkaTransport.init>
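The optional producer parameters in the table (acks, retries, lingerTime, batchSize, compressionType) trade latency for durability and throughput. The sketch below shows one plausible combination. The connection name and the values are illustrative assumptions rather than recommended defaults, although "all" and "gzip" are valid settings for the underlying Kafka producer.

```xml
<kafkaTransport.init>
    <name>Sample_Kafka_Tuned</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <!-- Wait for all in-sync replicas to acknowledge each record -->
    <acks>all</acks>
    <!-- Resend automatically up to three times when a request fails -->
    <retries>3</retries>
    <!-- Wait up to 5 ms so that records for the same partition can be batched -->
    <lingerTime>5</lingerTime>
    <batchSize>16384</batchSize>
    <!-- Compress batches before sending -->
    <compressionType>gzip</compressionType>
</kafkaTransport.init>
```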
Publishing messages to Kafka
publishMessages
The publishMessages operation allows you to publish messages to the Kafka brokers via Kafka topics.
Parameter Name | Description | Required |
---|---|---|
topic | The name of the topic. | Yes |
partitionNo | The partition number of the topic. | Yes |
key | The key of the Kafka message. | Optional |
keySchema | Schema of the provided key (applicable only with Kafka Avro Serialization). | Optional |
keySchemaId | Schema ID of the key schema that is stored in the Confluent Schema Registry (applicable only with Kafka Avro Serialization). | Optional |
value | The Kafka value/message. | Optional |
valueSchema | Schema of the Kafka value (applicable only with Kafka Avro Serialization). | Optional |
valueSchemaId | Schema ID of the value schema that is stored in the Confluent Schema Registry (applicable only with Kafka Avro Serialization). | Optional |
Content-Type | The Content-Type of the message. | Optional |
If required, you can add custom headers to the records in the publishMessages operation:
<topic.Content-Type>Value</topic.Content-Type>
You can add the parameter as follows in the publishMessages operation:
<kafkaTransport.publishMessages configKey="kafka_init">
<topic>topicName</topic>
<partitionNo>partitionNo</partitionNo>
<topicName.Content-Type>Value</topicName.Content-Type>
</kafkaTransport.publishMessages>
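Given below is a sample publishMessages configuration that provides the key and value schemas directly:
<kafkaTransport.publishMessages>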
<topic>topicName</topic>
<key>key of the message</key>
<keySchema>schema of the configured key</keySchema>
<value>value of the message</value>
<valueSchema>schema of the configured value</valueSchema>
</kafkaTransport.publishMessages>
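Given below is a sample publishMessages configuration that refers to the key and value schemas by their schema registry IDs:
<kafkaTransport.publishMessages>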
<topic>topicName</topic>
<key>key of the message</key>
<keySchemaId>schemaId of the configured key</keySchemaId>
<value>value of the message</value>
<valueSchemaId>schemaId of the configured value</valueSchemaId>
</kafkaTransport.publishMessages>
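In practice, the topic and message are often taken from the incoming request rather than hard-coded. The following is a minimal sketch of a Synapse sequence that does this. It assumes a hypothetical JSON request payload of the form {"topic": "...", "message": "..."}; the sequence name, property names, and partition number 0 are also illustrative. The property mediator, json-eval, and the {$ctx:...} expression syntax are standard Synapse constructs.

```xml
<sequence name="publishToKafka" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Read the topic and message from the (hypothetical) JSON request payload -->
    <property name="topic" expression="json-eval($.topic)"/>
    <property name="message" expression="json-eval($.message)"/>
    <!-- Initialize the producer before any other Kafka operation -->
    <kafkaTransport.init>
        <name>Sample_Kafka</name>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    </kafkaTransport.init>
    <!-- Publish the message, resolving parameters from the message context -->
    <kafkaTransport.publishMessages>
        <topic>{$ctx:topic}</topic>
        <partitionNo>0</partitionNo>
        <value>{$ctx:message}</value>
    </kafkaTransport.publishMessages>
</sequence>
```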
Error codes related to Kafka Connector
Note
With Kafka connector v3.1.2 and above, when an error occurs, one of the following error codes is set in the message context. For details on how to access these error properties, refer to Generic Properties.
Error Code | Detail |
---|---|
700501 | Connection error. |
700502 | Invalid configuration. |
700503 | Error while serializing the Avro message in the producer. |
700504 | Illegal type is used in an Avro message. |
700505 | Error while building Avro schemas. |
700506 | Error while parsing schemas and protocols. |
700507 | Expected contents of a union cannot be resolved. |
700508 | The request message cannot be processed. |
700509 | Any other Kafka-related error. |
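As a hedged illustration of how these codes might be consumed, the following fault sequence sketch branches on the connection error code and logs the details of any other error. ERROR_CODE and ERROR_MESSAGE are standard Synapse error properties; the sequence name, log property names, and log text are assumptions made for this example.

```xml
<sequence name="kafkaFaultSequence" xmlns="http://ws.apache.org/ns/synapse">
    <!-- 700501 is the connection error code from the table above -->
    <filter source="get-property('ERROR_CODE')" regex="700501">
        <then>
            <log level="custom">
                <property name="kafkaError" value="Could not connect to the Kafka brokers"/>
            </log>
        </then>
        <else>
            <!-- Log the code and message of any other Kafka connector error -->
            <log level="custom">
                <property name="errorCode" expression="get-property('ERROR_CODE')"/>
                <property name="errorMessage" expression="get-property('ERROR_MESSAGE')"/>
            </log>
        </else>
    </filter>
</sequence>
```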