Kafka Connector Example
This example demonstrates how to send messages to a Kafka broker via Kafka topics. It uses the `publishMessages` operation of the Kafka connector, which publishes messages to the Kafka brokers via Kafka topics.
What you'll build
Given below is a sample API that illustrates how you can connect to a Kafka broker with the `init` operation and then use the `publishMessages` operation to publish messages via the topic. It exposes Kafka functionalities as a RESTful service. Users can invoke the API using HTTP/HTTPS with the required information.
The API has the context `/publishMessages`. It publishes messages via the topic to the Kafka server.
The following diagram illustrates all the required functionality of the Kafka service that you are going to build.
If you do not want to configure this yourself, you can simply get the project and run it.
Set up Kafka
Before you begin, set up Kafka by following the instructions in Setting up Kafka.
Set up the integration project
- Follow the steps in the create integration project guide to set up the Integration Project.
- Create a new Kafka connection.
    - Go to `Local Entries` -> `Connections` and click on the `+` sign.
    - Select the `KafkaTransport` connector.
    - Use the following values to create the connection.
        - Connection Name - `KafkaConnection`
        - Connection Type - `kafka`
        - Bootstrap Servers - `localhost:9092`
        - Key Serializer Class - `org.apache.kafka.common.serialization.StringSerializer`
        - Value Serializer Class - `org.apache.kafka.common.serialization.StringSerializer`
        - Pooling Enabled - `false`
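For reference, the connection created in the steps above is stored as a local entry. The following is a minimal sketch of what that local entry might look like in source view. It assumes the connector's `init` operation takes element names that mirror the form fields (`connectionType`, `name`, `bootstrapServers`, `keySerializerClass`, `valueSerializerClass`, `poolingEnabled`). These element names are assumptions and are not taken from this guide, so compare them against the file that the tooling generates.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Sketch only: element names are assumed to mirror the connection form fields -->
<localEntry key="KafkaConnection" xmlns="http://ws.apache.org/ns/synapse">
    <kafkaTransport.init>
        <connectionType>kafka</connectionType>
        <name>KafkaConnection</name>
        <!-- Address of the local Kafka broker used in this example -->
        <bootstrapServers>localhost:9092</bootstrapServers>
        <!-- Keys and values are both serialized as plain strings -->
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
        <poolingEnabled>false</poolingEnabled>
    </kafkaTransport.init>
</localEntry>
```

The `key` of the local entry is the value that the API refers to through `configKey="KafkaConnection"`.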
Create the integration logic
- Select Micro Integrator and click on `+` in APIs to create a REST API. Provide `KafkaTransport` as the name and `publishMessages` as the context.
- Create a resource with the below configuration.
- Select the created resource and add the `PublishMessages` operation.
    - Use the following values to fill in the form that appears.
        - Connection - `KafkaConnection`
        - Topic - `test`
        - Partition Number - `0`
- The source view of the XML configuration file of the API will be as below.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishMessages" name="KafkaTransport" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <kafkaTransport.publishMessages configKey="KafkaConnection">
                <topic>test</topic>
                <partitionNo>0</partitionNo>
                <keySchemaSoftDeleted>false</keySchemaSoftDeleted>
                <valueSchemaSoftDeleted>false</valueSchemaSoftDeleted>
            </kafkaTransport.publishMessages>
        </inSequence>
        <faultSequence>
        </faultSequence>
    </resource>
</api>
```
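The sample leaves the `faultSequence` empty. As a hedged sketch that is not part of the original sample, the fault sequence could log the error details when publishing fails. It uses the standard Synapse log mediator together with the `ERROR_CODE` and `ERROR_MESSAGE` properties that Synapse sets when a fault occurs.

```xml
<!-- Hypothetical replacement for the empty faultSequence in the API above -->
<faultSequence>
    <log level="custom">
        <!-- Error details populated by Synapse when mediation fails -->
        <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
        <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
    </log>
</faultSequence>
```

With this in place, a failure such as an unreachable broker would show up in the server log rather than passing silently.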
Now, we can export the imported connector and the API into a single CAR application. The CAR application is the artifact that is deployed to the server runtime.
Export integration logic as a carbon application
To export the project, please refer to the build and export the carbon application guide.
Get the project
You can download the ZIP file and extract the contents to get the project code.
Deployment
To deploy and run the project, please refer to the build and run guide.
You can also inspect the deployed application through the CLI tool. See the instructions on managing integrations from the CLI.
Test
Create a topic:
Let’s create a topic named `test` with a single partition and only one replica.
Navigate to the Kafka installation directory and run the following command:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Sample request:
Send a message to the Kafka broker using a curl command or a sample client.
curl -X POST -d '{"name":"sample"}' "http://localhost:8290/publishMessages" -H "Content-Type:application/json" -v
Expected response:
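Navigate to the Kafka installation directory and run the following command to consume the messages from the topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
The consumer prints the following message content: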
{"name":"sample"}
This demonstrates how the Kafka connector publishes messages to the Kafka brokers.
What's next
- To customize this example for your own scenario, see the Kafka Connector Configuration documentation.