Apache Pulsar Connector Reference

This documentation provides a reference guide for the Apache Pulsar Connector. The connector allows you to connect to Apache Pulsar, a distributed messaging system, and publish messages. Each operation below lists its parameters along with a sample configuration.

Connection Configurations

Apache Pulsar Connections

The WSO2 Apache Pulsar Connector allows you to establish both secure and non-secure connections to Apache Pulsar.


Connection Configuration Parameters

PULSAR

This operation allows you to initialize the connection to Apache Pulsar.

Note: Unless explicitly stated, the default values of the optional parameters match those defined in the official Apache Pulsar Configuration documentation.

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| name | Connection Name | Unique name to identify the connection. | Yes |
| serviceUrl | Broker URL | The URL of the Pulsar broker to connect to. It follows the format `pulsar://<host_name>:<port>`, where pulsar is the protocol and the content following :// is the host name and the port on which the broker is running. | Yes |

Additional parameters for connection

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| connectionTimeoutMs | Connection Timeout (in milliseconds) | Timeout for establishing a TCP connection in milliseconds. | No |
| operationTimeoutSeconds | Operation Timeout (in seconds) | Specifies how long the client should wait (in seconds) for sending a message before timing out. | No |
| requestTimeoutMs | Request Timeout (in milliseconds) | Timeout duration (in milliseconds) for individual Pulsar requests. | No |
| lookupTimeoutMs | Lookup Timeout (in milliseconds) | Timeout (in milliseconds) for topic lookup operations. If the broker does not respond in time, the request fails. This helps prevent the client from waiting indefinitely when trying to resolve a topic to a broker. | No |
| connectionMaxIdleSeconds | Connection Max Idle Time (in seconds) | Specifies the maximum time (in seconds) a connection can stay idle before being closed. | No |
| numIoThreads | Number of IO Threads | Sets the number of threads dedicated to handling network I/O operations such as reading and writing data to the broker. | No |
| numListenerThreads | Number of Listener Threads | The number of threads used to process messages received by consumers. | No |
| enableBusyWait | Enable Busy Wait | When enabled, uses busy-waiting for IO threads instead of traditional blocking IO. This may reduce latency but increases CPU usage. | No |
| initialBackoffInterval | Initial Backoff Interval (in milliseconds) | Initial backoff interval for reconnection attempts (in milliseconds). | No |
| maxBackoffInterval | Max Backoff Interval (in milliseconds) | Maximum backoff interval for reconnection attempts (in milliseconds). | No |
| connectionsPerBroker | Number of Connections Per Broker | Determines how many TCP connections the client maintains per broker. Increasing this can help with high-throughput scenarios. | No |
| maxLookupRedirects | Maximum Number of Lookup Redirects | The maximum number of times a lookup request can be redirected when resolving a topic to its broker. If the number of redirects exceeds this limit, the lookup operation fails. This helps prevent infinite redirect loops during topic resolution. | No |
| maxLookupRequest | Maximum Number of Lookup Requests | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. | No |
| concurrentLookupRequest | Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. | No |
| maxConcurrentLookupRequests | Maximum Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. | No |
| maxNumberOfRejectedRequestPerConnection | Maximum Number of Rejected Requests Per Connection | Maximum number of requests a broker may reject within a fixed time frame (60 seconds) before the client closes the current connection and creates a new one, giving it a chance to connect to a different broker. | No |
| useTcpNoDelay | Enable TCP No Delay | Enable TCP no delay for network connections. | No |
| enableTransaction | Enable Transaction | Whether to enable transaction support in the Pulsar connector. When set to true, the connector can participate in transactional message publishing, allowing atomic operations across multiple topics and partitions. This is useful for scenarios requiring exactly-once semantics or coordinated multi-message operations. If not needed, leave it as false to avoid extra overhead. | No |
| keepAliveIntervalSeconds | Keep Alive Interval (in seconds) | Interval (in seconds) to send keep-alive messages to maintain an active connection with the broker. This helps prevent the connection from being closed due to inactivity. If not set, the default value defined by Pulsar will be used. | No |
| statsIntervalSeconds | Stats Interval (in seconds) | The interval (in seconds) at which the Pulsar connector collects and reports statistics. This helps monitor client (connector) performance and resource usage. If not set, the default value defined by Pulsar will be used. | No |
| memoryLimitBytes | Memory Limit (in bytes) | The maximum amount of memory (in bytes) that the Pulsar client (connector) can use. It helps control the memory usage of the client (connector), preventing it from consuming excessive resources, which is important for stability and performance in resource-constrained environments. If not set, the default value defined by Pulsar will be used. | No |
| listenerName | Listener Name | Listener name for lookup. | No |

Sample configuration

Given below is a sample configuration to create a client connection without security.

<pulsar.init>
    <name>pulsarConnection</name>
    <serviceUrl>pulsar://localhost:6650</serviceUrl>
    <connectionTimeoutMs>10000</connectionTimeoutMs>
    <operationTimeoutSeconds>30</operationTimeoutSeconds>
    <requestTimeoutMs>30000</requestTimeoutMs>
    <lookupTimeoutMs>10000</lookupTimeoutMs>
    <numIoThreads>4</numIoThreads>
    <numListenerThreads>2</numListenerThreads>
    <connectionsPerBroker>1</connectionsPerBroker>
    <memoryLimitBytes>104857600</memoryLimitBytes>
</pulsar.init>
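
The reconnection and idle-handling parameters listed for this operation can be set in the same way. The following is a minimal sketch rather than a tested configuration: the element names come from the parameter list, while the connection name and every value are illustrative assumptions, not recommended settings or Pulsar defaults.

```xml
<pulsar.init>
    <!-- Hypothetical connection name -->
    <name>pulsarConnectionTuned</name>
    <serviceUrl>pulsar://localhost:6650</serviceUrl>
    <!-- Reconnection backoff in milliseconds: start at 100 ms, cap at 30 s (assumed values) -->
    <initialBackoffInterval>100</initialBackoffInterval>
    <maxBackoffInterval>30000</maxBackoffInterval>
    <!-- Send keep-alive messages every 30 s; close connections idle for 3 minutes (assumed values) -->
    <keepAliveIntervalSeconds>30</keepAliveIntervalSeconds>
    <connectionMaxIdleSeconds>180</connectionMaxIdleSeconds>
    <!-- Fail a topic lookup after 5 redirects (assumed value) -->
    <maxLookupRedirects>5</maxLookupRedirects>
</pulsar.init>
```

Parameters that are left out fall back to their defaults, so a configuration like this only needs to list the settings being changed.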

PULSAR SECURE

This operation allows you to initialize a secure and authenticated connection to Apache Pulsar.

Note: Unless explicitly stated, the default values of the optional parameters match those defined in the official Apache Pulsar Configuration documentation.


| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| name | Connection Name | Unique name to identify the connection. | Yes |
| serviceUrl | Broker URL | The URL of the Pulsar broker to connect to. It follows the format `pulsar+ssl://<host_name>:<port>`, where pulsar is the protocol and the optional +ssl suffix indicates that the connection is secured by TLS. The content following :// is the host name and the port on which the broker is running. | Yes |

Parameters for secure connection (TLS encryption)

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| tlsAllowInsecureConnection | Allow Insecure TLS Connection | Allows the client to connect to the broker using TLS even if the server's certificate cannot be verified (e.g., self-signed certificates). Use with caution, as it disables strict certificate validation. When this is true, the value of tlsTrustCertsFilePath and the trust store parameters are ignored. Default value is false. | No |
| useKeyStoreTls | Use KeyStore TLS | Enables the use of a Java KeyStore for TLS configuration instead of PEM files. Default value is false. | No |
| tlsTrustCertsFilePath | Broker CA Certificate Path | Specifies the file path to the CA certificate(s) used to verify the broker's TLS certificate. This should point to a PEM-encoded certificate file. This is applicable only when the useKeyStoreTls parameter is set to false. | No |
| tlsTrustStorePath | TLS TrustStore Path | The file system path to the Java KeyStore (trust store) containing trusted CA certificates for TLS connections. This is applicable only when the useKeyStoreTls parameter is set to true. | No |
| tlsTrustStoreType | TLS TrustStore Type | The type of the trust store (e.g., JKS or PKCS12) used for TLS certificate validation. This is applicable only when the useKeyStoreTls parameter is set to true. | No |
| tlsTrustStorePassword | TLS TrustStore Password | Password for the TLS trust store. This is applicable only when the useKeyStoreTls parameter is set to true. | No |
| tlsProtocols | TLS Protocols | Specifies the list of enabled TLS protocol versions (e.g., TLSv1.2, TLSv1.3) as a comma-separated value. This controls which protocol versions the client will use when negotiating with the broker. | No |
| tlsCiphers | TLS Ciphers | Specifies the list of enabled TLS ciphers as a comma-separated value. This controls which encryption algorithms are allowed during the TLS handshake. | No |
| autoCertRefreshSeconds | Auto Certificate Refresh Interval (in seconds) | Interval for automatic certificate refresh (in seconds). | No |
| enableTlsHostnameVerification | Enable TLS Hostname Verification | Enables hostname verification for TLS connections. When set to true, the client checks that the broker's TLS certificate matches its hostname, providing additional security against man-in-the-middle attacks. If false, this verification is skipped. | No |

Parameters for Authentication

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| authenticationType | Authentication Type | Type of authentication (e.g., JWT, TLS, OAUTH2, NONE). | No |
| jwtToken | JWT Token | JWT token to be used with the JWT authentication type. | No |

Additional parameters for connection

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| connectionTimeoutMs | Connection Timeout (in milliseconds) | Timeout for establishing a TCP connection in milliseconds. | No |
| operationTimeoutSeconds | Operation Timeout (in seconds) | Specifies how long the client should wait (in seconds) for sending a message before timing out. | No |
| requestTimeoutMs | Request Timeout (in milliseconds) | Timeout duration (in milliseconds) for individual Pulsar requests. | No |
| lookupTimeoutMs | Lookup Timeout (in milliseconds) | Timeout (in milliseconds) for topic lookup operations. If the broker does not respond in time, the request fails. This helps prevent the client from waiting indefinitely when trying to resolve a topic to a broker. | No |
| connectionMaxIdleSeconds | Connection Max Idle Time (in seconds) | Specifies the maximum time (in seconds) a connection can stay idle before being closed. | No |
| numIoThreads | Number of IO Threads | Sets the number of threads dedicated to handling network I/O operations such as reading and writing data to the broker. | No |
| numListenerThreads | Number of Listener Threads | The number of threads used to process messages received by consumers. | No |
| enableBusyWait | Enable Busy Wait | When enabled, uses busy-waiting for IO threads instead of traditional blocking IO. This may reduce latency but increases CPU usage. | No |
| initialBackoffInterval | Initial Backoff Interval (in milliseconds) | Initial backoff interval for reconnection attempts (in milliseconds). | No |
| maxBackoffInterval | Max Backoff Interval (in milliseconds) | Maximum backoff interval for reconnection attempts (in milliseconds). | No |
| connectionsPerBroker | Number of Connections Per Broker | Determines how many TCP connections the client maintains per broker. Increasing this can help with high-throughput scenarios. | No |
| maxLookupRedirects | Maximum Number of Lookup Redirects | The maximum number of times a lookup request can be redirected when resolving a topic to its broker. If the number of redirects exceeds this limit, the lookup operation fails. This helps prevent infinite redirect loops during topic resolution. | No |
| maxLookupRequest | Maximum Number of Lookup Requests | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. | No |
| concurrentLookupRequest | Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. | No |
| maxConcurrentLookupRequests | Maximum Number of Concurrent Lookup Requests | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. | No |
| maxNumberOfRejectedRequestPerConnection | Maximum Number of Rejected Requests Per Connection | Maximum number of requests a broker may reject within a fixed time frame (60 seconds) before the client closes the current connection and creates a new one, giving it a chance to connect to a different broker. | No |
| useTcpNoDelay | Enable TCP No Delay | Enable TCP no delay for network connections. | No |
| enableTransaction | Enable Transaction | Whether to enable transaction support in the Pulsar connector. When set to true, the connector can participate in transactional message publishing, allowing atomic operations across multiple topics and partitions. This is useful for scenarios requiring exactly-once semantics or coordinated multi-message operations. If not needed, leave it as false to avoid extra overhead. | No |
| keepAliveIntervalSeconds | Keep Alive Interval (in seconds) | Interval (in seconds) to send keep-alive messages to maintain an active connection with the broker. This helps prevent the connection from being closed due to inactivity. If not set, the default value defined by Pulsar will be used. | No |
| statsIntervalSeconds | Stats Interval (in seconds) | The interval (in seconds) at which the Pulsar connector collects and reports statistics. This helps monitor client (connector) performance and resource usage. If not set, the default value defined by Pulsar will be used. | No |
| memoryLimitBytes | Memory Limit (in bytes) | The maximum amount of memory (in bytes) that the Pulsar client (connector) can use. It helps control the memory usage of the client (connector), preventing it from consuming excessive resources, which is important for stability and performance in resource-constrained environments. If not set, the default value defined by Pulsar will be used. | No |
| listenerName | Listener Name | Listener name for lookup. | No |

Sample configuration

Given below is a sample configuration to create a secure client connection that uses JWT authentication and a PEM-encoded CA certificate.

<pulsar.init>
    <name>pulsarConnection</name>
    <serviceUrl>pulsar+ssl://localhost:6651</serviceUrl>
    <connectionTimeoutMs>10000</connectionTimeoutMs>
    <operationTimeoutSeconds>30</operationTimeoutSeconds>
    <requestTimeoutMs>30000</requestTimeoutMs>
    <lookupTimeoutMs>10000</lookupTimeoutMs>
    <numIoThreads>4</numIoThreads>
    <numListenerThreads>2</numListenerThreads>
    <connectionsPerBroker>1</connectionsPerBroker>
    <memoryLimitBytes>104857600</memoryLimitBytes>
    <authenticationType>JWT</authenticationType>
    <jwtToken>eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.KpRMeZtp9Tjl-4wJq5PppK71sc6Gwb0utspd1PpLSr0</jwtToken>
    <tlsAllowInsecureConnection>false</tlsAllowInsecureConnection>
    <useKeyStoreTls>false</useKeyStoreTls>
    <tlsTrustCertsFilePath>/Users/wso2/Documents/apache-pulsar-4.0.4/ca.cert.pem</tlsTrustCertsFilePath>
</pulsar.init>
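
When the trusted CA certificates are kept in a Java trust store rather than a PEM file, the trust store parameters apply instead of tlsTrustCertsFilePath. The following is a minimal sketch under stated assumptions: the element names come from the parameter list for this operation, while the connection name, broker host and port, trust store path, password, and token are hypothetical placeholders to be replaced with real values.

```xml
<pulsar.init>
    <!-- Hypothetical connection name -->
    <name>pulsarSecureKeyStoreConnection</name>
    <!-- pulsar+ssl scheme as described for serviceUrl; host and port are placeholders -->
    <serviceUrl>pulsar+ssl://broker.example.com:6651</serviceUrl>
    <!-- Use a Java trust store instead of a PEM file -->
    <useKeyStoreTls>true</useKeyStoreTls>
    <tlsTrustStorePath>/path/to/client-truststore.jks</tlsTrustStorePath>
    <tlsTrustStoreType>JKS</tlsTrustStoreType>
    <tlsTrustStorePassword>changeit</tlsTrustStorePassword>
    <!-- Keep strict certificate and hostname checks enabled -->
    <tlsAllowInsecureConnection>false</tlsAllowInsecureConnection>
    <enableTlsHostnameVerification>true</enableTlsHostnameVerification>
    <!-- Restrict negotiation to the listed protocol versions -->
    <tlsProtocols>TLSv1.2,TLSv1.3</tlsProtocols>
    <authenticationType>JWT</authenticationType>
    <jwtToken>your-jwt-token</jwtToken>
</pulsar.init>
```

Because tlsTrustCertsFilePath applies only when useKeyStoreTls is false, it is left out of this sketch.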

Publishing messages to Apache Pulsar

Publish Message

The publishMessage operation allows you to publish messages to the Apache Pulsar brokers.

Note: Unless explicitly stated, the default values of the optional parameters match those defined in the official Apache Pulsar Configuration documentation.

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| value | Message | The value or payload of the message to be published. | Yes |
| key | Key | The key associated with the message for partitioning or routing. | No |
| properties | Message Properties | Custom properties to attach to the message as key-value pairs. | No |
| sequenceId | Sequence ID | The sequence ID to assign to the message for deduplication or ordering. | No |
| deliverAfter | Deliver After (in milliseconds) | The delay duration after which the message should be delivered (e.g., 5s, 1m). | No |

Producer Settings

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| topicName | Topic Name | The name of the Pulsar topic to which messages will be published. | Yes |
| compressionType | Compression Type | The compression type to use for messages. Supported values: NONE, LZ4, ZLIB, ZSTD, SNAPPY. Compression reduces the size of messages sent over the network. | No |
| sendMode | Send Mode | The mode for sending the message: SYNC (wait for ack) or ASYNC (send and continue). | No |
| batchingEnabled | Batching Enabled | Whether message batching is enabled for the producer. Batching can improve throughput by sending multiple messages in a single request. The default value is true. | No |
| batchingMaxMessages | Batching Max Messages | The maximum number of messages permitted in a batch. The default value is 1000. | No |
| batchingMaxBytes | Batching Max Bytes (in bytes) | The maximum size of a batch in bytes. The default value is 131072. | No |
| batchingMaxPublishDelayMicros | Batching Max Publish Delay (in microseconds) | The maximum delay (in microseconds) to wait before publishing a batch, even if the batch is not full. Controls batching latency. The default value is 1000 microseconds. | No |
| chunkingEnabled | Chunking Enabled | Whether chunking is enabled for large messages. If enabled, large messages are split into smaller chunks. The default value is false. | No |
| chunkMaxMessageSize | Chunk Max Message Size (in bytes) | The maximum size (in bytes) of a single message before it gets chunked. | No |
| sendTimeoutMs | Send Timeout (in milliseconds) | The timeout in milliseconds for a message to be sent. If the message is not acknowledged within this time, it is marked as failed. | No |
| blockIfQueueFull | Block Producer If Queue Full | Whether the producer should block when the outgoing message queue is full. If false, send operations will fail immediately when the queue is full. The default value is false. | No |
| maxPendingMessages | Max Pending Messages | The maximum number of messages allowed to be pending in the outgoing queue. | No |
| maxPendingMessagesAcrossPartitions | Max Pending Messages Across Partitions | The maximum number of pending messages across all partitions. This is useful for partitioned topics. | No |
| hashingScheme | Hashing Scheme | The hashing scheme used to determine the partition for a message. Supported values: JavaStringHash, Murmur3_32Hash, BoostHash. The default value is JavaStringHash. | No |
| messageRoutingMode | Message Routing Mode | The message routing mode for partitioned topics. Supported values: SinglePartition, RoundRobinPartition, CustomPartition. The default value is RoundRobinPartition. | No |

Output Parameters

| Parameter Name | Display Name | Description | Required |
|---|---|---|---|
| responseVariable | Output Variable Name | The name of the variable in which the output should be stored. | No |
| overwriteBody | Overwrite Message Body | Replaces the message body in the message context with the response of the operation. | No |

Sample configuration

<pulsar.publishMessages configKey="securePulsar">
    <topicName>{${payload.topic}}</topicName>
    <compressionType>NONE</compressionType>
    <sendMode>Sync</sendMode>
    <batchingEnabled>false</batchingEnabled>
    <key>{${payload.key}}</key>
    <value>{${payload.message}}</value>
    <sequenceId></sequenceId>
    <properties>[{"compression":"enabled"}, {"type":"json"}]</properties>
    <responseVariable>pulsar_publishMessages_135</responseVariable>
    <overwriteBody>false</overwriteBody>
</pulsar.publishMessages>
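
The sample above disables batching. For higher throughput, the batching parameters under Producer Settings can be combined with asynchronous sending. The following is a minimal sketch, not a tested configuration: it reuses the element name and the securePulsar connection key from the sample above, the sendMode casing is assumed to follow that sample, and the variable name and numeric values are illustrative assumptions.

```xml
<pulsar.publishMessages configKey="securePulsar">
    <topicName>{${payload.topic}}</topicName>
    <value>{${payload.message}}</value>
    <!-- Compress payloads with LZ4, one of the supported compression types -->
    <compressionType>LZ4</compressionType>
    <!-- Send and continue without waiting for the ack (casing assumed from the sample above) -->
    <sendMode>Async</sendMode>
    <!-- Publish a batch at 500 messages, 128 KiB, or after 5 ms (assumed values) -->
    <batchingEnabled>true</batchingEnabled>
    <batchingMaxMessages>500</batchingMaxMessages>
    <batchingMaxBytes>131072</batchingMaxBytes>
    <batchingMaxPublishDelayMicros>5000</batchingMaxPublishDelayMicros>
    <!-- Hypothetical variable name for the response -->
    <responseVariable>pulsar_publish_batched</responseVariable>
    <overwriteBody>false</overwriteBody>
</pulsar.publishMessages>
```

A longer publish delay gives batches more time to fill, which tends to trade latency for throughput.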

The following properties are set in the message context, or stored in a new variable if the overwriteBody parameter is set to false.

{
    "success": "true",
    "msgid": "162:3:-1"
}
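
For payloads that may be too large to send as a single message, the chunking parameters under Producer Settings can be used. The following is a minimal sketch under stated assumptions: it reuses the element name and connection key from the sample configuration, and the chunk size, timeout, and variable name are illustrative. Apache Pulsar does not allow chunking and batching to be enabled on the same producer, so batching is disabled here.

```xml
<pulsar.publishMessages configKey="securePulsar">
    <topicName>{${payload.topic}}</topicName>
    <value>{${payload.message}}</value>
    <!-- Chunking and batching cannot both be enabled on a Pulsar producer -->
    <batchingEnabled>false</batchingEnabled>
    <chunkingEnabled>true</chunkingEnabled>
    <!-- Split messages larger than 1 MiB (1048576 bytes); assumed value -->
    <chunkMaxMessageSize>1048576</chunkMaxMessageSize>
    <!-- Mark the send as failed if it is not acknowledged within 30 s (assumed value) -->
    <sendTimeoutMs>30000</sendTimeoutMs>
    <!-- Hypothetical variable name for the response -->
    <responseVariable>pulsar_publish_chunked</responseVariable>
    <overwriteBody>false</overwriteBody>
</pulsar.publishMessages>
```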