RabbitMQ Inbound¶
Introduction¶
AMQP is a wire-level messaging protocol that describes the format of the data that is sent across the network. If a system or application can read and write AMQP, it can exchange messages with any other system or application that understands AMQP regardless of the implementation language.
Syntax¶
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="RabbitMQConsumer" sequence="amqpSeq" onError="amqpErrorSeq" protocol="rabbitmq" suspend="false">
<parameters>
<parameter name="sequential">true</parameter>
<parameter name="coordination">true</parameter>
<parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
<parameter name="rabbitmq.server.host.name">localhost</parameter>
<parameter name="rabbitmq.server.port">5672</parameter>
<parameter name="rabbitmq.server.user.name">guest</parameter>
<parameter name="rabbitmq.server.password">guest</parameter>
<parameter name="rabbitmq.queue.name">queue</parameter>
<parameter name="rabbitmq.exchange.name">exchange</parameter>
<parameter name="rabbitmq.connection.ssl.enabled">false</parameter>
</parameters>
</inboundEndpoint>
Properties¶
Listed below are the properties used for creating a RabbitMQ inbound endpoint.
Required Properties¶
The following properties are required when creating a RabbitMQ inbound endpoint.
Property | Description |
---|---|
sequential | The behavior when executing the given sequence. When set as true , mediation will happen within the same thread. When set as false , the mediation engine will use the inbound thread pool. The default thread pool values can be found in the MI_HOME/conf/deployment.toml file, under the `[mediation]` section. The default setting is true .
|
rabbitmq.connection.factory | Name of the connection factory. |
rabbitmq.server.host.name | Host name of the server. |
rabbitmq.server.port | The port number of the server. |
rabbitmq.server.user.name | The user name to connect to the server. |
rabbitmq.server.password | The password to connect to the server. |
rabbitmq.queue.name | The queue name to send or consume messages. |
coordination | This parameter is only applicable in a clustered environment. In a cluster environment an inbound endpoint will only be executed in worker nodes. If this parameter is set to true in a clustered environment, the inbound will only be executed in a single worker node. Once the running worker node is down, the inbound will start on another available worker node in the cluster. By default, this setting is true .
|
Optional Properties¶
The following optional properties can be configured when creating an RabbitMQ inbound endpoint.
Note that the optional properties related to defining a queue should contain the rabbitmq.queue.optional.
prefix,
and the optional properties related to defining an exchange should contain the rabbitmq.exchange.optional.
prefix.
Tip
Note that keystore information is not required for an SSL connection if the fail_if_no_peer_cert
parameter is set to 'false' in the RabbitMQ broker. You only need to enable SSL in the Micro Integrator (using the rabbitmq.connection.ssl.enabled
parameter).
However, if the fail_if_no_peer_cert
parameter is set to 'true' in RabbitMQ, the keystore configurations (given below) are also required for the Micro Integrator.
Shown below is an example of the config file where fail_if_no_peer_cert
is set to false
:
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false
Property Name | Description |
---|---|
rabbitmq.server.virtual.host | The virtual host name of the server (if available). |
rabbitmq.connection.ssl.enabled | Whether to use TCP connection or SSL connection. |
rabbitmq.connection.ssl.keystore.location | The keystore location. |
rabbitmq.connection.ssl.keystore.type | The keystore type. |
rabbitmq.connection.ssl.keystore.password | The keystore password. |
rabbitmq.connection.ssl.truststore.location | The truststore location. |
rabbitmq.connection.ssl.truststore.type | The truststore type. |
rabbitmq.connection.ssl.truststore.password | The truststore password. |
rabbitmq.connection.ssl.version | The SSL version. |
rabbitmq.channel.consumer.qos | The prefetch message count. This many messaged will be prefetched before the application sees it. If a value is not set, 0 will be used as the default value. |
rabbitmq.consumer.tag | RabbitMQ consumer identifier. |
rabbitmq.connection.factory.network.recovery.interval | Interval at which the server will retry connecting to the RabbitMQ server in the case of a failure in the established connection. |
rabbitmq.factory.connection.timeout | Timeout for the connection initialization. |
rabbitmq.queue.durable | Whether the queue should remain declared even if the broker restarts. |
rabbitmq.queue.exclusive | Whether the queue should be exclusive or should be consumable by other connections. |
rabbitmq.queue.auto.delete | Whether to keep the queue even if it is not being consumed anymore. |
rabbitmq.queue.auto.ack | Whether to send back an acknowledgment. |
rabbitmq.exchange.name | The name of the RabbitMQ exchange to which the queue is bound. |
rabbitmq.queue.routing.key | The routing key of the queue. |
rabbitmq.queue.autodeclare |
Whether or not to declare the queue. If set to true , the Micro Integrator creates queues if they are not already
present. If set to false , the Micro Integrator will assume that a queue is already available. However, you should set this parameter to true only if queues are not already declared in the RabbitMQ server. Setting this parameter to false in the publish URL improves RabbitMQ transport performance.
|
rabbitmq.exchange.type | The type of the exchange. |
rabbitmq.exchange.durable | Whether the exchange should remain declared even if the broker restarts. |
rabbitmq.exchange.auto.delete | Whether to keep the queue even if it is not used anymore. |
rabbitmq.exchange.autodeclare |
Whether or not to declare the exchange. If set to true , the Micro Integrator creates exchanges. If set to false , the Micro Integrator will assume that an exchange is already available. However, you should set this parameter to true only if exchanges are not already declared in the RabbitMQ server. Setting this parameter to false in the publish URL improves RabbitMQ transport performance.
|
rabbitmq.factory.heartbeat |
The period of time after which the connection should be considered dead. |
rabbitmq.message.max.dead.lettered.count | The maximum number of attempts a message is allowed to be dead lettered. Once the count exceeds, the message will be discarded or published to the given 'rabbitmq.message.error.queue.routing.key'. |
rabbitmq.message.requeue.delay | Delay for the message to be requeued in the case of immediate requeing. |
rabbitmq.message.error.queue.routing.key | The routing key to publish the message after the 'max.dead.lettered' count has been reached. |
rabbitmq.message.error.exchange.name | The exchange to which messages are published after the `max.dead.lettered` count has been reached. |
Note
The property rabbitmq.exchange.name becomes mandatory if you are trying to connect to a new queue, so that it can be bound to an exchange for messages to flow.
Connection Recovery Properties¶
In case of a network failure or broker shutdown, the Micro Integrator can try to recreate the connection.
If you want to enable connection recovery, you should configure the following parameters in the inbound endpoint:
<parameter name="rabbitmq.connection.retry.interval" locked="false">10000</parameter>
<parameter name="rabbitmq.connection.retry.count" locked="false">5</parameter>
If the parameters are configured with sample values as given above, the server makes 5 retry attempts with a time interval of 10000 milliseconds between each retry attempt to reconnect when the connection is lost. If reconnecting fails after 5 retry attempts, the Micro Integrator terminates the connection.
Mediator Properties¶
In addition to the parameters described above, you can define RabbitMQ properties using the Property mediator and the Property Group mediator.