Skip to content

Apache Pulsar Connector Example

The Apache Pulsar Connector allows publishing messages to Apache Pulsar topics hosted on either local brokers or managed Pulsar services. It supports secure communication using TLS encryption and JWT-based authentication, and exposes the publishing functionality through a RESTful API. Users can send messages to Pulsar over HTTP/HTTPS, simplifying integration with external systems.

What you'll build

This example demonstrates how to use the Apache Pulsar Connector to publish messages to an Apache Pulsar topic through a REST API. The API exposes the resource /publishmessage, which enables users to send messages to a Pulsar topic.

The user sends a payload containing the necessary information, such as topic, message content, key, and message properties. The integration runtime then uses the Pulsar Connector to publish the message to the configured Pulsar broker.

  • /publishMessage: This resource accepts a request payload that includes details such as the topic name, message content, key, and any message properties. Upon invocation, the API uses the Apache Pulsar Connector to connect with the Pulsar broker and publish the message to the specified topic.

The following diagram illustrates an overview of the Apache Pulsar RESTful service that you are going to build in this example.

PulsarConnectorUseCase

If you do not want to configure this yourself, you can simply get the project and run it.

Before you begin

Setup Apache Pulsar

To connect with Apache Pulsar using the WSO2 Micro Integrator Apache Pulsar Connector, you need to first set up a running Pulsar instance locally or on a server. In this example, we will use an Apache Pulsar standalone server. Set up Apache Pulsar by following the instructions in Set up Apache Pulsar.

Develop the integration logic

Follow these steps to set up the Integration Project using the WSO2 Micro Integrator Visual Studio Code extension.

Create a new project

Follow the steps in the create integration project guide to set up the WSO2 MI and create the integration project with the Project Name as follows:

Creating a new Project

Create the integration logic

Create the API

  1. Under the Create an integration section, select API to create a new REST API.

    Creating a new API

  2. Then, enter the API name as PublishMessage and the Context as /publishMessage and click Create.

    Creating a new API

  3. Select the newly created PublishMessage API and click the Edit button to change the API method.

    Adding the API resource step 1.

  4. Then select the POST method and click Update.

    Adding the API resource step 1.
    This will create a new API resource with the context /publishmessage and the method POST.

Create Sample Request Payload

To map elements from the request payload to the configuration parameters, you can define a sample request payload. To do this, follow the steps below:

Click on the resource and you will be redirected to the Design View of the API. Now, click on the Start node on the canvas and select the Add Request option. This will open a pane to create a new example payload request.

In this operation, we are going to receive the following inputs from the user.

  • topic - The name of the Pulsar topic to which the message will be published.
  • message - The content of the message to be published.
  • key - The key associated with the message, used for partition routing.
  • properties - Additional named properties to be included with the message.

Therefore, provide the following JSON payload to the request.

{
    "topic": "cities",
    "message": "Hello World!",
    "key": "my-key",
    "properties": {
        "message-type":"text/plain",
        "event-date": "2025-05-20"
      }
}
Adding the API request.

Add Pulsar Connector to the Project

  1. Now we will add the publishMessage operation of the Pulsar Connector to the integration flow. To do this, we need to add the Apache Pulsar Connector to the integration project first. For that, open the Resource View, click on the + icon on the canvas to open the Mediator Palette and search for Pulsar in the Mediators section. Then, select the Pulsar connector and click on the Download button.

    Adding Pulsar Connector

  2. Click on the publishMessage operation to add that operation to the integration flow.

Create Connection

  1. Create a new connection by clicking on the '+ Add new connection' button as shown below. It will open a new pop-up window.

    Creating a new Connection

  2. Click on the PulsarSecure tile under the Pulsar Connector.

    Creating a new Connection

  3. Enter the connection name as PulsarSecureConnection and provide the following details in the Pulsar Connection configuration pane.

    • Broker URL: pulsar+ssl://localhost:6651/
    • Authentication Type: JWT
    • JWT Token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY
    • Broker CA Certificate Path: /absolute/path/to/ca.cert.pem

    Creating a new Connection

Implement the API

  1. Once the Pulsar connection is successfully created, it will be listed in the drop-down on the Connection section of the Add PublishMessage operation window.

  2. Now we need to configure the necessary parameters of the publishMessage operation. For some of the fields, we will use Synapse expressions to map values from the sample request defined in Create Sample Request Payload.

    • Input Section
      • Message: ${payload.message}
      • Key: ${payload.key}
      • Message Properties:
        [
          {"message-type": "${payload.properties.message-type}"},
          {"event-date": "${payload.properties.event-date}"}
        ]
        
    • Producer Settings
      • Topic Name: ${payload.topic}
      • Compression Type: NONE
      • Send Mode: Sync
      • Batching Enabled: true
    • Output Section
      • Overwrite Message Body: true
  3. To do this, click on the Expression icon in the Message field and select the Payload option.

    Configuring publishMessage operation.

    Configuring publishMessage operation.

    This will open a new pop-up window where you can select the payload.message parameter from the request payload.

    Configuring publishMessage operation.

    Next, do the same for the Key, Message Properties, and Topic fields.

    PublishMessage operation pane.

    Tick the Overwrite body checkbox in the Output field to overwrite the message body with the response of the operation. This will replace the sample request payload defined at the top with the output schema of the operation response shown below.

    publishmessage operation output schema.

  4. Add a Log mediator to log the response of the publishMessage operation. To do this, click on the + icon after the publishMessage operation and select the Log mediator from the Mediator Palette.

    Adding Log Mediator

    Adding Log Mediator

  5. Add the Respond Mediator to respond to the response of the publishMessage operation as shown below.

    Adding the respond mediator.

Export the integration project as a carbon application

To export the project, refer to the build and export the carbon application guide.

Get the project

You can download the ZIP file and extract the contents to get the project code.

Download ZIP

Deployment

To deploy and run the project, refer to the build and run guide.

Test

  1. Create a file called data.json with the following payload.
    {
        "topic": "test-topic",
        "message": "Hello, Pulsar!",
        "key": "sample-key",
        "properties": {
            "message-type": "text/plain",
            "event-date": "2025-05-20"
        }
    }
    
  2. Use the following curl command to invoke the API.
    curl -H "Content-Type: application/json" --request POST --data @data.json http://localhost:8290/publishmessage
    
    Expected Response: You should get a 'success' response as below along with the messageId.
    {
        "success": "true",
        "messageId": "141:0:-1"
    }
    

What's next