0
0

Spring Integration Tutorial

Tutorial
Jorge Simao Posted 01 Nov 20

Introducing Spring Integration

Spring Integration is an enterprise integration framework for Java that support the development of applications based on the principles of Enterprise Integration Patterns (EIP). It allows applications that integrate many types of protocols and data formats to be easily build by composition of components. Many components are provided out-of-the-box for a variety of protocols, and data-formats. This makes enterprise application development relatively straightforward.

Spring Integration Programming Model

Spring Integration is based on Spring Framework and as a programming model that support the same kinds of configuration styles as Spring Framework, including XML, annotations, and programmatic use of the components API. Components are bundled in different modules, and each module as a dedicated XML namespace that can be used to created the module’s components.

The XML example below illustrates the configuration of a simple integration network in Spring Integration. In addition to the beans namespace, the Spring Integration namespaces for core component and file-access are imported with prefix int and int-file, respectivelly. The namespace task is also imported to support asynchronous execution of tasks – in this case, pooling file from a spool directory.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-file="http://www.springframework.org/schema/integration/file"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd	
	http://www.springframework.org/schema/integration
	http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
	http://www.springframework.org/schema/integration/file
	http://www.springframework.org/schema/integration/file/spring-integration-file-2.1.xsd
	http://www.springframework.org/schema/task
	http://www.springframework.org/schema/task/spring-task-3.1.xsd">


<task:executor id="executor" pool-size="3" />

<int:poller fixed-delay="1000" task-executor="executor" default="true"/>

<int-file:inbound-channel-adapter directory="classpath:spool-dir" channel="incoming" >
	<int:poller fixed-delay="1000" task-executor="executor" />
</int-file:inbound-channel-adapter>

<int:channel id="incoming"/>

<int:transformer input-channel="out" ref="processor" output-channel="incomingOrders"/>

<bean id="processor" class="acme.app.MyProcessor"/>

<int:channel id="out"/>
	<int:queue />
</beans>

Note that a transformer is defined that reference an application bean that implements the application logic for the transformation. Below I show the skeleton for the processor bean. Notice that the annotation @Transform is used to specify which method should be invoked to process the message.

package acme.app;

import org.springframework.integration.annotation.Transformer;

public class MyProcessor {
  @Transform
  public void String process(String payload) {
    return ...;
  }
}

If access to the header of the messages is required the following alternative signature for the processing method could be used:

@Transform
public void Message<String> process(Message<String> message) {
    return ...;
}

Because external adaptor components in EIP can work in multiple ways, Spring Integration introduces the following categorization:

  • Adaptors — perform one-way communication between an external data protocol and the integration network. Depending on the direction of the data flow, two sub-cases are considered.

    • Inbound Adaptors — bring data from an external system to the integration network
    • Outbound Adaptors — move data produced and/or processed by the integration network to bring an external system
  • Gateways — perform two-way communication between an external data protocol and the integration network. Similarly to the one-way adaptor two sub-cases are considered, depending of whether the integration network or the exteranl system is the orginal source of the data for the processing route:

    • Inbound Gateways — bring data from an external system to the integration network, and expect a response message to be provided by some other components, through some channel, to be forward to the external system
    • Outbound Gateways — send data to an external system (produced by some other component), and expect the external system to send some response data back. The response data is them droped in some channel to be further processed by other components

External Adaptor Components

Some examples of adaptors for specific protocols include:

  • JmsOutBoundAdapter — reads messages from an input channel and sends JMS messages to a JMS destination. The payload of the JMS message is converted from the payload of the internal message. Some headers of the JMS are also initialized (e.g. priority) are also initialized form the header of the internal message
  • JmsInBoundAdapter — reads messages from a JMS destination, and drops them in an output channel for further processing.
  • HttpInboundGateway — accepts HTTP request, creates a message from that content, drops it in an output channel, and waits for a response message to be produced. When the response message is received, the HTTP response is created and written to the HTTP connection output stream. The body of the HTTP request and HTTP response are mapped to/from the payload according to the MIME type defined in the HTTP header Content-Type. HTTP headers of the response can be set headers in the internal message with ``well-know'' names.

Table below summarizes the taxonomy of external adaptors as modeled in Spring Integration:

InboundOutbound
One-WayInbound adaptorOutbound adaptor
Two-WayInbound GatewayOutbound Gateway

Taxonomy of External Adaptor Components

Using Spring Integration

Spring Integration artifacts are bundled in a variety of modules, each with specific XML namespace support. Due to the inter-dependencies between the modules and to third-party libraries, using a ``project build'' tool such as Maven is the recommended approach to import the artifacts. The XML fragment below show the Maven dependency to be included in the POM file for a Maven build project, when using the core components of Spring Integration.

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-core</artifactId>
  <version>2.1.0.RELEASE</version>
</dependency> 

Components to work with specific protocols or data formats are bundled in separated modules, and should be included as required. For example, to work file-access components the artifact named spring-integration-core should be imported. Likewise to work with components specialized in XML data-formats the artifact named spring-integration-xml should be imported.

Enterprise Integration Patterns

Spring Integration abstractions are based on the principles Enterprise Integration Patterns (EIP). The key concepts and ideas are reviewed here. (See the resources section below for further information.)

Pipes&Filters Architecture

In EIP components interact by unidirectional in-memory message passing, rather than the common interaction model based on method invocation with return values. Data exchange between components is done through communication channels, with varied delivery, buffering, and storage semantics. While in the common programming model interfaces and classes are used defining service contracts between components, in EIP components use only the message exchange API provided by the communication channels. Thus components become loosely coupled as there is no type dependencies between them nor fixed references to collaborators. The overall architecture of the application is a network (graph) of nodes – the components, linked by ``pipes'' – the message exchange channels.

Each node takes some role in the pipeline used for data exchange and processing. This include: data production, consumption, transformation, routing, filtering, data format conversion, and adaptation to data transport and application protocols. The composite work of particular routes in the network define a specific processing done on the data. Typically, for non-trivial applications, multiple routes may exist followed by data depending on the source of incoming data, the specific processing required, and the sinks of the outgoing data.

Pipes&Filters Message-Driven Architecture

Integration Components

In an integration network based on EIP, most components fit on two categories — the channels and the endpoints:

  • Channels — the connectors between the other types of components, i.e. the endpoints. (These are the “pipes” in the “pipes&filters” architectural characterization.) Channels provide a simple API defined in-terms of message passing operations, such as send() and recv() methods. Channels allow endpoint to exchange data without type dependency or being aware of each others identity. This makes inter-component communication to have characteristics similar to message-driven inter-process communication in a network, such as: assynchrony (decoupling in time), annonaminity (logical and identity decoupling, varied delivery semantics (e.g. point-to-point communication, or publish-subscribe delivery), and possibly some configurable reliability (e.g. persistence backup of messages).

  • Endpoints — these are the components that actually perform the processing logic, such as transformations, routing, filtering, or connect to external transports. (These are the “filters” in the “pipes&filters” architectural characterization.)

The endpoints can further be categorized in three groups:

  • Internal Components — perform processing of data, such as transformations, routing, and filtering. A sub-category of transformations involve conversion of data-format between them (e.g. XML and CSV), or more commonly mapping these data-format to an internal object-oriented representation, such a Java object.
  • External Adaptors — move data in and out from/to external sources/sinks using specific protocols (e.g. HTTP, FTP, etc.). Adaptors can be designed to work in a single direction, or both directions.
  • Internal Adaptors — perform the adaptation between the network internal components or external adaptors and the components that implement application logic. This include invoking methods in the application components (service activators)), and non-intrusive channelling of procedural requests performed by the application components to the communication channels of the integration network (internal gateways)

Integration Architecture with External Adaptors, Internal Components, and Internal Adaptors Connected by Channels

Message Format

In the EIP approach, data exchange between components is performed using messages. These messages have contain two types of information – the payload part, which caries application data; and the headers — named attributes with some value — that carry meta-data about the message and the data, such as: priority and routing information, headers mapped from the headers an external message received using some transport protocol, and application set header that specify how response messages should be sent to external systems.

The two type of information present in a message – payload and header, are usually represented visually in an explict form. This highlight the fact that header have an important role in controlling how data is processed and routed.

Messages with Payload and Headers

The Problem of Message Correlation

In EIP, the inter-component interaction model is based on message-passing. Thus it follows and takes inspiration from the pattern of inter-process communication in a distributed system,
rather then the common interaction model based on method invocation with a return value. While this allows to bring the characteristic and benefits of message-based communication to an application architecture, it also bring some issues such as dealing with two-way concurrent requests.

If two endpoints send a concurrenty messages to the same channel and expect some response to be returned in another channel, and delivery or processing is done asynchronoully, i.e. by different threads, there needs to be some way to guarantee that the response message is received by the right endpoint. The problem also occurs if a single endpoint send two messages to the same channel, one after the other, before the first response message is received. This is know as the problem of message correlation. And its a well-know issue in message-driven inter-process communication (e.g. based on the JMS API) when implementing request-reply application-level protocols.

Message correlation is addressed in EIP in way similar to the solution in message-driven inter-process communication. Namely, a temporary channel is created before forwarding each request message. The identity of the created temporary channel is included as a ReplyTo header in the message. The requesting endpoint expects the endpoint sending the response message to respect this ReplyTo, and therefore performs the receive operation on it. Since different requests and/or endpoints use different temporary channel, it becomes unambiguous which endpoint that should receive each response message.

