DB Event Inbound Endpoint Example¶
The following are the main features of the event generator.
- Trigger an event with the data of the table row when a new record is added or updated. Optionally, delete the row associated with the event after triggering the event.
- Trigger an event when a boolean field is flipped in a particular table row.
What you'll build¶
In this example let us see how to configure DB-event Inbound Endpoint
so that it can listen to data changes done to a MySQL
table. Out of the features mentioned above, feature no:1 is used here. Please refer to reference guide if you need to use other features.
In an enterprise system, a relational database table is used to store customer information. Customers' information is added by an external system to the database which is not in the enterprise's control. As soon as a new customer is inserted, the system needs to pick up and process its data. The integration runtime is used here to listen to DB changes and invoke the relevant processes. It can invoke backend APIs or place data into a message bus after required data transformations. However, for simplicity of this example, we will just log the message. You can extend the sample as required using WSO2 mediators.
The following diagram shows the overall solution we are going to build. The external system will update the MySQL DB and the integration runtime will trigger events based on the inserts and updates.
If you do not want to configure this yourself, you can simply get the project and run it.
Set up the environment¶
First, install MySQL database locally. If you have a remote server, please obtain the credentials required to connect. In this example, database credentials are assumed as username=root
and password=root
.
-
Create a database called
test
. Then create a table calledCDC_CUSTOM
under that database using the following SQL script.CREATE TABLE `test`.`CDC_CUSTOM` ( `ID` INT NOT NULL, `NAME` VARCHAR(45) NULL, `ADDRESS` VARCHAR(45) NULL, `AMOUNT` INT NULL, PRIMARY KEY (`ID`));
-
We need an additional column to track new records. If you apply this feature to an existing database table, you can alter the table as shown below. It will add a column of type
TIMESTAMP
, which gets automatically updated when you insert or update a record.ALTER TABLE test.CDC_CUSTOM ADD COLUMN UPDATED_AT TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
Configure inbound endpoint using micro integrator¶
-
Create an Integration Project as below.
-
Create two sequences to process the database event and handle the failure of the database event.
Sequence to process the database event<?xml version="1.0" encoding="UTF-8"?> <sequence name="DBEventProcessSeq" trace="disable" xmlns="http://ws.apache.org/ns/synapse"> <log description="event log" level="full"> <property name="message" value="event received"/> </log> </sequence>
Sequence to handle the failure of the database event<?xml version="1.0" encoding="UTF-8"?> <sequence name="eventProcessFailSeq" trace="disable" xmlns="http://ws.apache.org/ns/synapse"> <property name="SET_DB_ROLLBACK_ONLY" scope="default" type="STRING" value="true"/> <log level="full"> <property name="MESSAGE" value="Executing default 'fault' sequence"/> <property expression="get-property('ERROR_CODE')" name="ERROR_CODE" xmlns:ns="http://org.apache.synapse/xsd"/> <property expression="get-property('ERROR_MESSAGE')" name="ERROR_MESSAGE" xmlns:ns="http://org.apache.synapse/xsd"/> </log> <drop/> </sequence>
-
Click on
+
sign beside theInbound Endpoints
and selectCustom
to add a new custom inbound endpoint.
Configure the custom inbound endpoint as below. Please note that you need to update the URL, username, and password as required.
The source view of the created custom inbound endpoint will be as below.<?xml version="1.0" encoding="UTF-8"?> <inboundEndpoint class="org.wso2.carbon.inbound.poll.dbeventlistener.DBEventPollingConsumer" name="CustomerDBEventEP" onError="eventProcessFailSeq" sequence="DBEventProcessSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse"> <parameters> <parameter name="interval">1000</parameter> <parameter name="class">org.wso2.carbon.inbound.poll.dbeventlistener.DBEventPollingConsumer</parameter> <parameter name="sequential">true</parameter> <parameter name="coordination">true</parameter> <parameter name="inbound.behavior">polling</parameter> <parameter name="driverName">com.mysql.jdbc.Driver</parameter> <parameter name="url">jdbc:mysql://localhost/test</parameter> <parameter name="username">root</parameter> <parameter name="password">root</parameter> <parameter name="tableName">CDC_CUSTOM</parameter> <parameter name="filteringCriteria">byLastUpdatedTimestampColumn</parameter> <parameter name="filteringColumnName">UPDATED_AT</parameter> <parameter name="primaryKey">ID</parameter> <parameter name="connectionValidationQuery">SELECT 1</parameter> <parameter name="registryPath">dbEventIE/timestamp</parameter> </parameters> </inboundEndpoint>
Export integration logic as a carbon application¶
Follow the steps provided in the build and export the carbon application guide.
Get the project¶
You can download the ZIP file and extract the contents to get the project code.
Tip
You may need to update the database details and make other such changes before deploying and running this project.
Deployment¶
-
Navigate to the connector store and search for
DB Event Listener
. Click onDB Event Listener
and download the .jar file by clicking onDownload Inbound Endpoint
. Copy this .jar file into the/lib folder. -
Download
mysql-connector-java
associated withMySQL
server version and add it to the/lib folder.` -
Copy the exported carbon application to the
<PRODUCT-HOME>/repository/deployment/server/carbonapps
folder. -
Start the server
Now the integration runtime will start listening to the data changes of CDC_CUSTOM
table.
Test¶
Add a new record¶
- Using the MySQL terminal, execute the following SQL to insert a new customer record into the table.
INSERT INTO `test`.`CDC_CUSTOM` (`ID`, `NAME`, `ADDRESS`, `AMOUNT`) VALUES (001, "john", "22/3, Tottenham Court, London" , 1000);
-
You can see a log entry in the WSO2 server console similar to the following.
[2024-07-12 10:46:25,674] INFO {LogMediator} - To: , MessageID: urn:uuid:C33BFFBFC43BA2E8581720761385599, Direction: request, message = event received, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><Record><ID>1</ID><NAME>john</NAME><ADDRESS>22/3, Tottenham Court, London</ADDRESS><AMOUNT>1000</AMOUNT><UPDATED_AT>2024-07-12 10:46:25.0</UPDATED_AT></Record></soapenv:Body></soapenv:Envelope>
-
If you add another new record, only that new record will get notified to the integration runtime and the old records will be ignored.
Update an existing record¶
- Using the MySQL terminal, execute the following SQL to update the added record.
UPDATE `test`.`CDC_CUSTOM` SET AMOUNT = 2000 WHERE ID = 001;
- You can see a log entry in the WSO2 server console similar to the following.
[2024-07-12 10:53:10,500] INFO {LogMediator} - To: , MessageID: urn:uuid:C33BFFBFC43BA2E8581720761790499, Direction: request, message = event received, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><Record><ID>1</ID><NAME>john</NAME><ADDRESS>22/3, Tottenham Court, London</ADDRESS><AMOUNT>2000</AMOUNT><UPDATED_AT>2024-07-12 10:53:09.0</UPDATED_AT></Record></soapenv:Body></soapenv:Envelope>
Note: You can do any type of advanced integration using the rich mediator catalog, not just logging.
What's next¶
- To customize this example for your own scenario, see DB Event Inbound Endpoint Configuration documentation for all configuration options of the endpoint.