Apache Pulsar Connector Example¶
The Apache Pulsar Connector allows publishing messages to Apache Pulsar topics hosted on either local brokers or managed Pulsar services. It supports secure communication using TLS encryption and JWT-based authentication, and exposes the publishing functionality through a RESTful API. Users can send messages to Pulsar over HTTP/HTTPS, simplifying integration with external systems.
What you'll build¶
This example demonstrates how to use the Apache Pulsar Connector to publish messages to an Apache Pulsar topic through a REST API. The API exposes the resource /publishmessage, which enables users to send messages to a Pulsar topic.
The user sends a payload containing the necessary information, such as topic, message content, key, and message properties. The integration runtime then uses the Pulsar Connector to publish the message to the configured Pulsar broker.
- /publishmessage: This resource accepts a request payload that includes the topic name, message content, key, and any message properties. Upon invocation, the API uses the Apache Pulsar Connector to connect to the Pulsar broker and publish the message to the specified topic.
The following diagram illustrates an overview of the Apache Pulsar RESTful service that you are going to build in this example.
If you do not want to configure this yourself, you can simply get the project and run it.
Before you begin¶
Set up Apache Pulsar¶
To connect with Apache Pulsar using the WSO2 Micro Integrator Apache Pulsar Connector, you need to first set up a running Pulsar instance locally or on a server. In this example, we will use an Apache Pulsar standalone server. Set up Apache Pulsar by following the instructions in Set up Apache Pulsar.
Develop the integration logic¶
Follow these steps to set up the Integration Project using the WSO2 Micro Integrator Visual Studio Code extension.
Create a new project¶
Follow the steps in the create integration project guide to set up the WSO2 MI and create the integration project with the Project Name as follows:
Create the integration logic¶
Create the API¶
- Under the Create an integration section, select API to create a new REST API.
- Then, enter the API name as PublishMessage and the Context as /publishmessage, and click Create.
- Select the newly created PublishMessage API and click the Edit button to change the API method.
- Then select the POST method and click Update.
This will create a new API resource with the context /publishmessage and the method POST.
Create Sample Request Payload¶
To map elements from the request payload to the configuration parameters, you can define a sample request payload. To do this, follow the steps below:
Click on the resource and you will be redirected to the Design View of the API. Now, click on the Start node on the canvas and select the Add Request option. This will open a pane to create a new example payload request.
In this operation, we are going to receive the following inputs from the user.
- topic - The name of the Pulsar topic to which the message will be published.
- message - The content of the message to be published.
- key - The key associated with the message, used for partition routing.
- properties - Additional named properties to be included with the message.
Therefore, provide the following JSON payload to the request.
{
  "topic": "cities",
  "message": "Hello World!",
  "key": "my-key",
  "properties": {
    "message-type": "text/plain",
    "event-date": "2025-05-20"
  }
}
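As a quick sanity check before sending requests, the shape of this payload can be verified on the client side. The sketch below is a hypothetical helper, not part of the connector or the Micro Integrator. It assumes all four inputs listed above are mandatory, which the connector itself may not enforce.

```python
import json

# Assumption: every input listed above is treated as required here.
REQUIRED_FIELDS = {"topic": str, "message": str, "key": str, "properties": dict}

SAMPLE = """
{
  "topic": "cities",
  "message": "Hello World!",
  "key": "my-key",
  "properties": {
    "message-type": "text/plain",
    "event-date": "2025-05-20"
  }
}
"""


def validate_payload(raw: str) -> dict:
    """Parse a JSON request body and check that each expected field is present."""
    payload = json.loads(raw)
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in payload:
            raise ValueError(f"missing field: {name}")
        if not isinstance(payload[name], expected_type):
            raise ValueError(f"field {name} must be of type {expected_type.__name__}")
    return payload


if __name__ == "__main__":
    payload = validate_payload(SAMPLE)
    print("payload is well-formed for topic", payload["topic"])
```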

Add Pulsar Connector to the Project¶
- Now we will add the publishMessage operation of the Pulsar Connector to the integration flow. To do this, we first need to add the Apache Pulsar Connector to the integration project. Open the Resource View, click on the + icon on the canvas to open the Mediator Palette, and search for Pulsar in the Mediators section. Then, select the Pulsar connector and click on the Download button.
- Click on the publishMessage operation to add that operation to the integration flow.
Create Connection¶
- Create a new connection by clicking on the '+ Add new connection' button as shown below. It will open a new pop-up window.
- Click on the PulsarSecure tile under the Pulsar Connector.
- Enter the connection name as PulsarSecureConnection and provide the following details in the Pulsar Connection configuration pane.
  - Broker URL: pulsar+ssl://localhost:6651/
  - Authentication Type: JWT
  - JWT Token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY
  - Broker CA Certificate Path: /absolute/path/to/ca.cert.pem
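When a JWT is rejected by the broker, it can help to look at what the token actually claims. The following is a minimal sketch that decodes the header and claims of the sample token above using only the Python standard library. It does not verify the signature, so it is suitable for inspection only. The function names are illustrative.

```python
import base64
import json

# Sample token from the connection settings above.
TOKEN = (
    "eyJhbGciOiJIUzI1NiJ9."
    "eyJzdWIiOiJKb2UifQ."
    "ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"
)


def decode_segment(segment: str) -> dict:
    """Decode one base64url-encoded JWT segment into a dict."""
    # JWT segments omit base64 padding, so restore it before decoding.
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def inspect_jwt(token: str) -> tuple:
    """Return (header, claims) without verifying the signature."""
    header_b64, claims_b64, _signature = token.split(".")
    return decode_segment(header_b64), decode_segment(claims_b64)


if __name__ == "__main__":
    header, claims = inspect_jwt(TOKEN)
    print(header)  # {'alg': 'HS256'}
    print(claims)  # {'sub': 'Joe'}
```

For the sample token, the `sub` claim is `Joe`. With Pulsar's token authentication, the subject is typically the role that the broker authorizes, so it should match a role that has produce permission on the target topic.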
Implement the API¶
- Once the Pulsar connection is successfully created, it will be listed in the drop-down on the Connection section of the Add PublishMessage operation window.
- Now we need to configure the necessary parameters of the publishMessage operation. For some of the fields, we will use Synapse expressions to map values from the sample request defined in Create Sample Request Payload.
  - Input Section
    - Message: ${payload.message}
    - Key: ${payload.key}
    - Message Properties: [ {"message-type": "${payload.properties.message-type}"}, {"event-date": "${payload.properties.event-date}"} ]
  - Producer Settings
    - Topic Name: ${payload.topic}
    - Compression Type: NONE
    - Send Mode: Sync
    - Batching Enabled: true
  - Output Section
    - Overwrite Message Body: true
- To do this, click on the Expression icon in the Message field and select the Payload option. This will open a new pop-up window where you can select the payload.message parameter from the request payload. Next, do the same for the Key, Message Properties, and Topic Name fields. Tick the Overwrite body checkbox in the Output section to overwrite the message body with the response of the operation. This will replace the sample request payload defined at the top with the output schema of the operation response shown below.
- Add a Log mediator to log the response of the publishMessage operation. To do this, click on the + icon after the publishMessage operation and select the Log mediator from the Mediator Palette.
- Add the Respond mediator to return the response of the publishMessage operation to the client, as shown below.
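The field mappings configured in the steps above can be pictured as a plain function over the request payload. The sketch below only illustrates which payload value ends up in which operation field. It is not how the connector or the Synapse expression engine is implemented, and the dictionary key names are hypothetical.

```python
def build_publish_inputs(payload: dict) -> dict:
    """Mimic the expression mappings of the publishMessage operation."""
    props = payload["properties"]
    return {
        # Input section
        "message": payload["message"],            # ${payload.message}
        "key": payload["key"],                    # ${payload.key}
        "messageProperties": [
            {"message-type": props["message-type"]},
            {"event-date": props["event-date"]},
        ],
        # Producer settings
        "topicName": payload["topic"],            # ${payload.topic}
        "compressionType": "NONE",
        "sendMode": "Sync",
        "batchingEnabled": True,
    }


if __name__ == "__main__":
    sample = {
        "topic": "cities",
        "message": "Hello World!",
        "key": "my-key",
        "properties": {"message-type": "text/plain", "event-date": "2025-05-20"},
    }
    for field, value in build_publish_inputs(sample).items():
        print(f"{field}: {value}")
```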
Export the integration project as a carbon application¶
To export the project, refer to the build and export the carbon application guide.
Get the project¶
You can download the ZIP file and extract the contents to get the project code.
Deployment¶
To deploy and run the project, refer to the build and run guide.
Test¶
- Create a file called data.json with the following payload.
    {
      "topic": "test-topic",
      "message": "Hello, Pulsar!",
      "key": "sample-key",
      "properties": {
        "message-type": "text/plain",
        "event-date": "2025-05-20"
      }
    }
- Use the following curl command to invoke the API.
    curl -H "Content-Type: application/json" --request POST --data @data.json http://localhost:8290/publishmessage
  Expected Response: You should get a 'success' response as below, along with the messageId.
    { "success": "true", "messageId": "141:0:-1" }
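The same request can be sent from a script instead of curl. The following is a minimal sketch using only the Python standard library. It assumes the API is reachable at the URL used in the curl command, and that the returned messageId follows Pulsar's usual ledgerId:entryId:partitionIndex string form. The helper names are illustrative.

```python
import json
import urllib.request

API_URL = "http://localhost:8290/publishmessage"


def build_request(payload: dict, url: str = API_URL) -> urllib.request.Request:
    """Build the POST request that the curl command above sends."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def publish(payload: dict, url: str = API_URL, timeout: float = 10.0) -> dict:
    """Send the request and return the parsed JSON response."""
    with urllib.request.urlopen(build_request(payload, url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def parse_message_id(message_id: str) -> tuple:
    """Split a messageId such as '141:0:-1' into three integers."""
    ledger_id, entry_id, partition_index = (int(p) for p in message_id.split(":"))
    return ledger_id, entry_id, partition_index


if __name__ == "__main__":
    payload = {
        "topic": "test-topic",
        "message": "Hello, Pulsar!",
        "key": "sample-key",
        "properties": {"message-type": "text/plain", "event-date": "2025-05-20"},
    }
    request = build_request(payload)
    print(request.get_method(), request.full_url)
    # With the integration running, call publish(payload) to send the message.
```

A partition index of -1 in the sample response usually indicates that the topic is not partitioned.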
What's next¶
- To customize this example for your own scenario, see the Apache Pulsar Connector Reference documentation.