Message Correlation with ReplyTo Header

When the request is performed on behalf of a client external system, the identity of the client or the context for response (e.g. connection, socket, or some other handler), is also included in the request message and copied to the response message. This allow the response to be forward to the matching client.

Resources

Resources by the same author

External References/Resources

Concepts and Components

Message

Message Channel

Message Endpoints

Transformer

Transformer Component

Filter

Filter Component

Router

Router Component

Splitter

Aggregator

Splitter and Aggregator

Splitter and Aggregator Components

Service Activator

Channel Adapter

Messages and Messaging Operations

Message Interface

In Spring Integration all interaction between components is done by (undirectional) message passing. Each message contains an arbitrary payload object, and a set of message headers. The generic interface Message is used to model this. The type parameter for Message is the type of the payload object.

public interface Message<T> {

   T getPayload();

   MessageHeaders getHeaders();

}

Note that in Spring Integration a message is a read-only (immutable) object. Once a Message is created with some payload, it is not possible not modify the payload. This is because in EIP concepts and in Spring Integration an analogy is made between message passing in a distributed system, and message passing between lossely-coupled components in the same JVM. The same way that it is not possible to change in the content of a message once it is sent in a distributed system, the same should not be possible also for inter-components communication.

A corollary of the above, is that if an endpoint changes the payload of a message, the semantics of message passing breaks — specially, if their are multiple receivers for the message. It is possible that an endpoint changes the state of the payload object — but this is not recommended, for the reason just explained. As there is no practical way for a framework prohibit this explicitly, the application developer should follow this rule as a best practice.

Message Implementations

The commonly used implementation of Message is the class GenericMessage. The definition GenericMessage is sketched below:

public class GenericMessage<T> implements Message<T> {
  public GenericMessage(T payload) { ...
}
  
  public GenericMessage(T payload, Map<String, Object> headers) { ...
}
  
  ...
}

Note the availability of two constructors. The first constructor takes as parameter only the payload of the message. The second constructor takes as additional parameter a Map for the message headers.

Code sample below illustrates how a GenericMessage can be created and sent to some channel:

