Publish-Subscribe Channel
This page explains how to implement a sample scenario of the Publish-Subscribe Channel EIP using WSO2 Micro Integrator (WSO2 MI).
Introduction to Publish-Subscribe Channel
The Publish-Subscribe Channel EIP receives messages from an input channel and delivers a copy of each message to every subscriber through an output channel. Each subscriber has exactly one output channel.
Info
For more information, see the Publish subscribe channel documentation.
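The delivery behavior described above can be sketched in a few lines of Python. This is a toy in-memory model, not WSO2 MI or ActiveMQ code; the class name `PublishSubscribeChannel` and its methods are hypothetical and exist only to show that every subscriber of a topic receives its own copy of each published message.

```python
from typing import Callable, Dict, List

Handler = Callable[[str], None]


class PublishSubscribeChannel:
    """Hypothetical in-memory channel: each subscriber gets every message."""

    def __init__(self) -> None:
        # Maps a topic name to the handlers subscribed to it.
        self._subscribers: Dict[str, List[Handler]] = {}

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers.setdefault(topic, []).append(handler)

    def publish(self, topic: str, message: str) -> int:
        """Deliver the message to every subscriber; return the delivery count."""
        handlers = list(self._subscribers.get(topic, []))
        for handler in handlers:
            handler(message)
        return len(handlers)


# Two subscribers on the same topic, mirroring the two proxies in the sample.
received_1: List[str] = []
received_2: List[str] = []

channel = PublishSubscribeChannel()
channel.subscribe("SimpleStockQuoteService", received_1.append)
channel.subscribe("SimpleStockQuoteService", received_2.append)

delivered = channel.publish("SimpleStockQuoteService", "<getQuote>IBM</getQuote>")
print(delivered, received_1, received_2)
```

Unlike a point-to-point queue, where one consumer takes each message, both lists end up holding the same message here.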
Sample scenario
The example scenario depicts a stock inventory and demonstrates how the EIP distributes a message to several subscribers. It includes multiple instances of the SimpleStockQuoteService. When a message is sent to WSO2 MI, it is published to an ActiveMQ topic, and each of these service instances receives it as a subscriber to that topic.
The diagram below depicts how to simulate the example scenario using WSO2 MI.
Before digging into implementation details, let's take a look at the relationship between the example scenario and the Publish-Subscribe Channel EIP by comparing their core components.
Publish-Subscribe Channel EIP | Publish-Subscribe Channel Example Scenario |
---|---|
Subscriber | SimpleStockQuoteService |
Publish-Subscribe Channel | ActiveMQ topic |
Publisher | StockQuoteProxy |
Synapse configurations of the artifacts
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="StockQuoteProxy" transports="http" startOnLoad="true" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- One-way message: no response is expected from the topic -->
            <property name="OUT_ONLY" value="true"/>
            <!-- Return HTTP 202 Accepted to the client -->
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
            <call>
                <endpoint>
                    <address uri="jms:/SimpleStockQuoteService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=topic"/>
                </endpoint>
            </call>
        </inSequence>
    </target>
</proxy>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimpleStockQuoteService1" transports="jms" startOnLoad="true" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <log level="custom">
                <property name="Subscriber1" value="I am Subscriber1"/>
            </log>
            <drop/>
        </inSequence>
    </target>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimpleStockQuoteService</parameter>
</proxy>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimpleStockQuoteService2" transports="jms" startOnLoad="true" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <log level="custom">
                <property name="Subscriber2" value="I am Subscriber2"/>
            </log>
            <drop/>
        </inSequence>
    </target>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimpleStockQuoteService</parameter>
</proxy>
How the implementation works
Let's break down the key components of the configuration:
- StockQuoteProxy: Forwards stock quote requests from clients to a JMS topic, enabling asynchronous processing.
- SimpleStockQuoteService1: Subscribes to the JMS topic, logs a custom message, and drops the message.
- SimpleStockQuoteService2: Similar to SimpleStockQuoteService1, it subscribes to the JMS topic, logs a different custom message, and drops the message.
Set up the sample scenario
Follow the instructions below to simulate this sample scenario.
- Install WSO2 Micro Integrator.
  Info
  Follow the Install the Micro Integrator Runtime documentation for more information.
- Launch Visual Studio Code with the Micro Integrator for VS Code extension (MI for VS Code) installed.
  Info
  Follow the Install Micro Integrator for VS Code documentation for a complete installation guide.
- Download the artifacts of the sample.
- Import the artifacts to WSO2 MI.
  Click File -> Open Folder and select the folder extracted from the downloaded ZIP file.
- Start the project in the WSO2 MI server.
  For instructions, see the Build and Run documentation.
- Set up and start ActiveMQ.
  Note
  Make sure to configure the relevant JMS parameters in the deployment.toml file.
[[transport.jms.listener]]
name = "myTopicConnectionFactory"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "TopicConnectionFactory"
parameter.connection_factory_type = "topic"
[[transport.jms.sender]]
name = "myTopicSender"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://localhost:61616"
parameter.connection_factory_name = "TopicConnectionFactory"
parameter.connection_factory_type = "topic"
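Before starting the MI server, it can help to confirm that the ActiveMQ broker is listening on the address used in `provider_url` above. The following is a minimal sketch using only the Python standard library; the helper name `is_broker_reachable` is hypothetical, and the check only confirms that a TCP connection can be opened, not that the broker or the topic is configured correctly.

```python
import socket


def is_broker_reachable(host: str = "localhost", port: int = 61616,
                        timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `is_broker_reachable()` with no arguments checks `localhost:61616`, the host and port from the `deployment.toml` settings above.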
Execute the sample
Send the following request to the service using SoapUI (or any other SOAP client).
POST http://localhost:8290/services/StockQuoteProxy
Accept-Encoding: gzip,deflate
Content-Type: text/xml;charset=UTF-8
SOAPAction: "urn:getQuote"
Connection: Keep-Alive
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
<soapenv:Body>
<ser:getQuote>
<ser:request>
<xsd:symbol>IBM</xsd:symbol>
</ser:request>
</ser:getQuote>
</soapenv:Body>
</soapenv:Envelope>
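If a SOAP client is not at hand, the same request can be sent with a short script. The sketch below uses only the Python standard library and assumes the proxy is reachable at the URL shown in the request above; the function names `build_quote_request` and `send_quote_request` are hypothetical helpers, not part of any WSO2 tooling.

```python
import urllib.request
from xml.sax.saxutils import escape

PROXY_URL = "http://localhost:8290/services/StockQuoteProxy"

SOAP_TEMPLATE = (
    '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" '
    'xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">'
    "<soapenv:Body><ser:getQuote><ser:request>"
    "<xsd:symbol>{symbol}</xsd:symbol>"
    "</ser:request></ser:getQuote></soapenv:Body>"
    "</soapenv:Envelope>"
)


def build_quote_request(url: str = PROXY_URL,
                        symbol: str = "IBM") -> urllib.request.Request:
    """Build the POST request with the same headers as the sample above."""
    body = SOAP_TEMPLATE.format(symbol=escape(symbol)).encode("utf-8")
    headers = {
        "Content-Type": "text/xml;charset=UTF-8",
        "SOAPAction": '"urn:getQuote"',
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def send_quote_request(request: urllib.request.Request) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

With the MI server and ActiveMQ running, `send_quote_request(build_quote_request())` sends the message. Because the proxy sets `FORCE_SC_ACCEPTED`, the status returned should be 202 rather than a SOAP response body.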
Analyze the output
When you send the request above, it is received by the StockQuoteProxy, which publishes it to the ActiveMQ topic. Both subscriber proxies then log their messages in the WSO2 MI server output:
[2024-08-13 09:40:05,276] INFO {LogMediator} - {proxy:SimpleStockQuoteService1} Subscriber1 = I am Subscriber1
[2024-08-13 09:40:05,276] INFO {LogMediator} - {proxy:SimpleStockQuoteService2} Subscriber2 = I am Subscriber2
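To confirm that every subscriber received the message, the log output can be checked with a script. The following sketch assumes log lines in the format shown above; the helper name `subscribers_in_log` is hypothetical, and `SAMPLE_LOG` simply repeats the two lines from this page.

```python
import re

# Captures the proxy name from a "{proxy:Name}" marker in a log line.
PROXY_PATTERN = re.compile(r"\{proxy:([^}]+)\}")

EXPECTED = {"SimpleStockQuoteService1", "SimpleStockQuoteService2"}

SAMPLE_LOG = """\
[2024-08-13 09:40:05,276] INFO {LogMediator} - {proxy:SimpleStockQuoteService1} Subscriber1 = I am Subscriber1
[2024-08-13 09:40:05,276] INFO {LogMediator} - {proxy:SimpleStockQuoteService2} Subscriber2 = I am Subscriber2
"""


def subscribers_in_log(log_text: str) -> set:
    """Return the set of proxy names that appear in the log text."""
    return set(PROXY_PATTERN.findall(log_text))


# An empty set means every expected subscriber logged the message.
missing = EXPECTED - subscribers_in_log(SAMPLE_LOG)
print(missing)
```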