Large binary support

Tutorial

A tutorial is available under the title "Large Binary Support: Routing to a Webservice via HTTP".

Concept

The large binary support (LBS) handles large binaries that are part of a Camel message without keeping the complete binary in memory. It provides this functionality for the following Camel endpoints:

  • HTTP
  • CXF
  • MINA (MLLP, HL7)

There are several ways to handle the transfer of binary data:

  • Implicit transfer: Use Camel exchanges/messages to transfer the binaries
  • Download by receiver: Tell the receiver where the binary is stored and allow the download of the binary via a file transfer protocol
  • Upload by sender: Upload the binary using a file transfer protocol and tell the receiver about the upload location

The chosen alternative depends on the components within the system. For example, if a client has to transfer a file to a server, the second option might be impossible when the client is behind a firewall, because the server cannot reach the client to download the file. Binaries are often contained in the messages themselves for historical reasons. Although implicit transfer clearly has disadvantages for larger files, it cannot always be avoided: protocols might support such transfers for historical reasons or for simplicity. HL7, for example, can transfer documents within a normal HL7 message. For such scenarios, the IPF offers the large binary support (LBS), which simplifies the handling of these binaries.

The following diagram shows the LBS when up- and downloading large binaries:

A binary is received as part of a message sent to an endpoint. Before the message is processed, the LBS stores the binary on disk and replaces it in the message with a reference that gives access to the stored file. When the binary is sent to an outgoing endpoint, the LBS fetches it from disk and packs it into a message format that is compatible with that endpoint.
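The store-then-fetch cycle described above can be illustrated with plain Java streams. This is a minimal sketch, not the LBS implementation: the class StoreFetchSketch and its methods store() and fetch() are hypothetical and only show why neither step needs the whole binary in memory (Java 10 or later).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class StoreFetchSketch {

    // Hypothetical "store" step: copy the incoming binary into a new file in storeDir
    // and return the file as a reference. Files.copy transfers the data in chunks.
    static Path store(InputStream incoming, Path storeDir) throws IOException {
        Path file = Files.createTempFile(storeDir, "binary-", ".bin");
        // createTempFile already created the (empty) file, so it must be replaced
        Files.copy(incoming, file, StandardCopyOption.REPLACE_EXISTING);
        return file;
    }

    // Hypothetical "fetch" step: stream the stored binary into an outgoing message body.
    static void fetch(Path reference, OutputStream outgoing) throws IOException {
        Files.copy(reference, outgoing);
    }

    public static void main(String[] args) throws IOException {
        Path storeDir = Files.createTempDirectory("lbs-sketch");
        InputStream incoming = new ByteArrayInputStream("binary content".getBytes(StandardCharsets.UTF_8));

        // Only the reference would travel through the route between these two steps.
        Path reference = store(incoming, storeDir);

        // A ByteArrayOutputStream stands in for the outgoing connection in this demo.
        ByteArrayOutputStream outgoing = new ByteArrayOutputStream();
        fetch(reference, outgoing);

        System.out.println(outgoing.toString(StandardCharsets.UTF_8)); // binary content
    }
}
```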

DSL extensions

The LBS offers DSL extensions to provide the extraction and integration of large binary content:

  • store() to extract content
  • fetch() to integrate content

The following example shows the extensions:

from('jetty:http://localhost:8080/router')
    .store().with('resourceHandlers')
    ...
    .fetch().with('resourceHandlers')
    .to('http://localhost:8080/receiver')

Note that the store() and fetch() processors are both configured via with() to define the resource handlers. The handlers define the strategies used to find content within a message and are usually protocol-specific. There are predefined handlers (for HTTP and CXF), and custom handlers can be implemented via the interface org.openehealth.ipf.platform.camel.lbs.core.process.ResourceHandler. with() expects the name of a bean in the Spring application context. This bean is a List of ResourceHandlers, for example:

    <!-- This bean is a list of resource handlers. Add all handlers used within the routes to this list -->
    <util:list id="resourceHandlers">
        <bean class="org.openehealth.ipf.platform.camel.lbs.cxf.process.CxfPojoResourceHandler">
           <constructor-arg ref="resourceFactory"/>
        </bean>
        <bean class="org.openehealth.ipf.platform.camel.lbs.http.process.HttpResourceHandler">
            <constructor-arg ref="resourceFactory" />
        </bean>
    </util:list>

Using the LBS

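This section describes how to set up and use the LBS quickly, depending on the actual use case. In short, the process consists of the steps described in the following subsections: configure the project, set up a disk store, and add support for each required endpoint.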

Configure a project to use the LBS

For projects that use Maven 2, add the dependencies of the required endpoints to the pom.xml file. All other jars are added automatically through Maven's dependency management. For example, to use all three endpoints, add the following dependencies:

pom.xml
...
        <dependency>
            <groupId>org.openehealth.ipf.platform-camel</groupId>
            <artifactId>platform-camel-lbs-http</artifactId>
            <version>${ipf-version}</version>
        </dependency>
        <dependency>
            <groupId>org.openehealth.ipf.platform-camel</groupId>
            <artifactId>platform-camel-lbs-mina</artifactId>
            <version>${ipf-version}</version>
        </dependency>
        <dependency>
            <groupId>org.openehealth.ipf.platform-camel</groupId>
            <artifactId>platform-camel-lbs-cxf</artifactId>
            <version>${ipf-version}</version>
        </dependency>

The LBS adds new processors to the route builder. To do so, it must tie itself into the IPF route extender mechanism. Usually the route model extender is part of the Spring application context (e.g. in context.xml). The following is a typical set of beans needed by an LBS enabled route:

context.xml
...
    <!-- This is the standard model extension from the IPF (not LBS related) -->
    <bean id="coreModelExtension" class="org.openehealth.ipf.platform.camel.core.extend.CoreModelExtension" />

    <!-- Add this bean to provide the LBS specific processors (store and fetch) to your routes -->
    <bean id="lbsModelExtension" class="org.openehealth.ipf.platform.camel.lbs.core.extend.LbsModelExtension" />

    <!-- This is the standard route model extender, it picks up all the extensions to allow the IPF to use them -->
    <bean id="routeModelExtender" class="org.openehealth.ipf.platform.camel.core.extend.DefaultModelExtender">
        <property name="routeModelExtensions">
            <list>
                <!-- Reference to the core extension from the IPF (not LBS related) -->
                <ref bean="coreModelExtension" />
                <!-- You might have additional extensions here, for flow management or any custom extension -->

                <!-- Reference to the LBS extension bean to tell the IPF about it -->
                <ref bean="lbsModelExtension" />
            </list>
        </property>
    </bean>

Set up a disk store

The easiest way to set up a disk store is to create a bean in the Spring application context.

context.xml
<!-- Stores binaries in the location on disk defined by the constructor argument -->
<bean id="diskStore" class="org.openehealth.ipf.commons.lbs.store.DiskStore">
    <constructor-arg value="location/on/disk"/>
</bean>

<!-- Creates a DataSource for a stored binary -->
<bean id="resourceFactory" class="org.openehealth.ipf.commons.lbs.resource.ResourceFactory">
    <constructor-arg ref="diskStore"/>
    <constructor-arg value="unnamed"/>
</bean>

This creates a DiskStore bean that uses the given location as the base directory for all stored binaries. The resource factory creates standard data sources (javax.activation.DataSource) for stored binaries. Within routes, these data sources simplify access to a stored binary via message.getBody(InputStream.class).
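To illustrate what a data source for a stored binary offers, here is a minimal sketch of a file-backed resource. It is an assumption-based illustration, not the IPF DiskStore or ResourceFactory API: DiskStoreSketch and StoredResource are hypothetical, their accessors only mirror the read side of javax.activation.DataSource (getName(), getContentType(), getInputStream()), and treating the "unnamed" constructor argument as a default resource name is an assumption (Java 16 or later).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class DiskStoreSketch {

    // Hypothetical stored resource; accessor names mirror javax.activation.DataSource.
    record StoredResource(Path file, String contentType, String name) {
        String getName() { return name; }

        String getContentType() { return contentType; }

        // Opens a new stream on every call, so the binary can be read several
        // times without being held in memory.
        InputStream getInputStream() throws IOException {
            return Files.newInputStream(file);
        }
    }

    private final Path baseDir;
    private final String defaultName;

    // baseDir plays the role of "location/on/disk"; defaultName is assumed to match "unnamed".
    DiskStoreSketch(Path baseDir, String defaultName) throws IOException {
        this.baseDir = Files.createDirectories(baseDir);
        this.defaultName = defaultName;
    }

    // Streams the content into a new file below baseDir and wraps it as a resource.
    StoredResource createResource(String contentType, String name, InputStream content) throws IOException {
        Path file = Files.createTempFile(baseDir, "resource-", ".bin");
        Files.copy(content, file, StandardCopyOption.REPLACE_EXISTING);
        return new StoredResource(file, contentType, name != null ? name : defaultName);
    }

    public static void main(String[] args) throws IOException {
        DiskStoreSketch store = new DiskStoreSketch(Files.createTempDirectory("disk-store"), "unnamed");
        StoredResource resource = store.createResource(
                "text/plain", null, new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8)));

        // Read the resource twice; each call gets an independent stream.
        // readAllBytes() is only acceptable here because the demo content is tiny.
        for (int i = 0; i < 2; i++) {
            try (InputStream in = resource.getInputStream()) {
                System.out.println(resource.getName() + ": " + new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        }
    }
}
```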

Adding support for the HTTP endpoint

A Maven dependency has to be added to the pom.xml file to use the LBS with the HTTP or Jetty endpoint.

pom.xml
...
        <dependency>
            <groupId>org.openehealth.ipf.platform-camel</groupId>
            <artifactId>platform-camel-lbs-http</artifactId>
            <version>${ipf-version}</version>
        </dependency>

Routes use the HTTP endpoint functionality by accessing a list of handlers. The handlers "know" how to decode and encode messages of the corresponding endpoint. The HttpResourceHandler is used for the HTTP/Jetty support. It is best to create a list of the handlers in the application context:

context.xml
    <!-- This bean is a list of resource handlers. Add all handlers used within the routes to this list -->
    <util:list id="resourceHandlers">
        <bean class="org.openehealth.ipf.platform.camel.lbs.http.process.HttpResourceHandler">
            <constructor-arg ref="resourceFactory" />
        </bean>
    </util:list>

It is recommended to add further handlers to this list if the LBS is used with multiple endpoints. The Spring util namespace must be declared in the beans element of the same file:

context.xml
<beans ... other namespaces ...
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
... other locations
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.5.xsd
...">

Jetty Component and Camel 2.3.0

Due to changes in Camel 2.3.0, a modified version of the Jetty component is required. This component ensures that the LBS has access to the original InputStream received by Jetty. To use it, define it as a bean in your application context, e.g.:

    <bean id="jetty" class="org.openehealth.ipf.platform.camel.lbs.http.LbsJettyHttpComponent" />

Storing singlepart uploads

The store processor is used to store a binary within a route that receives messages from a Jetty endpoint:

from('jetty:http://localhost:8080/test')          // Standard Jetty endpoint
    .store().with('resourceHandlers')             // Stores content in the disk store via the handlers
    ...

This stores the contents of the HTTP input message via the resourceHandlers, which is useful for processing HTTP POST or PUT requests. The resulting message contains an org.openehealth.ipf.platform.camel.lbs.http.process.ResourceList in its input body. To process the binary, request an InputStream from the message and use standard stream handling. Requesting the body as a String is possible, but it always reads the whole binary into memory, which defeats the purpose of the LBS. It can, however, be useful when testing routes.

Here is an example of a custom processor that scans an uploaded text file for a token and sets a header field if it finds the token:

from('jetty:http://localhost:8080/lbstestpost')
    // Replace the message content with a data source
    .store().with('resourceHandlers') 
    // Custom processing to find a token
    .process { Exchange exchange ->
        // Get the stream from the data source and read it
        def inputStream = exchange.in.getBody(InputStream.class)                
        def reader = new BufferedReader(new InputStreamReader(inputStream))
        try {
            def line = reader.readLine()
            // Look for the token
            while (line != null && !line.contains('blu')) {
                line = reader.readLine()
            }
            // If found set the header
            if (line != null) {
                exchange.in.setHeader('tokenfound', 'yes')
            }
        }
        finally {
            reader.close()
        }
    }
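The token search in the closure above reads the stream line by line, so only one line is in memory at a time. The same logic can be factored into a reusable helper; the sketch below is a plain-Java version with a hypothetical name, containsToken(). It assumes UTF-8 text and, like the original, does not find a token that spans a line break.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class TokenScanner {

    // Returns true as soon as a line containing the token is found.
    // Only the current line is held in memory, and the stream is closed in all cases.
    static boolean containsToken(InputStream in, String token) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains(token)) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] upload = "first line\nsecond line with blu\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(containsToken(new ByteArrayInputStream(upload), "blu")); // true
    }
}
```

Note that readLine() still buffers a complete line, so a binary with very long lines (or none at all) would be read into memory; for such content, a byte-oriented search is the safer choice.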

Storing multipart uploads

Storing multipart uploads is very similar to storing singlepart uploads. The main difference is that the resource list contains multiple stored binaries. Multipart uploads are supported for HTTP POST requests only. The following example shows a custom processor that checks whether one of the uploaded binaries is a text file and sets a header entry if it finds one:

from('jetty:http://localhost:8080/lbstest_example2')
    // Replace the message content with data sources
    .store().with('resourceHandlers')
    // Custom processing to look for text resources
    .process { Exchange exchange ->
        // Run through all resources and check the content type
        exchange.in.getBody(ResourceList.class).each {
            if (it.contentType.startsWith('text/plain')) {
                exchange.in.setHeader('textfound', 'yes')
            }
        }
    }

Note that the resource list is also present in the singlepart case. Routes that do not know how many parts are uploaded in advance can assume that the resource list in the message body is always set for HTTP messages.

Storing downloads

Similar to the upload, downloads can be stored via the route:

...
.to('http://localhost:8080/lbstestget')     // Standard way to send an HTTP message
.store().with('resourceHandlers')           // Store the response of the HTTP message
...

This is useful when sending an HTTP GET request to retrieve binary data. Instead of directly processing the data, it is first stored and then processed further in the route. A resource list with DataSources is accessible in the same way as in the upload case.
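The point of storing a download is that the response body goes to disk instead of into memory. Outside of Camel, the same idea can be shown with the JDK's java.net.http.HttpClient (Java 11 or later), whose BodyHandlers.ofFile writes the body to a file as it arrives. This is a minimal sketch of the principle, not how the LBS or the Camel HTTP component work internally; the local test server and the names downloadToFile() and demo() are assumptions made to keep the example self-contained.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DownloadToDiskSketch {

    // Sends a GET request and writes the response body directly to the target file.
    static Path downloadToFile(URI uri, Path target) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<Path> response = client.send(request, HttpResponse.BodyHandlers.ofFile(target));
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }

    // Starts a throwaway local server in place of http://localhost:8080/lbstestget,
    // downloads its response and returns the content of the stored file.
    static String demo(String content) throws IOException, InterruptedException {
        byte[] payload = content.getBytes(StandardCharsets.UTF_8);
        HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
        server.createContext("/lbstestget", exchange -> {
            exchange.sendResponseHeaders(200, payload.length);
            try (OutputStream body = exchange.getResponseBody()) {
                body.write(payload);
            }
        });
        server.start();
        try {
            URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/lbstestget");
            Path stored = downloadToFile(uri, Files.createTempFile("download-", ".bin"));
            return Files.readString(stored);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo("downloaded binary"));
    }
}
```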

Uploading stored binaries

By using the fetch processor a singlepart or multipart HTTP POST request is created:

...
.fetch().with('resourceHandlers')           // Put a binary from the store into an HttpMessage
.to('http://localhost:8080/receiver')       // and send it

This will generate a POST request containing the binary and send it via the HTTP endpoint. The binary can be added manually to the store before fetching it:

...
.process { Exchange exchange ->
    // The resource factory can be used to create resources manually
    def resourceFactory = bean(ResourceFactory.class, 'resourceFactory') 
    def inputStream = new ByteArrayInputStream('hello world'.bytes)
    
    // Using the unit of work from the original exchange we can ensure that the
    // resource is removed once the message has been processed by the route
    def resource = resourceFactory.createResource(exchange.unitOfWork.id, 'text/plain', null, 'hello', inputStream)
    def resourceList = exchange.in.getBody(ResourceList.class)
    resourceList.add(resource)
}
// Create a POST request with the resources
.fetch().with('resourceHandlers')
.to('http://localhost:8080/receiver')
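The comments in the example above rely on the unit-of-work id to remove the resource once the route has processed the message. One way such cleanup can work is to group all binaries of a unit of work in one subdirectory and delete that directory at the end. The sketch below is hypothetical: the class UnitOfWorkStoreSketch, its methods add() and deleteAll(), the directory layout and the sanitizing of ids are assumptions, not the IPF DiskStore API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

public class UnitOfWorkStoreSketch {

    private final Path baseDir;

    UnitOfWorkStoreSketch(Path baseDir) throws IOException {
        this.baseDir = Files.createDirectories(baseDir);
    }

    // Assumption: ids may contain characters that are awkward in file names,
    // so anything other than letters, digits, '-' and '_' is replaced.
    private Path directoryFor(String unitOfWorkId) {
        return baseDir.resolve(unitOfWorkId.replaceAll("[^A-Za-z0-9_-]", "_"));
    }

    // Stores a binary in the subdirectory that belongs to the given unit of work.
    Path add(String unitOfWorkId, InputStream content) throws IOException {
        Path dir = Files.createDirectories(directoryFor(unitOfWorkId));
        Path file = Files.createTempFile(dir, "resource-", ".bin");
        Files.copy(content, file, StandardCopyOption.REPLACE_EXISTING);
        return file;
    }

    // Deletes every binary of the unit of work, e.g. once the route has finished the exchange.
    void deleteAll(String unitOfWorkId) throws IOException {
        Path dir = directoryFor(unitOfWorkId);
        if (Files.notExists(dir)) {
            return;
        }
        // Reverse order visits files before the directory that contains them.
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        UnitOfWorkStoreSketch store = new UnitOfWorkStoreSketch(Files.createTempDirectory("uow-store"));
        Path first = store.add("ID-host-1", new ByteArrayInputStream(new byte[] {1, 2, 3}));
        Path other = store.add("ID-host-2", new ByteArrayInputStream(new byte[] {4}));

        store.deleteAll("ID-host-1");
        System.out.println(Files.exists(first) + " " + Files.exists(other)); // false true
    }
}
```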

Of course, the results of the store processor can be fed directly into the fetch processor without further processing, allowing simple routing that does not care about the resources:

from('jetty:http://localhost:8080/router')
    .store().with('resourceHandlers')
    .setHeader('tag').constant('I was here')
    .fetch().with('resourceHandlers')
    .to('http://localhost:8080/receiver')
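In these routes, the fetch processor turns the stored binaries back into the body of an HTTP POST request. The sketch below shows, in plain Java, how a multipart body can be written by streaming each stored file into the output. It is an illustration under assumptions, not the LBS implementation: the multipart/form-data subtype, the Part record and writeMultipart() are hypothetical, and part names are not escaped (Java 16 or later).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class MultipartWriterSketch {

    // A stored binary that should become one part of the request body.
    record Part(String name, String contentType, Path file) {}

    private static void writeAscii(OutputStream out, String text) throws IOException {
        out.write(text.getBytes(StandardCharsets.US_ASCII));
    }

    // Writes a multipart/form-data body. Each part's content is copied straight from
    // its file to the output, so no part is loaded into memory as a whole.
    static void writeMultipart(OutputStream out, String boundary, List<Part> parts) throws IOException {
        for (Part part : parts) {
            writeAscii(out, "--" + boundary + "\r\n");
            writeAscii(out, "Content-Disposition: form-data; name=\"" + part.name() + "\"\r\n");
            writeAscii(out, "Content-Type: " + part.contentType() + "\r\n\r\n");
            Files.copy(part.file(), out);
            writeAscii(out, "\r\n");
        }
        // The closing delimiter carries two extra hyphens.
        writeAscii(out, "--" + boundary + "--\r\n");
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("part-", ".txt");
        Files.writeString(file, "hello world");

        ByteArrayOutputStream body = new ByteArrayOutputStream();
        writeMultipart(body, "XyZ", List.of(new Part("hello", "text/plain", file)));
        System.out.print(body.toString(StandardCharsets.US_ASCII));
    }
}
```

In a real request, the boundary must not occur inside any part, and the request's Content-Type header would announce it, e.g. multipart/form-data; boundary=XyZ.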

Adding support for the CXF endpoint

The CXF endpoint support is added to a project in the pom.xml:

pom.xml
...
        <dependency>
            <groupId>org.openehealth.ipf.platform-camel</groupId>
            <artifactId>platform-camel-lbs-cxf</artifactId>
            <version>${ipf-version}</version>
        </dependency>

In addition, the CXF endpoint must be configured in a Spring application context. This requires declaring the cxf namespace and importing additional resources provided by CXF. The following is an example:

context.xml
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:lang="http://www.springframework.org/schema/lang" 
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:cxf="http://camel.apache.org/schema/cxf"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
http://camel.apache.org/schema/cxf 
http://camel.apache.org/schema/cxf/camel-cxf.xsd
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/lang 
http://www.springframework.org/schema/lang/spring-lang-2.5.xsd
http://www.springframework.org/schema/util 
http://www.springframework.org/schema/util/spring-util-2.5.xsd
http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd 
">

    <!-- Imports used for the CXF endpoint -->
    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-http-jetty.xml" />

    <!-- Definition of a CXF endpoint via a service using a WSDL. -->
    <cxf:cxfEndpoint id="soapEndpointNoExtract"
        serviceClass="org.openehealth.ipf.platform.camel.lbs.cxf.process.Greeter"
        address="http://localhost:9002/SoapContext/NoExtractPort" endpointName="s:SoapOverHttp"
        serviceName="s:SOAPService" wsdlURL="hello_world.wsdl"
        xmlns:s="http://cxf.process.lbs.camel.platform.ipf.openehealth.org/">

        <!-- Enabling MTOM is recommended with large binaries -->
        <cxf:properties>
            <entry key="mtom-enabled" value="true" />
        </cxf:properties>
    </cxf:cxfEndpoint>

    ...
</beans>

Note that this configuration highly depends on the service and is pure Camel configuration. See http://camel.apache.org/cxf.html for more details on the CXF component in Camel.

In this example, CXF uses MTOM. SOAP with Attachments (SwA) can also be used with the LBS by removing the mtom-enabled property. However, MTOM is recommended for large binaries because the transfer is more efficient.

Routes use the CXF endpoint functionality of the LBS by accessing a list of handlers. The handlers "know" how to decode and encode messages of the corresponding endpoint. The CxfPojoResourceHandler is used for the CXF support. It is best to create a list of the handlers in the application context:

context.xml
    <!-- This bean is a list of resource handlers. Add all handlers used within the routes to this list -->
    <util:list id="resourceHandlers">
        <bean class="org.openehealth.ipf.platform.camel.lbs.cxf.process.CxfPojoResourceHandler">
           <constructor-arg ref="resourceFactory"/>
        </bean>
    </util:list>

It is recommended to add further handlers to this list if the LBS is used with multiple endpoints. Note that the util namespace must be declared in the beans element.

Storing binaries from a SOAP request

Storing the binaries is done via the store processor:

from('cxf:bean:imageBinServer')                  // Standard POJO CXF endpoint usage
    .store().with('resourceHandlers')            // Stores any binaries into the disk store
    ...

The LBS requires the POJO data format, which is the default for CXF endpoints. In POJO data format, the CXF endpoint puts a MessageContentsList in the input body. This is a flat list of the parameters passed to the operation. The CxfPojoResourceHandler interprets all parameters of type DataHandler and Holder<DataHandler> as binaries and stores their content in the attached store. Each original DataHandler is then replaced with one that points to the store. This replacement is transparent to subsequent processing.
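How a handler can recognize binaries in such a flat parameter list is sketched below. It is plain Java with local stand-ins for javax.xml.ws.Holder and javax.activation.DataHandler (neither ships with current JDKs), so the types and the method binaryParameterIndices() are assumptions for illustration, not the CxfPojoResourceHandler implementation (Java 16 or later).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParameterScanSketch {

    // Stand-in for javax.xml.ws.Holder, which exposes its content in a public "value" field.
    static final class Holder<T> {
        public T value;

        Holder(T value) {
            this.value = value;
        }
    }

    // Stand-in for javax.activation.DataHandler.
    record Binary(String contentType) {}

    // Returns the indices of all parameters that carry a binary, either directly or
    // wrapped in a Holder. Null entries (e.g. the result slot of a void operation) are skipped.
    static List<Integer> binaryParameterIndices(List<?> params) {
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < params.size(); i++) {
            Object param = params.get(i);
            boolean isBinary = param instanceof Binary
                    || (param instanceof Holder<?> holder && holder.value instanceof Binary);
            if (isBinary) {
                indices.add(i);
            }
        }
        return indices;
    }

    public static void main(String[] args) {
        // Same shape as a call with Holder<String>, Holder<DataHandler> and DataHandler parameters.
        List<Object> params = Arrays.asList(
                new Holder<>("Hello world"),
                new Holder<>(new Binary("text/plain")),
                new Binary("text/plain"));
        System.out.println(binaryParameterIndices(params)); // [1, 2]
    }
}
```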

Here is an example of a custom processor handling a SOAP request:

from('cxf:bean:soapEndpointExample1')
    // Store the binaries of the operation parameters
    .store().with('resourceHandlers')
    // Custom processing to find a token in a binary
    .process { Exchange exchange ->
        // Operation parameters are contained in a list
        def params = exchange.in.getBody(List.class)
        // In this example, the parameter with index 1 contains a binary
        def inputStream = params.get(1).value.inputStream
        def reader = new BufferedReader(new InputStreamReader(inputStream))
        try {
            def line = reader.readLine()
            // Look for the token
            while (line != null && !line.contains('blu')) {
                line = reader.readLine()
            }
            // If found set the header
            if (line != null) {
                exchange.in.setHeader('tokenfound', 'yes')
            }
        }
        finally {
            reader.close()
        }
    }

The SOAP message contains a Holder<DataHandler> in the parameter with index 1. The binary contained in this parameter is scanned for a token and a header is set if the token was found.

Storing binaries from a SOAP response

Just as the store processor handles a request, it can also store binaries contained in a response. The store processor is simply called after the request has been sent to the CXF endpoint:

...
.to('cxf:bean:soapEndpoint')
.store().with('resourceHandlers')

Processing the response is identical to processing the request. The only thing to keep in mind is that the CXF endpoint places response parameters in a MessageContentsList that contains the return value of the operation as the first element. If the method returns void the first element in the list is null.

Preparing a SOAP request with stored binaries

For CXF it is not necessary to use the fetch processor because the CXF message is always kept in a format that can be understood by the CXF endpoint. Therefore, the only thing to decide is where the binaries come from. Everything else is identical to normal CXF message generation.

The following example performs a SOAP request by using two parameters that access stored binaries:

...
// Custom processor to manually create a SOAP call
.process { Exchange exchange ->
    // The resource factory can be used to create resources manually
    def resourceFactory = bean(ResourceFactory.class, 'resourceFactory') 
    def inputStream1 = new ByteArrayInputStream('hello world'.bytes)

    // Using the unit of work from the original exchange we can ensure that the
    // resource is removed once the message has been processed by the route
    def resource1 = resourceFactory.createResource(exchange.unitOfWork.id, 'text/plain', null, null, inputStream1)
    def inputStream2 = new ByteArrayInputStream('this is me'.bytes)
    def resource2 = resourceFactory.createResource(exchange.unitOfWork.id, 'text/plain', null, null, inputStream2)
    
    // The list of parameters for the operation call
    def params = new MessageContentsList()                
    params.set(0, new Holder<String>('Hello world'))
    params.set(1, new Holder<DataHandler>(new DataHandler(resource1)))
    params.set(2, new DataHandler(resource2))
    
    // postMe is the operation being called
    exchange.in.setHeader(CxfConstants.OPERATION_NAME, "postMe")
    exchange.in.body = params
}
.to('cxf:bean:soapEndpoint')

Any binaries contained in the result of the call can be stored as described in the previous section.

Adding support for the MINA endpoint

The MINA endpoint support is added to a project in the pom.xml:

pom.xml
...
        <dependency>
            <groupId>org.openehealth.ipf.platform-camel</groupId>
            <artifactId>platform-camel-lbs-mina</artifactId>
            <version>${ipf-version}</version>
        </dependency>

The endpoint is created implicitly by using it in a route:

from('mina:tcp://localhost:6123?sync=true')
...

The endpoint uses the query syntax to define various options. One of these options defines a codec that encodes and decodes data streams based on a protocol specification. For MLLP and HL7, Camel offers a codec (HL7MLLPCodec), but it does not handle large binaries efficiently. Therefore, the LBS provides its own codec, MllpStoreCodec. This codec does not support HL7 directly; it only handles MLLP. The individual messages must be unmarshalled to access the HL7 data model. As with most components, it is best to define the codec in the Spring application context:

context.xml
<!-- This codec is used with a MINA endpoint to efficiently process large binaries wrapped by MLLP -->
<bean id="mllpStoreCodec" class="org.openehealth.ipf.platform.camel.lbs.mina.mllp.MllpStoreCodec">
    <constructor-arg ref="resourceFactory" />
    <constructor-arg ref="camelContext" />
</bean>

The codec is then used inside the route definition:

from('mina:tcp://localhost:6125?sync=true&codec=mllpStoreCodec')
...

All MLLP messages arriving at the endpoint are now stored in the disk store, and the message contains a data source pointing to the stored content. Note that this happens without the store or fetch processors.
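MLLP itself is a simple framing: each message starts with the byte 0x0B and ends with the two bytes 0x1C 0x0D. The sketch below shows how a frame can be written and read in a streaming fashion, so that the payload can be copied to a file instead of a memory buffer. It is a plain-Java illustration with hypothetical method names, not the MllpStoreCodec, which is a MINA codec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MllpFramingSketch {

    static final int START_BLOCK = 0x0B;      // <VT>
    static final int END_BLOCK = 0x1C;        // <FS>
    static final int CARRIAGE_RETURN = 0x0D;  // <CR>

    // Wraps a payload in an MLLP frame: <VT> payload <FS><CR>.
    static void writeFrame(InputStream payload, OutputStream out) throws IOException {
        out.write(START_BLOCK);
        payload.transferTo(out);
        out.write(END_BLOCK);
        out.write(CARRIAGE_RETURN);
    }

    // Reads one frame and copies its payload byte by byte to 'payload' (e.g. a file stream).
    // Returns false if the stream ends before a start block. Wrap 'in' in a
    // BufferedInputStream in real use, since single-byte reads are slow otherwise.
    static boolean readFrame(InputStream in, OutputStream payload) throws IOException {
        int b;
        // Skip anything that precedes the start block.
        while ((b = in.read()) != -1 && b != START_BLOCK) {
            // ignore
        }
        if (b == -1) {
            return false;
        }
        boolean pendingEnd = false; // true after <FS>, until we know whether <CR> follows
        while ((b = in.read()) != -1) {
            if (pendingEnd) {
                if (b == CARRIAGE_RETURN) {
                    return true; // complete frame
                }
                payload.write(END_BLOCK); // the <FS> belonged to the payload after all
                pendingEnd = false;
            }
            if (b == END_BLOCK) {
                pendingEnd = true;
            } else {
                payload.write(b);
            }
        }
        throw new EOFException("Stream ended inside an MLLP frame");
    }

    public static void main(String[] args) throws IOException {
        // Shortened HL7 segment, terminated by <CR> as HL7 v2 segments are.
        byte[] message = "MSH|^~\\&|APP|FAC\r".getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream framed = new ByteArrayOutputStream();
        writeFrame(new ByteArrayInputStream(message), framed);

        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        boolean complete = readFrame(new ByteArrayInputStream(framed.toByteArray()), decoded);
        System.out.println(complete + " " + Arrays.equals(message, decoded.toByteArray())); // true true
    }
}
```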

Processing messages from the MINA endpoint with the LBS

Once the endpoint has been configured, the actual message can be read from a data source in the input message:

from('mina:tcp://localhost:6125?sync=true&codec=mllpStoreCodec')
    // Custom processing to find the token
    .process { Exchange exchange ->
        // Get the stream from the data source and read it
        def inputStream = exchange.in.getBody(InputStream.class)
        def reader = new BufferedReader(new InputStreamReader(inputStream))
        try {
            def line = reader.readLine()
            // Look for the token
            while (line != null && !line.contains('blu')) {
                line = reader.readLine()
            }
            // If found set the header
            if (line != null) {
                exchange.in.setHeader('tokenfound', 'yes')
            }
        }
        finally {
            reader.close()
        }
    }

In this example the message is scanned for a token and a header is set if the token is found.

Receiving HL7 messages from the MINA endpoint with the LBS

MLLP is mostly used with HL7 messages. Camel's standard HL7 support provides a data format for HL7 that can be used to marshal and unmarshal messages. This data format is also compatible with the MllpStoreCodec, so it can be used in routes:

from('mina:tcp://localhost:6127?sync=true&codec=mllpStoreCodec')
    .unmarshal().hl7()
    ...

Once the message is unmarshalled it can be processed with the standard HAPI based support that the Camel HL7 component offers, e.g.:

from('mina:tcp://localhost:6127?sync=true&codec=mllpStoreCodec')
    .unmarshal().hl7()
    .process { Exchange exchange ->
        def hl7Message = exchange.getIn().getBody(ca.uhn.hl7v2.model.Message.class)
        ...
    }

Sending HL7 messages to the MINA endpoint with the LBS

Just like receiving messages is done by unmarshalling via the HL7 data format, sending messages is done by marshalling:

.marshal().hl7()
.to('mina:tcp://localhost:6127?sync=true&codec=mllpStoreCodec')

To send a message to this endpoint, the body has to contain an HL7 message (ca.uhn.hl7v2.model.Message).