Map headers = new HashMap();
 headers.put("orderType", OrderType.SPECIAL_DELIVERY);
 headers.put(MessageHeader.EXPIRATION_DATE, DateUtil.daysToExpire(30));
 headers.put("description", "Please deliver to the attention of Mr.
J");

Message<?> message = new GenericMessage(order, headers);

orderChannel.send(message);

The class ErrorMessage is a convenient sub-class of GenericMessage that takes as payload a Throwable. This is the message used to report message delivery errors. A instance of ErrorMessage is droped in an error channel by asynchronous delivery channels.

Message Headers

The message headers are modeled with class MessageHeaders. The set of message headers of a particular Message is also read-only — it is not possible to change is once the message is created. A sketch of the specification of MessageHeaders, is shown below:

public final class MessageHeaders implements Map<String, Object>, Serializable {
	...
}

MessageHeaders implements the interface Map<String, Object>, but only read methods, such as get(), are supported. Write methods, such as put(), remove(), and clear(), throw an UnsupportedOperationException.

Message header fall in to categories: application specific headers, and framework pre-defined headers. Application specific headers can have any name desired, and are used to convey application specific metadata. Framework defined headers can be setup and initialized by the framework, either on the message constructor or by specific framework components (endpoints). For framework defined headers, class MessageHeaders defines convenient getter methods to retrieve the value.

Code sample below shows how to retrieve the value of application specific header and framework defined headers. Note the use of the type-safe retrieval methods get() for application specific headers, and the use of get*() methods for framework pre-defined headers.

Message<?> message = channel.receive();

Object description = message.getHeaders().get("description");
OrderType orderType = message.getHeaders().get("orderType", OrderType.class);
Long timestamp = message.getHeaders().getTimestamp();

Long expiration = message.getHeaders().getExpirationDate();

if (new Date(expiration).after(new Date())) {
 processOrder((Order)message.getPayload(), orderType, timestamp, description);
}

Table below summaries the framework pre-defined message headers.

Header NameValue TypeMethod NameDescription
IDjava.util.UUIDgetId()Unique identifier (randomly generated)
TIMESTAMPLonggetTimestamp()Time of creation
EXPIRATION_DATELonggetExpirationDate()Time validity/relevance
SEQUENCE_NUMBERIntegergetSequenceNumber()Index in a sequence
SEQUENCE_SIZEIntegergetSequenceSize()Size of the sequence
CORRELATION_IDObjectgetCorrelationId()Message ‘‘bucket’’ correlation
PRIORITYIntegergePriority()Used by PriorityChannel for ordering
REPLY_CHANNELString or MessageChannelgetReplyChannel()Reply channel for Request-Reply
ERROR_CHANNELString or MessageChannelgetErrorChannel()Channel to send exception messages

Framework pre-defined message headers.

MessageBuilder

Because MessageHeaders is immutable, message headers needs to be be specified as a Map in the constructor of a GenericMessage. An alternative away to create a message is to use the helper class MessageBuilder that provides convenient methods to create a Message and setup its headers.

Code sample below illustrates how to use MessageBuilder to create a message and setup its headers:

Message<String> message = MessageBuilder.withPayload(order)
	 .setHeader("orderType", OrderType.SPECIAL_DELIVERY);
	.setExpirationDate(DateUtil.daysToExpire(30));
	.setHeader("description", "Please deliver to the attention of Mr. J.");
	.build();
 
orderChannel.send(message);

Method MessageBuilder.withPayload() is use to first define the payload of the message. Method setHeader() is used to set application defined headers. For the framework pre-defined message headers, MessageBuilder provides specific ``setter'' methods. Method build() is last called to create the immutable Message with immutable MessageHeaders.

Message headers can also be copied from existing MessageHeaders taken from an existing message. This is illustrated below:

Message<String> message2 = MessageBuilder.withPayload(order)
       .copyHeaders(message.getHeaders())
       .build();

A message created by copy can also override some of the header values, as illustrated below:

Message<String> message2 = MessageBuilder.withPayload(order)
       .setPriority(OrderPriority.URGENT.ordinal())
       .copyHeadersIfAbsent(message.getHeaders())
       .build();

Or, alternatively, headers may be set conditionally on not being yet defined, as illustrated below:

Message<String> message2 = MessageBuilder.withPayload(order)
       .copyHeaders(message.getHeaders())
       .setHeaderIfAbsent("orderType", OrderType.SPECIAL_DELIVERY)
       .build();

MessagingTemplate

The utility class MessagingTemplate can be used to send and receive message to channels. This is an alternative way to use the channel API directly.

Code sample below show how to use MessagingTemplate for to send a message and receive a reply (e.g. a confirmation message):

MessagingTemplate template = new MessagingTemplate();

Message orderMessage = ...

Message reply = template.sendAndReceive(orderChannel, orderMessage);

if (reply.getMessageHeaders().getHeader("STATUS")==OrderStatus.COMPLETED) {
	//...
}

Method sendAndReceive() sets the header MessageHeaders.REPLY_CHANNEL to an anonymous temporary reply channel. After sending the message to the specified channel, it blocks until a message arrives at the temporary reply channel. The receiving endpoint is supposed, directly or indirectly, to make a reply message to arrive at that channel.

Simpler unidirectional message sending is also supported:

template.send(orderChannel, orderMessage);

Message<?> reply = template.receive(confirmationChannel);

Sending and receiving operation can also be done directly on payload objects rather than instance of message. Internally, a MessageConverter strategy object is called to create the message from the payload. The default implementation simply creates a message without any application specific headers. An application can also implement and set a custom MessageConverter (e.g. for setting appropriate message headers).

The example below shows how to send and receive message payloads directly (delegation the work of message creation and payload extraction to the installed MessageConverter).

template.convertAndSend(orderChannel, order);

OrderConfirmation confirmation = template.receiveAndConvert(confirmationChannel);

Note that object (de)serialization is usually not required to be done by a MessageConverter, since Spring Integration channels and messages are a local abstraction. The API of MessagingTemplate is mostly inspired by that of JmsTemplate.

Exercises to the Reader

  • Study the API of MessageConverter and make an implementation that sets the priority of all the created messages to a value specified in the constructor of the implemented class. Configure a MessagingTemplate to use it and test it.

Message Channels

Messages channels are used in Spring Integration to connect endpoints. Channels come in two categories:

  • Poolable — for synchronous reception on buffered channels;
  • Subcribable — message delivery performed by calling reception handler.

Message Sending API

The message sending API is the same for both categories of channels, and is modeled with the interface MessageChannel shown below:

public interface MessageChannel {

   boolean send(Message<?> message);

   boolean send(Message<?> message, long timeout);
}

In the second variation of the method send(), the timeout parameter specifies the amount of time (in milliseconds) the sending thread is willing to wait to perform the send operation. Waiting may be required in buffered channels that reached the maximum capacity for holding Messages. For the first variation of the method send() the time is takes to complete the send operation is unspecified and not bound (may block forever, in some implementations).

Both send() methods returns a boolean value that specified if the Message was sent successfully. Fatal errors are reported with a thrown RuntimeException, while non-fatal errors (logically correct) are reported by simply returning false.

Poolable Channels API

Poolable channels buffer sent messages and message reception is performed by explict synchronous invocation of a receive() method. This is modelled with the interface PollableChannel shown below.

public interface PollableChannel extends MessageChannel {

   Message<?> receive();

   Message<?> receive(long timeout);
}

In the second variation of the method send(), the timeout parameter specifies the amount of time the receiving thread is willing to wait to get a Message. If a Message is not available in the specified timeout value, then null is returned. The first variation of the method receive() blocks indefinetelly until a Message is available.

Subscribable Channels API

In subscribable channels message delivery is performed by calling a reception handle. Message delivery is usually done in the same thread of the sender. So, from the perspective of the sender, message sending–receiving is synchronous. On ther hand, the programmatic model is one of asynchronous delivery. A message handler method need to be registred with the channel. This category of channels is modelled with interface SubscribableChannel shown below:

public interface SubscribableChannel extends MessageChannel {

   boolean subscribe(MessageHandler handler);

   boolean unsubscribe(MessageHandler handler);

}

Method SubscribableChannel.subscribe() is used to register a handler for delivered messages (subscribers), modeled with interface MessageHandler. Method SubscribableChannel.unsubscribe() cancels a previous registration of a message handler. The specification of interface MessageHandler is shown below:

public interface MessageHandler {
	void handleMessage(Message<?> message) throws MessagingException;
}

The single method MessageHandler.handleMessage() is used to deliver and process Messages.

The code below, shows an example of a simple MessageHandler implementation that echo to the console the payload of the message (prefixed by this for self-identification).

public class PayloadEchoMessageHandler implements MessageHandler () {
	void handleMessage(Message<?> message)  {
		System.out.println(this + ":" + message.getPayload());
	}
}

Direct Channels

A DirectChannel is a point-to-point subscribable channel. Message delivery is done with the sending thread, and a each sent message is delivered to at most one subscribing handler (assuming no errors occur). The computational overhead of sending and delivering a Message with a DirectChannel, is comparable to simple method invocation between two endpoint components. However, because communication between endpoints is done indirectly through the channel, the endpoints become loosely-coupled.

Because in the DirectChannel sending and receiving is done by the same thread, the sender logic can always assume that the message was processed before control returns to the sending method. The only exceptions is if there is no subscriber to the channel or if there are unrecoverable errors.

A consequence of having the message sending and delivery performed in the same thread, is that the transactional context of the sending and handler method is the same. That is, transactional operations in the sender and receiver method, such as data access or reliable message sending (e.g. with JMS), will share the same transaction. Thus an explicit or forced rollback in the receiver will also abort the operation in the sender method.

Another consequence of synchronous delivery is that (unrecoverable) exceptions thrown by the message handler method are propagated back to the sending method.

Internally, the DirectChannel delegates some of the work involved in message delivery to a MessageDispatcher strategy object. The behavior of default implementation, is to perform round-robin delivery with failover. Round-robin means that, when multiple receivers (MessageHandler) have subscribed the channel, message are delivered in turns — with the first message delivered to the first receiver, second message delivered to the second receiver, and so on, until delivery comes back to the first receiver. Failover means that if an exception is thrown be a receiver, delivery is performed for the next receiver, until the message is successfully handled or all receivers have by tried. If none of the receiver is able to handle the message (e.g. because the payload is in someway ``corrupted''), the final exception is propagated back to the sender method.

The code sample below illustrate the programmatic use of a DirectChannel, with two registred receivers:

DirectChannel channel = new DirectChannel();
channel.subscribe(new PayloadEchoMessageHandler());
channel.subscribe(new PayloadEchoMessageHandler());
channel.send(new Message<String>("hello,"));
channel.send(new Message<String>("world!"));

Receiver then gets message in turns:

PayloadEchoMessageHandler@xxx:hello,
PayloadEchoMessageHandler@yyy:world!

Executor Channels

An ExecutorChannel is used for asynchronous point-to-point delivery. ExecutorChannel has a programming and delivery model similar to DirectChannel, i.e. it is a subscribable channel used for point-to-point communication. However, it has the important difference that message delivery is done in separate thread from the one running ther sending method. The thread performing the delivery is a thread managed by a TaskExecutor, that should be configured with the ExecutorChannel.

Because with ExecutorChannel message sending and delivery is, in general, done is a different threads, the logic in the sending method can not assume anything about the relative timing of the method send() returns, and the delivery and processing of the message. Another consequence of the asynchronous delivery, is that transactional operational in the receiver run in a different transaction from the operations in the sending thread. (See the Spring Integration reference manual, for exception to this rule.)

ExecutorChannel channel = new ExecutorChannel();
...

Publish-Subscribe Channel

A PublishSubscribeChannel is a channel used for message diffusion (multicast) across multiple receivers. It implements the SubscribableChannel, therefore receivers must implement the MessageHandler interface and be registered in (subscribe) the channel. Each receiver gets the same reference to every sent message (no message copy is performed) — therefore, it is not recommended that receivers change the state of the payload object. (This is true in general, but more obviously for the PublishSubscribeChannel).

By default, message delivery is performed by the sending thread to all the receivers. Thus, from the perspective of the sending method logic, message sending is synchronous and return of the method send() implies that all the receiver have processed the message.

A simple example using a PublishSubscribeChannel for message diffusion with synchronous (same-thread) delivery is shown in the code sample below:

PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.subscribe(new PayloadEchoMessageHandler());
channel.subscribe(new PayloadEchoMessageHandler());
channel.send(new GenericMessage<String>("hello!"));

Produces as output to the console:

INFO : org.springframework.integration.channel.PublishSubscribeChannel - Channel 'null' has 1 subscriber(s).
INFO : org.springframework.integration.channel.PublishSubscribeChannel - Channel 'null' has 2 subscriber(s).
ei.spring.integration.PayloadEchoMessageHandler@XXXXX:hello!
ei.spring.integration.PayloadEchoMessageHandler@YYYYY:hello!

The code example below configures the PublishSubscribeChannel to use a TaskExecutor to execute message delivery in a different thread fro the sender. Specifically, a ThreadPoolTaskExecutor implementation (a wrapper to java.util.concurrent.ThreadPoolExecutor), is used.

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.afterPropertiesSet();
PublishSubscribeChannel channel = new PublishSubscribeChannel(taskExecutor);

CounterMessageHandler count0 = new CounterMessageHandler();
channel.subscribe(count0);
CounterMessageHandler count1 = new CounterMessageHandler();
channel.subscribe(count1);

channel.send(new GenericMessage<String>("hello!"));
synchronized (this) {
	try {
		wait(1000);
	` catch (InterruptedException e) {
	}
}
assertEquals(1, count0.getCount());
assertEquals(1, count1.getCount());	

Queue Channels

A QueueChannel is a buffered point-to-point channels that delivers message in FIFO order. It implements the PollableChannel interface, thus message reception is done by invoking method receive(). Internally, messages are stored in a queue, that can be configured to have a maximum capacity. By default, the channel as (in practice) an unbound capacity. The capacity of the channel can be specified as a constructor argument.

When a send operation is performed on a QueueChannel that is holding messages in its maximum capacity, the sending thread blocks until some other thread consumes one or more messages from the channel.

Code sample below, shows how programmatically create an unbounded QueueChannel that is used send and receive messages:

QueueChannel channel = new QueueChannel();
channel.send(Message("hello!"));
--
Message<?> message = channel.receive();
assertTrue("hello!", message.getPayload());

Priority Channels

A PriorityChannel is a buffered point-to-point channel, similar to a QueueChannel, except that messages are delivered by priority rather than FIFO order. Message priority is determined by the value of the message header named priority. A Comparator<Message<?>> can also be configured as a constructor argument, to use an alternative ordering.

Code sample below, shows how programmatically create a PriorityChannel and sent prioritezed messages that are received by priority (more ``urgent'' messages delivered before):

PriorityChannel channel = new PriorityChannel();
Message message1 = new Message("hello!").getMessageHeaders().setPriority(1);
channel.send(message1);
Message message2 = new Message("URGENT:hello!").getMessageHeaders().setPriority(2);
channel.send(message2);
--
Message<?> message = channel.receive();
assertTrue("URGENT:hello!", message.getPayload());

Message<?> message = channel.receive();
assertTrue("hello!", message.getPayload());

RendezvousChannel

A RendezvousChannel is pollable channel that blocks the sending thread until some other thread picks the message on the channel by invoking method receive(). It is comparable to a zero-capacity QueueChannel with blocking message sends. However, message delivery is still asynchronous.

ChannelsPoint-to-PointPublish-Subscribe
SubscribableChannel (Non-Buffered)DirectChannel;ExecutorChannelPublishSubscribeChannel
PollableChannel (Buffered)QueueChannel;PriorityChannelNA

Taxonomy of channels in Spring Integration

XML Configuration of Message Channels

While programmatic creation and configuration of Spring Integration components gives maximum flexibility and control, XML declarative configuration is simpler. This is because of features such as forward name resolution, automatic initialization of components (by invocation of handlers for the initialization life-cycle event), and other. XML namespace support also given a more focused set of abstraction, rather than a ``full-blown'' Java API.

Spring Integration Namespace

Spring Integration provides several XML namespaces to simplify the task of defining integration components (such as enpoints and channels). The core components including all channel types and basic Enterprise Integration Patterns components, such as endpoints for messages routing and transformation, are available in namespace:
http://www.springframework.org/schema/integration.

The XML document template below shows how the core Spring Integration XML namespace can be imported (along side with the <beans:*> namespace). In this case, elements for this namespace are prefix <int:*>. The specification of XML schemas ensures IDE (and deployment time) validation.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xsi:schemaLocation="
          http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans.xsd
          http://www.springframework.org/schema/integration
          http://www.springframework.org/schema/integration/spring-integration.xsd">

	<!-- Schema validate Spring Integration elements available as <int:*> -->
	...
</beans>

DirectChannel Configuration

DirectChannel are the most commonly used type of channels in SpringIntegration, since they are the simplest and light-weighted way to interconnect endpoints.

XML fragment below shows how a DirectChannel can be create with element <int:channel>. The attribute id is used to specify the name of the Spring bean representing the channel. The name is also used to cross-reference channels and endpoints in XML.

<int:channel id="orderChannel

The dispatching options for the default MessageDispatcher used by the channel can also be specified in sub-element <int:dispatcher>.

XML fragment below shows how to configure the MessageDispatcher not to perform redelivery to the next receiver when a previous receiver throws an Exception. This is specified by settring attribute failover="false".

<int:channel id="orderChannel">
	<int:dispatcher failover="false
</int:channel>

Round-robin delivery can also be surpressed by setting the dispatcher attribute load-balancer="none", as shown below:

<int:channel id="orderChannel">
	<int:dispatcher load-balancer="none
</int:channel>

Channel Payload Data Types

Channels can be restricted to accept sending messages only with specific datatypes. This is configured by setting attribute datatype of the channel element with the fully-qualified name of the payload data type. Multiple types can also be specified separated by ‘,’. If a message with a non-matching payload is sent to the channel, an type conversion is not possible, an exception is thrown.

Code sample below shows how to configure the payload data type for a direct channel. All channel types support this attribute for configuration.

<int:channel id="orderChannel" datatype="app.domain.Order" />

Type relationships are respect by the channel when performing the type check. That is, any compatible type (derived class or interface implementation) is allowed. In the example above, the payload type SpecialOrder is also acceptable provided that SpecialOrder derives from Order.

If a bean of type ConversionService and name integrationConversionService is available in the ApplicationContext, then it is picked up by a channel and used to attempt to convert the payload type to the expect data type (unless the payload type is already compatible). Only if conversion is not possible an exception is thrown. Individual Converter instances can also be registered in the bean integrationConversionService with element <int:converter>. This element also created automatically a bean integrationConversionService if it does not exist.

ExecutorChannel Configuration

An ExecutorChannel can be defined by setting the attribute task-executor in the element <int:dispatcher> of an otherwise direct channel. The attribute should reference a bean of type TaskExecutor.

The sample below show how to create a ExecutorChannel. A ExecutorChannel bean is created with supported from the <task:*> namespace.

<int:channel id="orderChannel">
	<int:dispatcher task-executor="someExecutor" />
</int:channel>

<task:executor id="taskExecutor

QueueChannel Configuration

A QueueChannel can be create in XML by further configuring the element <int:channel> with sub-element <int:queue>. This makes it very ``smooth'' in terms of configuration to change the type of a channel from a SubscribableChannel to a PollableChannel. The attribute capacity is used to specify the capacity of the queue (by default it is unbound).

Xml sample below show how to define a QueueChannel with 100 messages as capacity:

<int:channel id="orderChannel">
	<int:queue capacity="100" />
</int:channel>

Note that the sub-elements <int:queue> and <int:dispatcher> are mutually exclusive, since the former makes the channel pollable and the later is used to configure delivery in the subscriabable channel.

PriorityChannel Configuration

The PriorityChannel is create is a similar manner to the QueueChannel except that the XML sub-element for <channel> is <priority-queue>. The attribute capacity can be used (optionally) to specify the maximum number of messages in the queue.

<int:channel id="orderChannel">
   <int:priority-queue capacity="100
</int:channel>

The order/priority of message delivery is done by default by comparision of the priority header in messages. Alternatively, a bean of type Comparator<Message> can be used to impose other ordering. This is done by setting attribute comparator, as illustrated below.:

<int:channel id="orderChannel">
   <int:priority-queue capacity="100" comparator="orderComparator" />
</int:channel>

<beans:bean id="orderComparator" class="app.OrderComparator" />

A possible implementation for app.OrderComparator is show below:

package app;

public class OrderComparator implements Comparator<Order> {
 ...
}

RendezvousChannel Configuration

A RendezvousChannel can be configured, similarly to the QueueChannel and PriorityChannel, by providing a sub-element to <channel> — this time the element <rendezvous-queue/>. Because a RendezvousChannel has zero-capacity, the attribute capacity is not allowed.

<int:channel id="orderChannel">
	<int:rendezvous-queue/>
</int:channel>

PublishSubscribeChannel Configuration

The PublishSubscribeChannel channel can be define in XML with element <int:publish-subscribe-channel>. The optional attribute task-executor is used to specify a bean of type TaskExecutor.

Sample below show how to create a PublishSubscribeChannel channel configured with asynchornous delivery. A TaskExecutor bean is created with supported from the <task:*> namespace.

<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

<task:executor id="taskExecutor"/>

Channel Interceptors

Spring Integration provides a common mechanisms to intercept messages sent to all types of channels. This interception can be used for observing the content of messages header and payload, transform messages header or replace by an alternative message, or preempt message delivery.

ChannelInterceptor API

A channel interceptor is modeled with the interface ChannelInterceptor, whose specification is shown below:

public interface ChannelInterceptor {

	Message<?> preSend(Message<?> message, MessageChannel channel);

	void postSend(Message<?> message, MessageChannel channel, boolean sent);

	boolean preReceive(MessageChannel channel);

	Message<?> postReceive(Message<?> message, MessageChannel channel);
}
  • Method preSend() and postSend() are invoked for all types of channels. On the other hand, methods preReceive() and postReceive() are invoked only for pollable channels (i.e. implement the interface PollableChannel, such as the QueueChannel, the RendezvousChannel, and the PriorityChannel).

  • Method preSend() is invoked just before a message is ``droped'' into the channel. That is immediately after method MessageChannel.send() is invoked and before it returns or any receiver sees the message. The Message being intercepted as well as a reference to the MessageChannel is made available as parameters of the interceptor method.

  • For unmodified delivery of the message the method preSend() should return the Message passed as argument. Alternatively, the method may modify the headers of the input Message before returning that same Message. For changing the payload of the delivered Message, a completely new message needs to created and returned, since Message object in Spring Integration are immutable. If only the state of the payload object needs to be changed but not the actual object, the calling property setters in the payload object is possible. However, as discussed in the messaging section of this tutorial, this is not recommended.

  • Method send() can also preempt the sending of the messagepreSend() by returning null. Alternatively, the method can throw a RuntimeException to preempt message sending.

  • Method postSend() is invoked just before the MessageChannel.send() return control back to the calling method. The parameter sent specifies if message delivery occurred (i.e. the message was not preempted by some channel interceptor, or some circumstance that makes the method MessageChannel.send() return false).

  • Method preReceive() is invoked just after the message is removed from the channel, but before the message is returned by method PollableChannel.receive(). If the method preReceive() returned false, message delivery is prevented. (Since in Spring Integration all PollableChannel are point-to-point channels, this means that the message is discarded.)

  • Method postReceive() is invoked just before the method PollableChannel.receive() return control back to the receiving method with the delivered Message as return value. For unmodified Message delivery the postReceive() method should return the same Message passed as parameter. Alternatively, the message can be modified or replaced in the same aways that can be done in method preSend().

The class ChannelInterceptorAdapter can be used as a support class for the implementations of ChannelInterceptor. It provides ``identity'' behavior for the four interception methods — returning the same unmodified messages and without preempting message delivery. This simplifies implementation of interceptors since only required method need to be overriden.

Code below, show an example of a ChannelInterceptor that counts the number of sent and received message in each channel:

public class StatsInterceptor extends ChannelInterceptorAdapter {
	Map<MessageChannel, Long> sentCountMap = new HashMap<MessageChannel, Long>();
	Map<MessageChannel, Long> recvCountMap = new HashMap<MessageChannel, Long>();
	
	@Override
	void postSend(Message<?> message, MessageChannel channel, boolean sent) {
		Long n = sentCountMap.get(channel);
		sentCountMap.put(channel, n!=null ? n+1, 1);
	}

	@Override
	Message<?> postReceive(Message<?> message, MessageChannel channel) {
		Long n = recvCountMap.get(channel);
		recvCountMap.put(channel, n!=null ? n+1, 1);
	}
}

Programatically, a ChannelInterceptor can be registered in a channel by invoking method addInteceptor(). This is illustrated below:

AbstractMessageChannel channel = new DirectChannel();

StatsInterceptor statsInterceptor = new StatsInterceptor();

channel.addInterceptor(statsInterceptor);

XML ChannelInterceptor Configuration

For XML based configuration, instances of ChannelInterceptor can be registred in a channel with sub-element <int:interceptors>. The collection of interceptors for the channel is declared as a bean definition or bean reference. Because an interceptor can be resued across multiple channels, it is more common to use bean references with element <ref>.

An example of configuring a DirectChannel with the demo interceptor is shown below:

<int:channel id="orderChannel">
   <int:interceptors>
       <ref bean="statsInterceptor
   </int:interceptors>
</int:channel>

<beans:bean id="statsInterceptor" class="app.StatsInterceptor" />

Global Channel Interceptor

Channel interceptors can also be applied many element with a single configuration element. This is done with element <int:channel-interceptor> that defines a global interceptor. The (optional) attribute pattern specifies a channel name pattern to select which channels should the interceptor apply. If the attribute is omitted the interceptor is applied to all channels defined in the application context.

XML sample below shown an example of how to configure a global interceptor:

<int:channel-interceptor ref="statsInterceptor" pattern="*Channel" order="1">

, or Alternatively:

<int:channel-interceptor pattern="*Channel" order="1">
	<beans:bean id="statsInterceptor" class="app.StatsInterceptor" />
</int:channel-interceptor>

The (optional) attribute order is used to specify the relative ordering of the global interceptor in relation to other interceptors (global or channel specific). The default ordering for global and channel specific interceptors is 0. Setting the value of attribute order to a value >0 makes that interceptor be invoked after other interceptor with default ordering. Conversely, setting the value of attribute order to a value <0 makes that interceptor be invoked before any other interceptor with default ordering.

Wire Tap

A very useful kind of interceptor is the WireTap. This an interceptor that sends the intercepted message to an additional channel, without affecting the message delivery in the wire-tapped'' channel. Because it is so commonly used, **Spring Integration** provides XML namespace support for it. This is with element `<wire-tap>). The attribute channel specifies the name of the channel to which the messages should additionally be sent to.

XML snippet below illustrates how to setup a wire-tap interceptor in a channel:

<int:channel id="orderChannel">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="INFO" log-full-message="true" />

In the example, element <logging-channel-adapter> is used to create a message handle, of type LoggingHandler, that logs messages. AttributeAttribute level is used to specify the logging level of messages (possible values are: FATAL, ERROR, WARN, INFO, DEBUG, and TRACE). Optional attribute log-full-message specifies if message header should also be logged. By default, only the message payload is logged. Alternatively, the attibute expression can be used to specify an SPEL expression predicated on the message payload and/or message headers.

A wire-tap can also be setup as a global interceptor, with an optional pattern attribute for a channel name selector:

<int:wire-tap pattern="*Channel" order="1" channel="logger"/>

Exercises to the Reader

  • Modify the implementation of StatsInterceptor described above to log the number of sent and received channels. For a SubscribableChannel only the number of sent message should be logged, since the metric more received messages is always zero.

Non-Intrusive Messaging with Gateways and Service Activators

ServiceActivator

Channel Adaptor

Inbound Messaging Gateways

Message Polling and Bridge

Message Routing

A message router is an endpoint that forwards incoming messages to one (or more) among several configured output channels. The criteria for selecting the output channel is usually based on the message payload and/or the message headers. Spring Integration support several types of message routers, with different ways to specify the routing criteria, for generic or specific payload types.

Generic Routers

The most generic and flexible way to define a router is to use a Java object to implement the criteria for routing. The routing method in the Java object is supposed to return a value that specifies or identifies the output channel to which the router endpoint should send the message.

XML fragement below show how to define a generic router, with element <order>, that reference to a bean implementing the routing criteria. Attribute input-channel specifies the name of the input channel for the incoming messages. Attribute ref specifies the name of the bean implementing the routing criteria.

<int:router input-channel="orderChannel" ref="orderRouter"/>

<beans:bean id="orderRouter" class="app.OrderRouter" />

With the XML setting above the bean implementing the routing criteria needs also to be the router by extending class AbstractMessageRouter. Abstract method determineTargetChannels() should be defined to implemented the routing criteria, method return should return a collection of MessageChannel with one or more channels to which the message should be routed. This arguably rather low-level approach to define a router and routing criteria is scketched below:

public class OrderRouter extends AbstractMessageRouter {
	protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
		...
	}
}

POJO Routing Criteria

An alternative and more convenient way to define a generic router, is to use a POJO to implement the routing criteria. One of the POJO methods should implement the routing criteria — by returning the name or a reference to the MessageChannel (or a collection of them) to which the message should be routed. The attribute method is element <router> is used to specify the method name. This is illustrated below:

<int:router input-channel="orderChannel" ref="orderRouter" method="routeOrder"/>

<beans:bean id="orderRouter" class="app.OrderRouter" />

An example POJO class definition for the routing criteria is shown below:

public class OrderRouter  {
	protected String routeOrder(Message<Order> message) {
		return isSpecialDelivery(message) ? "specialDeliveryChannel" : "commonOrderChannel";
	}
}

The way this is made to work internally is by having the handler of XML element <router>`` create a bean instance of typeMethodInvokingRouter(which is a sub-class ofAbstractMessageRouter`). This bean invokes the specified criteria method by using the Java reflection API, and makes the required adapations to the Spring Integration API.

The method implementing the routing criteria can also be specified using annotation @Router. In this case, the specification of attribute method in element <router> is no longer required. This is illustrate below:

<int:router input-channel="orderChannel" ref="orderRouter"/>
public class OrderRouter  {
	@Router
	protected String routeOrder(Message<Order> message) {
		return isSpecialDelivery(message) ? "specialDeliveryChannel" : "commonOrderChannel";
	}
}

When using POJO routing criteria class there is not a strict method signature. The method can take a Message as parameter. Alternatively, the method parameter can be of other typr assumed to be type of the message payload. As mentioned above the return type can also vary, including: String, List<String>, MessageChannel, and List<MessageChannel>. Additionally, parameters can be ``injected'' with a value taken from a message header. This is specified with annotation @Header, as illustrated below:

public class OrderRouter  {
	@Router
	protected String routeOrder(Order order, @Header("orderType") OrderType type) {
		return type==OrderType.SPECIAL_DELIVERY || order.getCustomer().isVIP() ?
			"specialDeliveryChannel" : "commonOrderChannel";
	}
}

Routing Criteria with SPEL Expressions

When the routing criteria is simply enough, it often more convenient to define it with a SPEL expression, rather than a dedicated Java bean. The result of the expression evaluatation is taken as the name of the channel to which the message should be routed. SPEL expressions are configured by setting the attribute expression in the element <router>. A couple of examples is shown below:

<int:router input-channel="orderChannel" expression="headers['type']"/>

<int:router input-channel="orderChannel"
		expression="orderUtil.isXml(payload) ? 'xmlChannel' : 'csvChannel'"/>

The identifiers payload and headers can be used in the SPEL expression as implicit variables referencing, respectively, to the payload and headers of the message being routed.

PayloadTypeRouter

For specific categories of criteria functions, Spring Integration provides out-of-the-box speciallized types of routers. The PayloadTypeRouter routes messages to channels according to the type of the payload. Using XML namespace support, a PayloadTypeRouter can be conviently created with element <payload-type-router>.

The routing of each payload type is specified with sub-element <mapping>. The attribute type specifies the fully qualified name of the payload type, and the attribute channel specifies the name of the channel to route the messages to. A default channel can also be set with attribute default-output-channel in element <payload-type-router>.

The example below shows how <payload-type-router> is used to create a router that checks the type of the payload and sends messages to the corresponding channel:

<int:payload-type-router input-channel="orderChannel" default-output-channel="miscOrderChannel" >
   <int:mapping type="org.w3c.dom.Document" channel="xmlOrderChannel" />
   <int:mapping type="java.lang.String" channel="csvOrderChannel" />
</int:payload-type-router>

Messages with a payload type not matched by any <mapping> element will be sent to the default channel. If a default channel is not provided, an Exception is throw if a message with a non-mapped payload type is received; unless the attribute resolution-required is set to false — in which case the message is simply ignored.

HeaderValueRouter

Messages can also be routed according to the value of some header. This is implemented by class HeaderValueRouter, and can be conveniently created in XML with element <header-value-router>. The mandatory attribute header-name specifies the name of the attribute on which routing is based on.

Sub-element <mapping> specify routing rules for each value of the routing header. Attribute value`` specifies the value of the header to match, and attributechannel` specifies the name of the channel to route message to. An example is show below:

<int:header-value-router input-channel="orderChannel" header-name="status" >
	<int:mapping value="CHECKED_OUT" channel="stockChannel" />
	<int:mapping value="COMPLETED" channel="invoiceChannel" />
	<int:mapping value="CANCELED" channel="cancelChannel" />
</int:header-value-router>

The attribute default-output-channel and resolution-required can be used with the same semantics as in the PayloadTypeRouter.

For the special case when the value of the header is textually matches the name of the routing channel, the <mapping> elements are optional. The simplified configuration is exemplified below:

<int:header-value-router input-channel="orderChannel" header-name="status" />

In the simplified configuration where the header value is mapped to a channel name, the default-output-channel is used to send messages only when resolution-required="false".

RecipientListRouter

Another kind of router is the RecipientListRouter that, unlike other types of channel, routes messages to several channels. The XML element <recipient-list-router> can be used to create an instance of RecipientListRouter, and the individual recipient channels are declared with sub-element <recipient>. This is illustrate below:

<int:recipient-list-router id="orderRouter" input-channel="orderChannel">
	<int:recipient channel="requestChannel"/>
	<int:recipient channel="reportChannel"/>
</int:recipient-list-router>

The use of a RecipientListRouter can often by replaced by using a PublishSubscribeChannel to send messages to multiple endpoints. Its behavior is some what dual of the PublishSubscribeChannel — instead of sending messages to multiple MessageHandler endpoints, it send messages to multiple channels.

Using a RecipientListRouter can be made more flexible when routing to specific channels is made conditional on some expression evaluation. Conditions can be set with attribute selector-expression in element <recipient>. The attrubute value is a SPEL expression whose evaluation should return true if routing is to occur.

<int:recipient-list-router id="customRouter" input-channel="orderChannel">
   <int:recipient channel="xmlChannel" selector-expression="orderUtil.isXml(payload)"/>
   <int:recipient channel="csvChannel" selector-expression="orderUtil.isCSV(payload)"/>
   <int:recipient channel="declinedChannel" selector-expression="headers['status']=='DECLINED'"/>
</int:recipient-list-router>

Routing and Error handling

When message delivery is done asynchronouslly by a thread managed by a TaskExecutor, such as in ExecutorChannel or a PublishSubscribeChannel configured with TaskExecutor, exceptions are reported by sending an error message to a error channel (named errorChannel by default). This error reporting messages are of type ErrorMessage whose payload is of type Throwable.

A ExceptionTypeRouter is specialized type of router that relays messages according to the type of the payload exception, and it is used to route error messages generated during asynchronous delivery. It can be defined in XML with element <exception-type-router>, as illustrated below:

<int:exception-type-router input-channel="inputChannel" default-output-channel="defaultChannel">
   <int:mapping exception-type="app.exception.OutOfOrderException" channel="cancelChannel"/>
   <int:mapping exception-type="app.exception.InvalidOrder" channel="invalidChannel"/>
</int:exception-type-router>

Note that the functionallity of the ExceptionTypeRouter is very similar to that of the PayloadTypeRouter. The only difference is the way payload types are matched. For the PayloadTypeRouter matching is done by considering the class hierarchy of the payload type. The ErrorMessage ExceptionTypeRouter first performs matching by following the root cause hierarchy of exceptions (i.e. by recursively calling ((Throwable)payload).getCause() and checking the exception type until a matching rule is found.) Additionally, the use of <exception-type-router>, rather then <payload-type-router>, makes more evident the functional role of the router.

Exercises to the Reader

  • Define a generic router with a POJO criteria that works for messages of payload type String, and selects the output channel by considering the length of the payload.
  • Implement a new type of router named ExpirationRouter, by sub-classing AbstractMessageRouter, that routes messages to three alternative channels depending on the value of the message header EXPIRATION_DATE. Expired messages should be relayed to one channel; messages about to expire (according to a configurable parameter) are routed to a second channel; and messages that have not expired nor are about to, should be sent to a third channel. Configure the new router type in XML, and perform unitary tests.

Message Filtering

Message filters are endpoints that selectively relay messages from an input to an output channel. They can be used to discard messages based on payload or header values, detect invalid messages, or relay non-conforming messages to a separte channel.

Defining Filters

SpringIntegration abstraction for a filter is class MessageFilter. A filter can be conviently defined in XML with element <filter>. The attribute input-channel specifies the channel from where message are read/received, and attribute output-channel specified the channel to where selected (non-filtered out) messages are relayed. Attribute ref specifies the name of the bean implementing the message selection criteria.

XML fragment below illustrates how to define a filter:

<int:filter input-channel="orderInput" ref="orderSelector" output-channel="orderOutput"/>

<beans:bean id="orderFiltering" class="app.OrderSelector"/> 

Similarly to routers and other types endpoints, there are several ways to create a message selection criteria. The more ``low-level'' way, is to for the selector bean implement interface MessageSelector. This is illustrate below:

public class OrderSelector implements MessageSelector {
	@Override
	boolean accept(Message<Order> message) {
		return OrderUtil.isValid(message.getPayload());
	}
}

Method MessageSelector.accept() defines the message selection criteria. If the method return true the message is relayed to the output-channel, otherwise is filtered out. Filtering out, by default, means simply to discard the message.

POJO Message Selectors

Message selector can also be define as POJO, by setting the additional attribute method in the filter definition. This is illustrate below:

<int:filter input-channel="orderInput" ref="orderSelector" method="filter" output-channel="orderOutput"/>

<beans:bean id="orderFiltering" class="app.OrderSelector"/>
public class OrderSelector {
	boolean filter(Order order, @Header("status") String status) {
		return OrderUtil.isValid(order) && !"canceled".equals("status");
	}
}

Because the selector bean is a POJO, the method implementing the selection can have a flexible signature (although it should return a value that can be cast to a boolean). In the example above, the parameter is of type Order assumed to be the type of the paylod. The annotation @Header is also used to get the value of the message header status.

Annotation-Driven Message Selectors

The attribute method can be ommited in the XML configuration of the filter if the annotation @Filter is set in the selector method. The XML attributes input-channel and output-channel can also be specified in the selector class as attributes of the @Filter annotation.

<int:filter ref="orderSelector" />

<beans:bean id="orderFiltering" class="app.OrderSelector"/>
public class OrderSelector {
	@Filter(inputChannel="orderInput", outputChannel="orderOutput")
	boolean filter(Order order) {
		return OrderUtil.isValid(order);
	}
}

Using SPEL Expression as Message Selector

For simple enough message selector criteria, it may be simpler to define them as a SPEL expression rather than a dedicated Java bean. A SPEL expression is setup in a filter with XML attribute expression. The result of the expression evaluation should be a boolean.

<int:filter input-channel="orderInput" output-channel="orderOutput" 
	expression="orderUtil.isValid(payload)" />

Configuration for Filtered Out Messages

By default, filtered out messages (i.e. those for who the selector methods return false) are discarded. Another possibility, is to make the filtering out of messages throw an Exception. This is configured in XML by setting throw-exception-on-rejection="true", as illustrate below:

<int:filter input-channel="orderInput" ref="orderSelector" output-channel="orderOutput" 
	throw-exception-on-rejection="true"/>

Alternatively, filtered out message may be routed to a separated channel. This is configured in XML by setting attribute discard-channel, as illustrated below:

<int:filter input-channel="orderInput" ref="orderSelector" output-channel="orderOutput" 
	discard-channel="invalidOrders"/>

Endpoint Component Automatic Creation

Message endpoints, such as filters and routers, can also be automatically created by detection of type-level annotation @MessageEndpoint.

@MessageEndpoint
public class OrderSelector {
	@Filter(inputChannel="orderInput", outputChannel="orderOutput")
	boolean filter(Order order) {
		return OrderUtil.isValid(order);
	}
}

Exercises to the Reader

  • Create a MessageSelector implementation class that discard messages that have expired (i.e. the EXPIRATION_DATE header is before the current time). Configure a filter in XML to use that selector, and unit test it.

Message Transformation

Message transformers are endpoints that transform messages payload and/or headers. The can be used to perform arbitrary transformations on the payload, convert payload types, and add, change, or remove headers. A very commonly used design-pattern is the Normalizer — where several transformers to convert data from multiple formats into a single unified data model.

Defining Message Transformers

The Spring Integration abstraction to transform message is the interface Transformer. The abstract base class AbstractMessageProcessingTransformer implements this interface, and expects a MessageProcessor as constructor element. The Xml element transformer can be used to define a transformer, as illustrate below:

<int:transformer id="orderInvoiceTransformer" input-channel="orderChannel" 
	 ref="orderTransform"  output-channel="processedChannel"/>
            
<beans:bean id="orderInvoiceCreator" class="app.OrderInvoiceCreator" />

The attribute inputChannel specifies the channel where message are received from and attribute outputChannel the channel where transformed messages are sent to. The attribute ref specifies a bean implementing interface MessageProcessor, such as the one shown below:

public class OrderInvoiceCreator implements MessageProcessor<String> { 

	@Override	
	public String processMessage(Message<Order> message) {
		return createInvoice(message.getPayload());
	}
}

Because in the example the method processMessage() is not returning a Message, the AbstractMessageProcessingTransformer consider it to be the payload of a message that is automatically create. (The headers are copied for this new message are copied from the original message.)

POJO Transformers

A more convenient way to define transformer is to use a message transformation method that is POJO (rather than having to implement MessageProcessor). This can be configure by additionally setting attribute method to the name of the method to be invoke reflectively. This is illustrate below:

<int:transformer id="orderInvoiceTransformer" input-channel="orderChannel" 
	 ref="orderTransform" method="createInvoice" output-channel="processedChannel"/>
            
<beans:bean id="orderInvoiceCreator" class="app.OrderInvoiceCreator" />
public class OrderInvoiceCreator { 

	@Override	
	public String createInvoice(Order order) {
		return createOrderInvoice(order);
	}
}

Note, that when transformation class has a single method the attribute method is optional.

Transformer Configuration with Annotations

The method used for transforming a message can also be made explicit with annotation @Transformer.

public class InvoiceCreator {
	@Transformer
	public Order transform(Order order) {
		order.addItem(new GiftItem());
	}
}

Transformers with SPEL Expressions

Message Transformation II: Common Transformers

In addition to provide mechanisms to define generic transformers, Spring Integration also provides ``out-of-the-box transformers for specific functions. Using this transformer, whenever possible, simplifies development because it reduces the need to dedicated Java classes for message processing. It also simplifies the XML configuration because it may not be necessary to define a bean of the implemented class, and reference it in attribute ref of <transformer>.

Object Formatting

The class ObjectToStringTransformer is a transformer that maps arbitrary Java objects to a text representation of it, by simply calling the method toString(). It can be declared in XML with element <object-to-string-transformer>, as illustrated below:

<int:object-to-string-transformer input-channel="in" output-channel="out"/>

This transformer is useful when is necessary to feed endpoint that expect payloads of type String. For example, file output adapters support a limited set of types as payload which includes String. Using a ObjectToStringTransformer allow the message to be prepared for input to this adaptor.

Object Serialization and Deserialization

Another very commonly needed sort of transformation is to map the state of Java objects and object graphs to serialized binary representations. And, converselly, to map serialized binary representations back to objects with the original state and graph structure. Spring core framework, defines the interfaces Serializer and Deserializer to model in an abstract way algorithms to perform serialization and deserialization, respectively.

In Spring Integration, transformers can be defined to perform serialization and deserialization of message’s payload. This is configured with elements <payload-serializing-transformer> and <payload-deserializing-transformer>, as illustrated below:

<int:payload-serializing-transformer input-channel="serializableInput" output-channel="bytesChannel"/>

<int:payload-deserializing-transformer input-channel="bytesChannel" output-channel="outputChannel"/>

By default, the classes DefaultSerializer and DefaultDeserializer are used as serialization and deserialization algorithms. Which are really just wrapper to the Java serialization algorithms base on classes JavaOutputChannel and JavaInputChannel, respectively. (In fact, these are the only implementations available ``out-of-the-box'' by Spring framework.) Alternatively implementation can be specified with XML attributes serializer and deserializer.

Object Serialization and Deserialization

<int:object-to-map-transformer input-channel="directInput" output-channel="output"/>
<int:map-to-object-transformer input-channel="input" output-channel="output" type="org.foo.Person"/>

JSON Serialization and Deserialization

<int:object-to-json-transformer input-channel="objectMapperInput"/>

<int:json-to-object-transformer input-channel="objectMapperInput" type="foo.MyDomainObject"/>
<int:json-to-object-transformer input-channel="objectMapperInput" 
	type="foo.MyDomainObject" object-mapper="customObjectMapper"/>
public class ObjectMapperFactory {
   
   public static ObjectMapper getMapper() {
       ObjectMapper mapper = new ObjectMapper();
       mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
       return mapper;
   
}

Message Transformation III: Content Transformer

Transformation for/based on Headers

Transformation for/based on Content

Content Enricher

Claim Check

Incoming Claim Check Transformer

Outgoing Claim Check Transformer

JSON Transformers

Message Splitting

A message splitter is an endpoint that produces multiples output messages out of a single input message. It is usually used to decompose messages with composite or structure objects payloads, into messages for the different components of the input payload. For example, an Order may be split into individual OrderItems. This can be used to process the different component elements separately, possible for latter (re)aggregation.

Message Splitter

In Spring Integration a message splitter is modelled with abstract class AbstractMessageSplitter, The abstract method splitMessage() should be overriden to implement a concrete splitting strategy. This method takes the input message as parameter, and should return a collection or array of Message or payload objects.

The AbstractMessageSplitter class produces as output multiple messages. These are the messages directly returned by method splitMessage(), or internally created messages having as paylod the object returned by the method.

Each of the messages produced by AbstractMessageSplitter has three headers automatically set:

  • CORRELATION_ID — is set with the value of the header ID of the input message
  • SEQUENCE_SIZE — is set with the number of output messages produced from the input message
  • SEQUENCE_NUMBER — is set with a ordering number from 1 to SEQUENCE_SIZE

These headers are set so that it possible to track the origin of the message. For example, to reconstruct the original messages but with the component elements transformed by some intermediate endpoints.

Example below show how to configure a splitter in XML with element <splitter>. Attribute ref is a reference to bean that extends AbstractMessageSplitter.

<int:splitter id="orderSplitter" input-channel="orderChannel" 
	ref="orderSplitterBean" output-channel="itemChannel" />

<beans:bean id="orderSplitterBean" class="app.OrderSplitter"/>

Below is the implementation of the splitter bean:

public class OrderSplitter extend AbstractMessageSplitter {

	@Override
	protected Object splitMessage(Message<Order> message) {
		return message.getPayload().getItems();
	}
}

POJO Splitter

The message splitting bean can also be defined as POJO, by setting the attribute method in element <splitter>. The method can have a flexible signature, similarly to what can be done with POJO transformers and filter classes. An example, is shown below:

<int:splitter id="orderSplitter" input-channel="orderChannel" 
	ref="orderSplitterBean" method="split" output-channel="itemChannel" />

<beans:bean id="orderSplitterBean" class="app.OrderSplitter"/>

The method of the POJO splitting bean can take as input a Message or a payload object, and return a collection or array of Message or payload objects — just like the the plain AbstractMessageSplitter.splitMessage() method.

public class OrderSplitter {

	public OrderItem[] split(Order order) {
		return order.getItems();
	}
}

Annotation-Driven Splitter Definition

When the splitting bean class has a single method, the attribute ref in element <splitter> is optional. If the class as multiple method, the splitting method can be disambiguated with annotation @Splitter. This is illustrated below:

public class OrderSplitter {

	//getter and setters
	...
	
	@Splitter
	public OrderItem[] split(Order order) {
		return order.getItems();
	}
	
}

Exercises to the Reader

  • Implement a splitter — LineBreakSplitter, that expects message payload of type String, and produces as output messages for each of the \n separated text lines of the input payload. Configure the splitter in XML and write a test class for unit testing the splitter.
  • Implement a splitter — LinePatternSplitter, that expects message payload of type String, and its configured with a list of regular expression patterns. It produces as output the same number of messages as the cardinality of the pattern list, and each message contains as payload all the text lines that match the corresponding pattern. Configure the splitter in XML andwWrite a test class for unit testing the splitter.
  • Implement a router — PatternRouter, that expects message with text payload and is configured with a Map<String,String> mapping regular expression patterns to output channel names. Unit test the router. Then, configure in XML a network with a LinePatternSplitter and a PatternRouter. Connect a different transformer to each out the output channel of the router, so that each text line process in a different way. Write a test class for testing the correct behavior of the complete network.

Message Aggregation

A message aggregator is an endpoint that combines multiple input messages into a single output message. A correlation strategy is used to decided which input messages are to be aggregated with which other messages (belong to the same message group or ``bucket''). A release strategy is used to determine when the message group is complete and a single message should be produced out of all the messages in the group. Finally, a message group processor is invoked to assemble the group into a single message.

A message aggregator is usually used to ``undo'' the work of splitter, or in some cases the work of a PublishSubscribeChannel or RecipientListRouter. A typical pattern is for a splitter break a message into multiple messages, each having as payload a component of the original payload, a content-based router then sends the messages to different transformers, which in turn send all the processed messages to a single channel. A message aggregator is then connected to the transformer output channel, to reconstruct a multiply transformed version of the original message.

Aggregator API

In Spring Integration, an message aggregator is abstracted with class AggregatingMessageHandler, which is a sub-class of AbstractCorrelatingMessageHandler (shared as parent class with other endpoints that also require collecting message groups, such as the resequencer). The behavior of these classes is to keep messages stored in message groups, such that all messages in the same group have the same correlation key, and release an aggregated message for each group that is considered complete for release. Three strategy interfaces are used to configure an aggregator:

  • A MessageGroup processor interface implementation is used to create a single message out of a complete group of correlated messages.
  • A CorrelationStrategy interface implementation is used to determine which message belong to the same MessageGroup based on a correlation key.
  • A ReleaseStrategy implementation is used to decided when group of correlated messages is complete.

A MessageGroup is used to abstract a group of message, and are are managed internally by a MessageGroupStore.

The specification for interface MessageGroupProcessor is shown below:

public interface MessageGroupProcessor {
	Object processMessageGroup(MessageGroup group);
}

The method MessageGroupProcessor.processMessageGroup() takes a input a MessageGroup containing a complete of correlate messages. An implementation should return as output the payload of the aggregated message or the message itself. (As discussed elsewhere, when used in a resequencer endpoint a collection of messages may also be returned.)

The default implementation is DefaultAggregatingMessageGroupProcessor, that creates an output message having as payload a collection with the group’s messages payload as elements. This default implementation is often overridden with more customized behavior.

XML Configuration of an Aggregator

<int:aggregator id="orderAggregator" input-channel="itemChannel" 
		ref="aggregatorBean" output-channel="orderChannel" />

<int:channel id="itemChannel" />

<int:channel id="orderChannel" />

<bean id="aggregatorBean" class="app.OrderAggregator" />

POJO Aggregator

<int:aggregator id="orderAggregator" input-channel="itemChannel" 
		ref="aggregatorBean" method="aggregate" output-channel="orderChannel" />

<bean id="aggregatorBean" class="app.OrderAggregator" />

Annotation-Driven Aggregator Configuration

CorrelationStrategy

The CorrelationStrategy interface is used to determine which messages belong to the same MessageGroup. The specification of this interface is shown below:

public interface CorrelationStrategy {
	Object getCorrelationKey(Message<?> message);
}

The method CorrelationStrategy.getCorrelationKey() should return a correlation key object (of any type). Messages that have the same correlation key are considered to belong to the same MessageGroup.

The default implementation is HeaderAttributeCorrelationStrategy, that returns as correlation key the value of the message header CORRELATION_ID. (If the header is not present, and exception is thrown.)

<int:aggregator id="orderAggregator" input-channel="itemChannel" 
		ref="aggregatorBean" output-channel="orderChannel"
		correlation-strategy="correlationStrategyBean"
		correlation-strategy-method="correlate" />

<bean id="correlationStrategyBean" class="app.ItemCorrelationStrategy" />

ReleaseStrategy

The ReleaseStrategy interface is used to decided when a group of correlate messages is complete. The specification of this interface is shown below:

public interface ReleaseStrategy {
	boolean canRelease(MessageGroup group);
}

The method ReleaseStrategy.canRelease() should return true when the MessageGroup passed as parameter is considered completed. The kind of information that can be used to decided to release the MessageGroup includes the size/membership of the group, the headers and/or payload of the component messages.

The default implementation is the class SequenceSizeReleaseStrategy, that release a group if the number of messages in the group is the same as the value of the header for SEQUENCE_SIZE of one of the messages in the group.

Note: The exact message in the group that is looked at for the SEQUENCE_SIZE header is not defined, and depends on the implementation of the MessageGroup. Therefore, it is a ``best practice'' to ensure that all the messages in the group have the same value for the SEQUENCE_SIZE header. This is the expected behavior when the SEQUENCE_SIZE header is set by a upstream splitte, and the default correlation strategy is used. For the default implementation SimpleMessageGroup, the first message in the group is looked at.

<int:aggregator id="orderAggregator" input-channel="itemChannel" 
		ref="aggregatorBean" output-channel="orderChannel"
		release-strategy="releaseStrategyBean"
		release-strategy-method="release" />

<bean id="releaseStrategyBean" class="app.OrderReleaseStrategy" />

Other Configuration Options

Persistent Message Store

<int:aggregator id="orderAggregator" input-channel="itemChannel" 
		ref="aggregatorBean" output-channel="orderChannel"
		message-store="jdbcMessageStore" />

<bean id="jdbcMessageStore" class="org.springframework.integration.jdbc.JdbcMessageStore">
	<constructor-arg ref="dataSource" />
</bean>

Discarding Unreleased Messages

<int:aggregator id="orderAggregator" input-channel="itemChannel" 
		ref="aggregatorBean" output-channel="orderChannel"
		discard-channel="throwAwayChannel" order="1"
		send-partial-result-on-expiry="false" send-timeout="1000" />

<int:channel id="throwAwayChannel" />

Exercises to the Reader

  • Implement an AggregatorLengthAggregator, that expects messages with payload of type String, and produces an output message for each of the set of N messages with the same payload length.

Message Handler Chain

XML Payloads

Spring Integration has components specialized in dealing with messages with XML payloads. This components make use of standard technologies specialized in the processing of XML documents, such as XPATH expressions and XSLT transforms. The components have the same kind of functions as the core components – such as filters, transformer, splitters, and router – but are easier to configure, since no Java coding is required to process messages or implement configuration strategies.

XML Namespace

The Spring Integration components for XML processing can be most easily defined in XML by importing the a dedicated namespace http://www.springframework.org/schema/integration/xml. This namespace is used along side with other namespaces, such as the beans namespace, the core namespace for Spring Integration, and possibly other namespaces. This is illustrate below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration
		http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/integration/xml
		http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd">
    ...
</beans>

XPATH Expressions

XPATH is a W3C standard expression language to select sub-tree in a XML document. A XPATH processor takes as input a XML document, and produces as output a list of nodes each of which is the root of a sub-tree in the input XML documents that matches the selector or criteria defined by the XPATH expression.

As an example, consider the following XML data file describing a costumer order:

<order costumer="albert@emc2.org" >
	<item type="book" code="book1234" price="100.0" quantity="1" />
	<item type="cd" code="cd5678" price="30.5" quantity="2" />
	<item type="tablet" code="tablet7" price="250.0"quantity="1" />
</order>

We might want to process each of the items in the order with separated Spring Integration components. The XPATH expression /order/item could be used to do just that. The output of the XPATH interpreter would be a list of XML nodes one for each <item> element inside the <order> element. If the <item> has sub-elements and/or content text they would also be part of the XML node tree having the item as root. (To learn more about XPATH see the resources section.)

XML definition of XPATH expressions

In Spring Integration, XPATH expressions can be conviniently defined in XML by using the element <xpath-expression> from the XML processing namespace. This is illustrated below:

<int-xml:xpath-expression expression="/order/item"/>

The attribute expression specifies the value of the XPATH expression.

XML Splitters

Splitters are components that ``break'' individual input messages into multiple output messages. Since the output of a XPATH processor is a list of nodes, XPATH naturally fits the role of implementing the splitting strategies of splitters.

In Spring Integration the splitter specialized in XML payloads is implemented by class XPathMessageSplitter. It can be most convinientely created using the element <xpath-splitter>, as shown below:

<int-xml:xpath-splitter id="orderSplitter"
		input-channel="ordersChannel" output-channel="itemsChannel">
    <int-xml:xpath-expression expression="/order/item"/>
</int-xml:xpath-splitter>

Notice that the XPATH expression used by the spliiter is defined with element <xpath-expression> as a sub-element of <xpath-splitter>.

Since the output of the XPATH splitter is a list of nodes, the output message of the splitter will by default have each of these nodes as payload. The payload type of the output messages can also by transformed to a XML document by setting the attribute create-documents="true".

XML Routers

XPATH expression can also be used in the implementation of routers. In this case, the output of the output from the evaluation of the XPATH expression is assumed to be a list of text nodes. Each text node is assumed to represent the name of a channel to which a message should be routed.

A XPATH based router can be defined with the element <xpath-router>, as illustrated below:

<int-xml:xpath-router id="itemRouter" input-channel="itemChannel">
    <int-xml:xpath-expression expression="/item/@type"/>
</int-xml:xpath-router>

<int:channel id="cd" />
<int:channel id="book" />

This <xpath-router> will route item based on the value of attribute type of <item> elements. The value of the attribute is interpreted as the name of the channel to which the message should be routed. If the text node list has more than one element, then the message will be routed to all the channels named in the list.

If the value of the text node list does not represent direcly the name of a channel, it can be mapped to a channel names using one or more <mapping> elements. This is illustrated below:

<int-xml:xpath-router id="itemRouter" input-channel="itemChannel">
    <int-xml:xpath-expression expression="/item/@type" />
    <int-xml:mapping value="cd" channel="cdChannel" />
    <int-xml:mapping value="book" channel="bookChannel" />
</int-xml:xpath-router>

<int:channel id="cdChannel" />
<int:channel id="bookChannel" />

XSLT Transformers

Another widelly use XML processing technology is XSLT (EXtensible Stylesheet Language). XSLT XML domain-specific language used to defines transformations on XML input files to produces output files (often also XML). A XSLT script consist of a set of transformations, each of which defines a rule that matches a sub-set of XML nodes to produce some output. (To learn more about XSLT see the resources section.)

In Spring Integration, the class XsltPayloadTransformer models a transformer for XML payloads using XSLT transform. It can be defined in XML using element <xslt-transformer>. The XSLT script can be specified using a resource file and set with attribute xsl-resource.

<int-xml:xslt-transformer id="itemTransformer"
    input-channel="items" output-channel="processedItems"
    xsl-resource="file://transform-items.xsl" />

Alternatively, the XSLT transformation can be defined as a bean of type Transformer, and set in attribute xsl-templates:

<int-xml:xslt-transformer id="itemTransformer"
    input-channel="items" output-channel="processedItems"
    xsl-templates="transformI" />

Exercises to the Reader

  • Define a splitter for messages with XML payloads describing a costumer Order (class XmlOrderSplitter). The splitter should create outputs messages with the <item> elements as root. Use an appropriate XPATH expression for this. Write a unit test to check that splitter works.
  • Write a item router for XML payloads (class XmlItemPriorityRouter), such that messages are routed based on the priority attribute of element <item>.
  • Create XSLT script that discards all <order> elements with attribute status="cancelled". (Tip: use the identity transform as the fallback rule.). Define a Spring Integration transformer that uses the XSLT script to perform the transformation (Class OrderTransformer). Write a unit test to check that the transformer worker.

Solutions for the Exercises

Solutions for the exercises are available from the solutions JAR bundle — integration-solution.jar, in package org.jpalace.spring.integration.xml.

  • Class XmlOrderSplitter
  • Class XmlItemPriorityRouter
  • Class OrderTransformer

References and Resources

Web Resources

Comments and Discussion

Content