Apache Pulsar Connector Reference
This documentation provides a reference guide for the Apache Pulsar Connector. The Pulsar connector allows you to connect to Apache Pulsar, a distributed messaging system, and perform message publishing. Click an operation name to see parameter details and samples on how to use it.
Connection Configurations
The WSO2 Apache Pulsar Connector allows you to establish both secure and non-secure connections to Apache Pulsar.
Connection Configuration Parameters
PULSAR
This operation allows you to initialize the connection to Apache Pulsar.
Note: Unless explicitly stated, the default values of the optional parameters match those defined in the official Apache Pulsar Configuration documentation.
Parameter Name | Display Name | Description | Required |
---|---|---|---|
name | Connection name | Unique name to identify the connection. | Yes |
serviceUrl | Broker URL | The Pulsar broker URL to connect to. It follows the format pulsar://<host_name>:<port>, where pulsar is the protocol and the content following :// is the host name and port on which the broker is running. | Yes |
Additional parameters for connection | |||
connectionTimeoutMs | Connection Timeout (in milliseconds) | Timeout for establishing a TCP connection in milliseconds. | No |
operationTimeoutSeconds | Operation Timeout (in seconds) | Specifies how long the client should wait (in seconds) for sending a message before timing out. | No |
requestTimeoutMs | Request Timeout (in milliseconds) | Timeout duration (in milliseconds) for individual Pulsar requests. | No |
lookupTimeoutMs | Lookup Timeout (in milliseconds) | Timeout (in milliseconds) for topic lookup operations. If the broker does not respond in time, the request fails. This helps prevent the client from waiting indefinitely when trying to resolve a topic to a broker. | No |
connectionMaxIdleSeconds | Connection Max Idle Time (in seconds) | Specifies the maximum time (in seconds) a connection can stay idle before being closed. | No |
numIoThreads | Number of IO Threads | Sets the number of threads dedicated for handling network I/O operations such as reading and writing data to the broker. | No |
numListenerThreads | Number of Listener Threads | The number of threads used to process messages received by consumers. | No |
enableBusyWait | Enable Busy Wait | When enabled, uses busy-waiting for IO threads instead of traditional blocking IO. This may reduce latency but increases CPU usage. | No |
initialBackoffInterval | Initial Backoff Interval (in milliseconds) | Initial backoff interval for reconnection attempts (in milliseconds). | No |
maxBackoffInterval | Max Backoff Interval (in milliseconds) | Maximum backoff interval for reconnection attempts (in milliseconds). | No |
connectionsPerBroker | Number of Connections Per Broker | Determines how many TCP connections the client maintains per broker. Increasing this can help with high-throughput scenarios. | No |
maxLookupRedirects | Maximum Number of Lookup Redirects | The maximum number of times a lookup request can be redirected when resolving a topic to its broker. If the number of redirects exceeds this limit, the lookup operation fails. This helps prevent infinite redirect loops during topic resolution. | No |
maxLookupRequest | Maximum Number of Lookup Requests | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. | No |
concurrentLookupRequest | Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. | No |
maxConcurrentLookupRequests | Maximum Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. | No |
maxNumberOfRejectedRequestPerConnection | Maximum Number Of Rejected Request Per Connection | Maximum number of requests a broker can reject within a fixed time frame (60 seconds). Once this limit is exceeded, the current connection is closed and the client creates a new connection to a different broker. | No |
useTcpNoDelay | Enable TCP No Delay | Enable TCP no delay for network connections. | No |
enableTransaction | Enable Transaction | Whether to enable transaction support in the Pulsar connector. When set to true, the connector can participate in transactional message publishing, allowing atomic operations across multiple topics and partitions. This is useful for scenarios requiring exactly-once semantics or coordinated multi-message operations. If not needed, leave it as false to avoid extra overhead. | No |
keepAliveIntervalSeconds | Keep Alive Interval (in seconds) | Interval (in seconds) to send keep-alive messages to maintain an active connection with the broker. This helps prevent the connection from being closed due to inactivity. If not set, the default value defined by Pulsar will be used. | No |
statsIntervalSeconds | Stats Interval (in seconds) | The interval (in seconds) at which the Pulsar connector collects and reports statistics. This helps monitor client (connector) performance and resource usage. If not set, the default value defined by Pulsar will be used. | No |
memoryLimitBytes | Memory Limit (in bytes) | The maximum amount of memory (in bytes) that the Pulsar client (connector) can use. It helps control the memory usage of the client (connector), preventing it from consuming excessive resources, which is important for stability and performance in resource-constrained environments. If not set, the default value defined by Pulsar will be used. | No |
listenerName | Listener Name | Listener name for lookup. | No |
Sample configuration
Given below is a sample configuration to create a client connection without security.
<pulsar.init>
<name>pulsarConnection</name>
<serviceUrl>pulsar://localhost:6650</serviceUrl>
<connectionTimeoutMs>10000</connectionTimeoutMs>
<operationTimeoutSeconds>30</operationTimeoutSeconds>
<requestTimeoutMs>30000</requestTimeoutMs>
<lookupTimeoutMs>10000</lookupTimeoutMs>
<numIoThreads>4</numIoThreads>
<numListenerThreads>2</numListenerThreads>
<connectionsPerBroker>1</connectionsPerBroker>
<memoryLimitBytes>104857600</memoryLimitBytes>
</pulsar.init>
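The reconnection, keep-alive, and idle-connection parameters listed in the table can be added to the same pulsar.init element. The following is a minimal sketch rather than a recommended setup: the element names come from the table above, while the connection name and all values are illustrative assumptions that should be tuned for your broker.

```xml
<pulsar.init>
    <!-- Hypothetical connection name used only for this example -->
    <name>pulsarConnectionTuned</name>
    <serviceUrl>pulsar://localhost:6650</serviceUrl>
    <!-- Start reconnection attempts at 100 ms and cap the backoff at 60 s (assumed values) -->
    <initialBackoffInterval>100</initialBackoffInterval>
    <maxBackoffInterval>60000</maxBackoffInterval>
    <!-- Send keep-alive messages every 30 s (assumed value) -->
    <keepAliveIntervalSeconds>30</keepAliveIntervalSeconds>
    <!-- Close connections that stay idle for more than 180 s (assumed value) -->
    <connectionMaxIdleSeconds>180</connectionMaxIdleSeconds>
    <useTcpNoDelay>true</useTcpNoDelay>
</pulsar.init>
```

Any parameter omitted here falls back to its default, as described in the note above the table.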
PULSAR SECURE
This operation allows you to initialize a secure and authenticated connection to Apache Pulsar.
Note: Unless explicitly stated, the default values of the optional parameters match those defined in the official Apache Pulsar Configuration documentation.
Parameter Name | Display Name | Description | Required |
---|---|---|---|
name | Connection Name | Unique name to identify the connection by. | Yes |
serviceUrl | Broker URL | The Pulsar broker URL to connect to. It follows the format pulsar+ssl://<host_name>:<port>, where pulsar is the protocol and +ssl is an optional suffix indicating that the connection is secured by TLS. The content following :// is the host name and port on which the broker is running. | Yes |
Parameters for secure connection (TLS encryption) | |||
tlsAllowInsecureConnection | Allow Insecure TLS Connection | Allows the client to connect to the broker using TLS even if the server's certificate cannot be verified (e.g., self-signed certificates). Use with caution, as it disables strict certificate validation. When this is true, the value of tlsTrustCertsFilePath and the trust store parameters are ignored. Default value is false. | No |
useKeyStoreTls | Use KeyStore TLS | Enables the use of a Java KeyStore for TLS configuration instead of PEM files. Default value is false. | No |
tlsTrustCertsFilePath | Broker CA Certificate Path | Specifies the file path to the CA certificate(s) used to verify the broker's TLS certificate. This should point to a PEM-encoded certificate file. This is applicable only when useKeyStoreTls parameter is set to false. | No |
tlsTrustStorePath | TLS TrustStore Path | The file system path to the Java KeyStore (trust store) containing trusted CA certificates for TLS connections. This is applicable only when useKeyStoreTls parameter is set to true. | No |
tlsTrustStoreType | TLS TrustStore Type | The type of the trust store (e.g., JKS or PKCS12) used for TLS certificate validation. This is applicable only when useKeyStoreTls parameter is set to true. | No |
tlsTrustStorePassword | TLS TrustStore Password | Password for the TLS trust store. This is applicable only when useKeyStoreTls parameter is set to true. | No |
tlsProtocols | TLS Protocols | Specifies the list of enabled TLS protocol versions (e.g., TLSv1.2, TLSv1.3) as a comma-separated value. This controls which protocol versions the client will use when negotiating with the broker. | No |
tlsCiphers | TLS Ciphers | Specifies the list of enabled TLS ciphers as a comma-separated value. This controls which encryption algorithms are allowed during the TLS handshake. | No |
autoCertRefreshSeconds | Auto Certificate Refresh Interval (in seconds) | Interval for automatic certificate refresh (in seconds). | No |
enableTlsHostnameVerification | Enable TLS Hostname Verification | Enables hostname verification for TLS connections. When set to true, the client checks that the broker's TLS certificate matches its hostname, providing additional security against man-in-the-middle attacks. If false, this verification is skipped. | No |
Parameters for Authentication | |||
authenticationType | Authentication Type | Type of authentication (e.g., JWT, TLS, OAUTH2, NONE). | No |
jwtToken | JWT Token | JWT token to be used with JWT authentication type. | No |
Additional parameters for connection | |||
connectionTimeoutMs | Connection Timeout (in milliseconds) | Timeout for establishing a TCP connection in milliseconds. | No |
operationTimeoutSeconds | Operation Timeout (in seconds) | Specifies how long the client should wait (in seconds) for sending a message before timing out. | No |
requestTimeoutMs | Request Timeout (in milliseconds) | Timeout duration (in milliseconds) for individual Pulsar requests. | No |
lookupTimeoutMs | Lookup Timeout (in milliseconds) | Timeout (in milliseconds) for topic lookup operations. If the broker does not respond in time, the request fails. This helps prevent the client from waiting indefinitely when trying to resolve a topic to a broker. | No |
connectionMaxIdleSeconds | Connection Max Idle Time (in seconds) | Specifies the maximum time (in seconds) a connection can stay idle before being closed. | No |
numIoThreads | Number of IO Threads | Sets the number of threads dedicated for handling network I/O operations such as reading and writing data to the broker. | No |
numListenerThreads | Number of Listener Threads | The number of threads used to process messages received by consumers. | No |
enableBusyWait | Enable Busy Wait | When enabled, uses busy-waiting for IO threads instead of traditional blocking IO. This may reduce latency but increases CPU usage. | No |
initialBackoffInterval | Initial Backoff Interval (in milliseconds) | Initial backoff interval for reconnection attempts (in milliseconds). | No |
maxBackoffInterval | Max Backoff Interval (in milliseconds) | Maximum backoff interval for reconnection attempts (in milliseconds). | No |
connectionsPerBroker | Number of Connections Per Broker | Determines how many TCP connections the client maintains per broker. Increasing this can help with high-throughput scenarios. | No |
maxLookupRedirects | Maximum Number of Lookup Redirects | The maximum number of times a lookup request can be redirected when resolving a topic to its broker. If the number of redirects exceeds this limit, the lookup operation fails. This helps prevent infinite redirect loops during topic resolution. | No |
maxLookupRequest | Maximum Number of Lookup Requests | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. | No |
concurrentLookupRequest | Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. | No |
maxConcurrentLookupRequests | Maximum Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. | No |
maxNumberOfRejectedRequestPerConnection | Maximum Number Of Rejected Request Per Connection | Maximum number of requests a broker can reject within a fixed time frame (60 seconds). Once this limit is exceeded, the current connection is closed and the client creates a new connection to a different broker. | No |
useTcpNoDelay | Enable TCP No Delay | Enable TCP no delay for network connections. | No |
enableTransaction | Enable Transaction | Whether to enable transaction support in the Pulsar connector. When set to true, the connector can participate in transactional message publishing, allowing atomic operations across multiple topics and partitions. This is useful for scenarios requiring exactly-once semantics or coordinated multi-message operations. If not needed, leave it as false to avoid extra overhead. | No |
keepAliveIntervalSeconds | Keep Alive Interval (in seconds) | Interval (in seconds) to send keep-alive messages to maintain an active connection with the broker. This helps prevent the connection from being closed due to inactivity. If not set, the default value defined by Pulsar will be used. | No |
statsIntervalSeconds | Stats Interval (in seconds) | The interval (in seconds) at which the Pulsar connector collects and reports statistics. This helps monitor client (connector) performance and resource usage. If not set, the default value defined by Pulsar will be used. | No |
memoryLimitBytes | Memory Limit (in bytes) | The maximum amount of memory (in bytes) that the Pulsar client (connector) can use. It helps control the memory usage of the client (connector), preventing it from consuming excessive resources, which is important for stability and performance in resource-constrained environments. If not set, the default value defined by Pulsar will be used. | No |
listenerName | Listener Name | Listener name for lookup. | No |
Sample configuration
Given below is a sample configuration to create a client connection that uses JWT authentication and a PEM-encoded CA certificate for TLS.
<pulsar.init>
<name>pulsarConnection</name>
<serviceUrl>pulsar://localhost:6650</serviceUrl>
<connectionTimeoutMs>10000</connectionTimeoutMs>
<operationTimeoutSeconds>30</operationTimeoutSeconds>
<requestTimeoutMs>30000</requestTimeoutMs>
<lookupTimeoutMs>10000</lookupTimeoutMs>
<numIoThreads>4</numIoThreads>
<numListenerThreads>2</numListenerThreads>
<connectionsPerBroker>1</connectionsPerBroker>
<memoryLimitBytes>104857600</memoryLimitBytes>
<authenticationType>JWT</authenticationType>
<jwtToken>eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.KpRMeZtp9Tjl-4wJq5PppK71sc6Gwb0utspd1PpLSr0</jwtToken>
<tlsAllowInsecureConnection>false</tlsAllowInsecureConnection>
<useKeyStoreTls>false</useKeyStoreTls>
<tlsTrustCertsFilePath>/Users/wso2/Documents/apache-pulsar-4.0.4/ca.cert.pem</tlsTrustCertsFilePath>
</pulsar.init>
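When useKeyStoreTls is set to true, the trust store parameters replace tlsTrustCertsFilePath, as described in the table above. The following sketch shows how such a configuration might look. The element names come from the table; the connection name, port 6651, trust store path, password, and token value are placeholders assumed for illustration only.

```xml
<pulsar.init>
    <!-- Hypothetical connection name used only for this example -->
    <name>securePulsarKeyStore</name>
    <!-- Port 6651 is an assumption; use the TLS port your broker exposes -->
    <serviceUrl>pulsar+ssl://localhost:6651</serviceUrl>
    <authenticationType>JWT</authenticationType>
    <!-- Placeholder; supply a token issued for your cluster -->
    <jwtToken>REPLACE_WITH_JWT_TOKEN</jwtToken>
    <tlsAllowInsecureConnection>false</tlsAllowInsecureConnection>
    <!-- Use a Java KeyStore instead of PEM files -->
    <useKeyStoreTls>true</useKeyStoreTls>
    <!-- Placeholder path, type, and password for the trust store -->
    <tlsTrustStorePath>/path/to/truststore.jks</tlsTrustStorePath>
    <tlsTrustStoreType>JKS</tlsTrustStoreType>
    <tlsTrustStorePassword>changeit</tlsTrustStorePassword>
    <!-- Comma-separated list of enabled protocol versions -->
    <tlsProtocols>TLSv1.2,TLSv1.3</tlsProtocols>
    <enableTlsHostnameVerification>true</enableTlsHostnameVerification>
</pulsar.init>
```

Keeping tlsAllowInsecureConnection as false matters here: according to the table, setting it to true causes the trust store parameters to be ignored.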
Publishing messages to Apache Pulsar
Publish Message
The publishMessage operation allows you to publish messages to the Apache Pulsar brokers.
Note: Unless explicitly stated, the default values of the optional parameters match those defined in the official Apache Pulsar Configuration documentation.
Parameter Name | Display Name | Description | Required |
---|---|---|---|
value | Message | The value or payload of the message to be published. | Yes |
key | Key | The key associated with the message for partitioning or routing. | No |
properties | Message Properties | Custom properties to attach to the message as key-value pairs. | No |
sequenceId | Sequence ID | The sequence ID to assign to the message for deduplication or ordering. | No |
deliverAfter | Deliver After (in milliseconds) | The delay duration after which the message should be delivered (e.g., 5s, 1m). | No |
Producer Settings | |||
topicName | Topic Name | The name of the Pulsar topic to which messages will be published. | Yes |
compressionType | Compression Type | The compression type to use for messages. Supported values: NONE, LZ4, ZLIB, ZSTD, SNAPPY. Reduces message size over the network. | No |
sendMode | Send Mode | The mode for sending the message: SYNC (wait for ack) or ASYNC (send and continue). | No |
batchingEnabled | Batching Enabled | Whether message batching is enabled for the producer. Batching can improve throughput by sending multiple messages in a single request. The default value is true. | No |
batchingMaxMessages | Batching Max Messages | The maximum number of messages permitted in a batch. The default value is 1000. | No |
batchingMaxBytes | Batching Max Bytes (in bytes) | The maximum size of a batch in bytes. The default value is 131072. | No |
batchingMaxPublishDelayMicros | Batching Max Publish Delay (in microseconds) | The maximum delay (in microseconds) to wait before publishing a batch, even if the batch is not full. Controls batching latency. The default value is 1000 microseconds. | No |
chunkingEnabled | Chunking Enabled | Whether chunking is enabled for large messages. If enabled, large messages are split into smaller chunks. The default value is false. | No |
chunkMaxMessageSize | Chunk Max Message Size (in Bytes) | The maximum size (in bytes) of a single message before it gets chunked. | No |
sendTimeoutMs | Send Timeout (in milliseconds) | The timeout in milliseconds for a message to be sent. If the message is not acknowledged within this time, it is marked as failed. | No |
blockIfQueueFull | Block Producer If Queue Full | Whether the producer should block when the outgoing message queue is full. If false, send operations will fail immediately when the queue is full. The default value is false. | No |
maxPendingMessages | Max Pending Messages | The maximum number of messages allowed to be pending in the outgoing queue. | No |
maxPendingMessagesAcrossPartitions | Max Pending Messages Across Partitions | The maximum number of pending messages across all partitions. This is useful for partitioned topics. | No |
hashingScheme | Hashing Scheme | The hashing scheme used to determine the partition for a message. Supported values: JavaStringHash, Murmur3_32Hash, BoostHash. The default value is JavaStringHash. | No |
messageRoutingMode | Message Routing Mode | The message routing mode for partitioned topics. Supported values: SinglePartition, RoundRobinPartition, CustomPartition. The default value is RoundRobinPartition. | No |
Output Parameters | |||
responseVariable | Output variable Name | The name of the variable to which the output should be stored. | No |
overwriteBody | Overwrite Message Body | Replace the Message Body in Message Context with the response of the operation. | No |
Sample configuration
<pulsar.publishMessages configKey="securePulsar">
<topicName>{${payload.topic}}</topicName>
<compressionType>NONE</compressionType>
<sendMode>Sync</sendMode>
<batchingEnabled>false</batchingEnabled>
<key>{${payload.key}}</key>
<value>{${payload.message}}</value>
<sequenceId></sequenceId>
<properties>[{"compression":"enabled"}, {"type":"json"}]</properties>
<responseVariable>pulsar_publishMessages_135</responseVariable>
<overwriteBody>false</overwriteBody>
</pulsar.publishMessages>
The following properties are set in the message context, or in a new variable if the overwriteBody parameter is set to false.
{
"success": "true",
"msgid": "162:3:-1"
}
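The batching parameters in the Producer Settings table can be combined with asynchronous sending for higher-throughput publishing. The following sketch reuses the operation element and connection key from the sample above. The batch limits, delay, compression choice, and response variable name are illustrative assumptions, and the Async casing of sendMode mirrors the Sync casing used in that sample rather than a confirmed value.

```xml
<pulsar.publishMessages configKey="securePulsar">
    <topicName>{${payload.topic}}</topicName>
    <key>{${payload.key}}</key>
    <value>{${payload.message}}</value>
    <!-- Send without waiting for the broker acknowledgement (casing assumed) -->
    <sendMode>Async</sendMode>
    <compressionType>LZ4</compressionType>
    <!-- Group up to 500 messages or 131072 bytes per batch (assumed limits) -->
    <batchingEnabled>true</batchingEnabled>
    <batchingMaxMessages>500</batchingMaxMessages>
    <batchingMaxBytes>131072</batchingMaxBytes>
    <!-- Wait at most 5000 microseconds before publishing a partial batch (assumed value) -->
    <batchingMaxPublishDelayMicros>5000</batchingMaxPublishDelayMicros>
    <!-- Block instead of failing immediately when the outgoing queue is full -->
    <blockIfQueueFull>true</blockIfQueueFull>
    <!-- Hypothetical variable name for the operation result -->
    <responseVariable>pulsar_publish_result</responseVariable>
    <overwriteBody>false</overwriteBody>
</pulsar.publishMessages>
```

A longer publish delay generally produces fuller batches at the cost of added latency, so the delay and batch limits are best chosen together.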