Flow management

Sample application

For a flow management sample application refer to the IPF reference application.

Concept

The flow manager is a service that monitors and controls application-specific message flows. It stores each incoming message as a flow object in a database and updates the flow object as the message travels through the integration application. If the message was processed and has been successfully delivered to a destination system, an acknowledgement (ACK) is stored with that particular message flow. If the processing of a message has failed, a negative acknowledgement (NAK) is stored with that message flow.

The following figure shows a sample integration solution managed by a flow manager. There is a single entry point for a message and two possible exit points. An interceptor at the entry point initializes a flow object in the flow manager database. The flow object contains the original message. Interceptors at the exit points acknowledge (ACK) the delivery of the message to a destination. You may also want to configure interceptors at destinations that handle error or fault messages. These interceptors negatively acknowledge (NAK) the message flow (such an interceptor is shown as a red box in the figure).

With the flow manager you can also replay messages. A replay re-submits the initially stored message. In order to avoid duplicate delivery of messages to destination systems you can install duplicate filters. These recognize replayed messages that have already been delivered to an external destination and filter them out. Duplicate filtering can be turned on and off at runtime via the JMX interface. An additional feature that will be supported in the future is registration for compensation events. If a processor wants to be notified if an error occurs it can register at the flow manager for compensation messages that allow the processor to undo certain actions. This feature is comparable to a <compensate> action in BPEL.

Message replays can be useful if a destination system is unavailable for a longer time than covered by a redelivery policy of internal message buffers. In this case the buffer gives up redelivery and leaves the administrator with the duty to run a manual redelivery (replay) once the destination system is available again. Note that a message that has already been acknowledged will by default not be re-delivered to the destination system. This allows for non-idempotent message receivers. Only NAKed messages or unacknowledged message flows can be sent to a destination system during a replay, otherwise they are filtered out. However, the duplicate filtering functionality can be turned on and off for each application individually.
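The replay rule described above can be summarized in a few lines of Java. This is only a minimal sketch of the decision logic, not the IPF implementation: the class ReplayFilterSketch, the enum AckState and the method shouldDeliver are hypothetical names introduced for illustration.

```java
/** Hypothetical model of the duplicate-filter decision; not part of the IPF API. */
final class ReplayFilterSketch {

    /** Acknowledgement state of a message at one destination (assumed names). */
    enum AckState { UNACKNOWLEDGED, ACKED, NAKED }

    private ReplayFilterSketch() {
    }

    /**
     * Returns true if a replayed message should be delivered to the destination
     * and false if it should be filtered out as a duplicate.
     */
    static boolean shouldDeliver(AckState state, boolean filteringEnabled) {
        if (!filteringEnabled) {
            // Duplicate filtering switched off: every replayed message is delivered again.
            return true;
        }
        // With filtering on, only NAKed or unacknowledged messages pass the filter.
        return state != AckState.ACKED;
    }

    public static void main(String[] args) {
        for (AckState state : AckState.values()) {
            System.out.println(state + " -> deliver=" + shouldDeliver(state, true));
        }
    }
}
```

With filtering enabled, a non-idempotent receiver never sees an already acknowledged message a second time. Switching filtering off delivers every replayed message again, so it should only be done when the receiver can handle duplicates.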

The flow manager can also store string representations of messages that enter and leave an integration solution. These string representations are rendered at the flow interceptors. Application developers can provide custom renderers if needed, i.e. message rendering can be customized when a message flow is initialized, acknowledged or negatively acknowledged.

Stored string representations of inbound and outbound messages (also called incoming and outgoing messages) allow administrators and auditors to keep track of which messages entered and left the system. These string representations can be viewed either via a generic JMX client or via the platform manager. You can also perform fulltext searches based on the string representations of messages. Messages can be encrypted in the database and still be found via fulltext searches.

JMX interface

Rich flow management client

The IPF flow manager exposes its functionality over a JMX interface. You can use any JMX client to work with the flow manager. This is shown here using the JConsole from the Java SDK 6. IPF also ships with a special rich flow management client that is based on Eclipse RCP technology. You can find details about this client in the flow management client section of the reference manual. The flow management client is part of the integration platform manager which also provides a generic JMX client.

The following figure shows the flow manager JMX interface on the JConsole that is delivered with the Java SDK 6. In order to use the flow manager with the JConsole you must put the commons-flow-<version>.jar file on its classpath. This is explained in section JConsole extension. You can find the FlowManager MBean under the org.openehealth.ipf/service folder on the MBeans tab.

Via the FlowManager JMX interface you can:

  • Search for message flows with a given identifier, status, time-span or content (fulltext search).
  • Replay message flows with a given identifier, status or time-span.
  • Display flow properties and message contents (inbound, outbound)

Time-spans can be given in units of milliseconds (no symbol), seconds (s), minutes (m), hours (h) and days (d). Examples:

  • 2000 = 2000 milliseconds
  • 4s = four seconds
  • 3m = three minutes
  • 1h = one hour
  • 1d = one day
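To make the time-span notation concrete, here is a minimal Java sketch that converts such a string into milliseconds. It only illustrates the notation listed above. The class TimeSpans and the method toMillis are hypothetical and are not taken from the IPF code base, whose actual parsing may differ.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of IPF): converts time-span strings to milliseconds. */
final class TimeSpans {

    private TimeSpans() {
    }

    static long toMillis(String timeSpan) {
        String s = timeSpan.trim();
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty time-span");
        }
        char unit = s.charAt(s.length() - 1);
        if (Character.isDigit(unit)) {
            // No unit symbol: the value is already given in milliseconds.
            return Long.parseLong(s);
        }
        long value = Long.parseLong(s.substring(0, s.length() - 1));
        switch (unit) {
            case 's': return TimeUnit.SECONDS.toMillis(value);
            case 'm': return TimeUnit.MINUTES.toMillis(value);
            case 'h': return TimeUnit.HOURS.toMillis(value);
            case 'd': return TimeUnit.DAYS.toMillis(value);
            default:
                throw new IllegalArgumentException("unknown time-span unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("2000")); // plain milliseconds
        System.out.println(toMillis("3m"));   // three minutes in milliseconds
    }
}
```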

For example, when clicking the findLastFlows button with an argument of 3m the flow manager displays all message flows of the last 3 minutes. By default the upper value of the time-span is the current point in time. It can also be set to any fixed point in time (see below). Here's a snippet from the result list:

The latest flow is at the top of the result list. In our example, this flow has the flow identifier 9. For each message flow the following properties are displayed:

Flow property Description
overall status CLEAN or ERROR. If a flow has been negatively acknowledged at least once it has the overall status ERROR, otherwise CLEAN.
application Name of the integration application that processes the message. Flow management operations are scoped by application name.
create time Time when the message flow was created.
replay time Time when the message flow was last replayed.
replay count Number of times a message flow was replayed.
replayable Whether or not the message flow can be replayed.
ACK count (expected) The expected number of acknowledgements in case of successful message processing.
ACK count (actual) The actual number of acknowledgements.
NAK count The actual number of negative acknowledgements.
text String representation of the message that caused the flow creation. Only displayed if the flow was obtained via the findFlow operation with the includeText parameter set to true.

Flow parts give information about a message's copy or split history, i.e. if the message has been copied during a multicast operation or has been split using a splitter. If a message only has a single flow part then there was no split or multicast operation. Flow parts are created or updated during an ACK or NAK.

For each flow part the following properties are displayed:

Flow part property Description
status CLEAN (after an ACK) or ERROR (after a NAK).
flow duration Duration in milliseconds the message (represented by this flow part) needed from entering the message processing route until
  • successful delivery to its final destination or
  • processing by an error handler in case of a failure
contribution time The time stamp when this flow part has been ACKed or NAKed.
filter time The time stamp when this flow part was filtered during a replay.
contribution count Number of times this flow part has been ACKed or NAKed. Can be greater than one in case of redeliveries.
filter count Number of times a message has been filtered. Can be greater than zero in case of redeliveries.
text String representation of the ACKed or NAKed message represented by this flow part.

For each flow part the flow path is shown as well. If a message has not been split or multicast to more than one recipient the path will always be 0. If a message is, for example, split into two parts their paths will be 0.0 and 0.1. If the second message is again split into two parts then you finally end up with three messages with paths 0.0, 0.1.0 and 0.1.1. The next figure shows the JMX attributes of the flow manager.
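The path numbering described above can be reproduced with a small Java sketch. It assumes only what the text states, namely that each split or multicast appends a zero-based index to the parent path. The class FlowPaths and the method split are hypothetical names and are not part of IPF.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of flow path numbering; not part of the IPF API. */
final class FlowPaths {

    private FlowPaths() {
    }

    /** Returns the paths of the parts created when the message at parentPath is split. */
    static List<String> split(String parentPath, int parts) {
        List<String> children = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            // Each part gets the parent path plus its zero-based index.
            children.add(parentPath + "." + i);
        }
        return children;
    }

    public static void main(String[] args) {
        List<String> firstSplit = split("0", 2);                // 0.0 and 0.1
        List<String> secondSplit = split(firstSplit.get(1), 2); // 0.1.0 and 0.1.1
        System.out.println(firstSplit.get(0) + ", " + secondSplit.get(0) + ", " + secondSplit.get(1));
    }
}
```

The three paths printed by main are the three messages that remain after the second split in the example above.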

Attribute Description
UpperTimeLimit Should be set if you want time-span values to be relative to a fixed point in time instead of the current time (time-spans are entered by the user for flow searches and flow replay operations). For example, if you want to find all flows that have been NAKed between 1:51:25pm and 2:51:25pm on 28th Nov. 2008 then you enter the upper value of the time-span (2:51:25pm on 28th Nov. 2008) into the UpperTimeLimit field and use 1h (1 hour) as time-span argument for the findLastErrorFlows operation. With the setUpperTimeLimitToCurrentTime operation you can create an entry in the UpperTimeLimit field that represents the point in time when you invoked the operation, i.e. pressed the button. If you always want to have the current time for the upper time limit then leave the UpperTimeLimit field blank.
Application The name of the application an administrator currently wants to monitor. JMX operations for different Application values will give different results.
EnableFiltering Controls whether duplicate filters are enabled or disabled. Setting EnableFiltering to false can be helpful for testing purposes.
EnableCleanup Controls whether a flow shall be cleaned up once the expected number of acknowledgements is reached. Cleanup means that the incoming message is removed from the flow object in the database. This saves disk space and still allows clients to retrieve the flow history. However, a replay of cleaned-up flows is not possible any more because this requires the initial message. EnableCleanup is set to false by default.
MaxFlows Limits the size of result sets returned by flow finder operations. By default the maximum number of flows returned is 100. If a finder operation finds more than 100 flows in the database then only the 100 most recent flows will be shown. To return all flows matching certain criteria leave the MaxFlows field blank.

EnableCleanup and EnableFiltering settings are persisted by the flow manager in context of a certain Application value.
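The interplay of UpperTimeLimit and a time-span argument, as described in the table above, amounts to a simple window calculation. The following Java sketch shows it under the assumption that a blank UpperTimeLimit is modelled as null. The class SearchWindow and its members are hypothetical and do not reflect the flow manager's internal code.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical model of the search window used by finder and replay operations. */
final class SearchWindow {

    final LocalDateTime from;
    final LocalDateTime to;

    private SearchWindow(LocalDateTime from, LocalDateTime to) {
        this.from = from;
        this.to = to;
    }

    /**
     * Computes the window [upper - timeSpan, upper]. A null upperTimeLimit stands
     * for a blank UpperTimeLimit field, in which case the current time is used.
     */
    static SearchWindow of(LocalDateTime upperTimeLimit, Duration timeSpan, LocalDateTime now) {
        LocalDateTime upper = (upperTimeLimit != null) ? upperTimeLimit : now;
        return new SearchWindow(upper.minus(timeSpan), upper);
    }

    public static void main(String[] args) {
        // Example from the table: flows between 1:51:25pm and 2:51:25pm on 28th Nov. 2008.
        LocalDateTime upper = LocalDateTime.of(2008, 11, 28, 14, 51, 25);
        SearchWindow window = of(upper, Duration.ofHours(1), LocalDateTime.now());
        System.out.println(window.from + " .. " + window.to);
    }
}
```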

Message content

The following JMX operations are related to message content and fulltext searches:

Operation Description
findFlowMessageText(identifier) Returns the rendered message content of the inbound message of flow identified by identifier.
findFlowPartMessageText(identifier, flowPath) Returns the rendered message content of the outbound message of flow part identified by identifier and path flowPath.
findFlow(identifier, includeText) Returns the flow object identified by identifier. If includeText is set to true the text attributes of the flow object and its parts contain the rendered messages (if stored in the database). If there's no rendered message in the database or the flow object is returned from another operation then N/A will be displayed for the text attribute.
findLastFlowsWithMessageText(timespan, searchExpression) Returns a list of flows within timespan that match the fulltext searchExpression. Leaving searchExpression empty means 'any content'.
findLastErrorFlowsWithMessageText(timespan, searchExpression) Returns a list of flows within timespan that have been negatively acknowledged and that match the fulltext searchExpression. Leaving searchExpression empty means 'any content'.
findLastUnackFlowsWithMessageText(timespan, searchExpression) Returns a list of flows within timespan that have not been acknowledged and that match the fulltext searchExpression. Leaving searchExpression empty means 'any content'.

Here's an example from the reference application. We want to see the inbound and outbound message content of the flow with identifier 20037. The inbound message content can be obtained via findFlowMessageText(20037). The outbound message content (i.e. the transformation result) can be obtained via findFlowPartMessageText(20037, 0). Since there is only one flow path through the reference application the flowPath argument is 0.

The invocation result of findFlowMessageText(20037) is:

The invocation result of findFlowPartMessageText(20037, 0) is:

Search and view message content with platform manager

For searching and displaying message content via a rich user interface refer to the platform manager documentation.

JConsole extension

JConsole requires the commons-flow-<version>.jar on its classpath because the flow manager uses data transfer objects (DTOs) to communicate flow data to JMX clients. The version placeholder must be replaced with the IPF version you want to use. Let's say we use version IPF 1.5.0. Copy commons-flow-1.5.0.jar to the bin folder of your Java SDK 6 installation. On Windows, also create a flowmgr-console.bat file with the following content in that folder.

flowmgr-console.bat
@echo off

rem --------------------------------------------------------
rem  Start JConsole with flow manager jar containing DTOs
rem --------------------------------------------------------
jconsole -J-Djava.class.path=../lib/jconsole.jar;../lib/tools.jar;commons-flow-1.5.0.jar

Configuration

The flow manager is made up of two components.

  • The Camel-independent commons-flow component that implements the flow management services.
  • The Camel-dependent platform-camel-flow component that integrates flow management services into Camel routes. This component also implements the flow management DSL.

By default, the flow manager uses Hibernate to store flow data and messages in a Derby database. Oracle has been tested as well. Usage of other databases should be possible by replacing the JDBC driver but this has not been tested yet. For storing fulltext indices and running fulltext searches the flow manager uses Lucene and Hibernate Search. Encryption of rendered messages is done with the Jasypt crypto library. Here's an example of how to configure commons-flow and platform-camel-flow within a Spring application context XML file (it was taken from the IPF reference application).

Configuration files created by archetypes

If you use the archetypes ipf-archetype-default and ipf-archetype-cluster for creating an IPF application then all required flow management configuration files will be created for you.

context-flowmgr.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:ctx="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">

  <!-- ================================================================= -->
  <!--  Declarative Transaction Management                               -->
  <!-- ================================================================= -->

  <tx:annotation-driven transaction-manager="hibernateTransactionManager"/>

  <!-- ================================================================== -->
  <!--  Flow Manager                                                      -->
  <!-- ================================================================== -->

  <bean id="flowManager"
    class="org.openehealth.ipf.platform.camel.flow.PlatformFlowManager">
  </bean>

  <!-- ================================================================== -->
  <!--  Flow Processors (bean definition optional since IPF 1.6.0)        -->
  <!-- ================================================================== -->

  <bean scope="prototype" class="org.openehealth.ipf.platform.camel.flow.process.FlowBeginProcessor">
    <property name="messageRenderer" ref="messageRenderer" />
  </bean>

  <bean scope="prototype" class="org.openehealth.ipf.platform.camel.flow.process.FlowEndProcessor">
    <property name="messageRenderer" ref="messageRenderer" />
  </bean>

  <bean scope="prototype" class="org.openehealth.ipf.platform.camel.flow.process.FlowErrorProcessor">
    <property name="messageRenderer" ref="messageRenderer" />
  </bean>

  <!-- ================================================================== -->
  <!--  Dedupe (bean definition optional since IPF 1.6.0)                 -->
  <!-- ================================================================== -->

  <bean scope="prototype" class="org.openehealth.ipf.platform.camel.flow.dedupe.Dedupe" />

  <!-- ================================================================= -->
  <!--  Repositories                                                     -->
  <!-- ================================================================= -->

  <bean id="sequenceRepository"
    class="org.openehealth.ipf.commons.flow.repository.SequenceRepositoryImpl">
    <property name="hibernateTemplate" ref="hibernateTemplate" />
  </bean>

  <bean id="flowRepository"
    class="org.openehealth.ipf.commons.flow.repository.FlowRepositoryImpl">
    <property name="hibernateTemplate" ref="hibernateTemplate" />
  </bean>

  <bean id="configRepository"
    class="org.openehealth.ipf.commons.flow.repository.ConfigRepositoryImpl">
    <property name="hibernateTemplate" ref="hibernateTemplate" />
  </bean>

  <!-- ================================================================= -->
  <!--  Hibernate Setup                                                  -->
  <!-- ================================================================= -->

  <bean id="hibernateSessionFactory" class="org.springframework.orm.hibernate3.LocalSessionFactoryBean">
    <property name="dataSource" ref="flowDataSource"/>
    <property name="configLocation" value="/hibernate-flow.xml"/>
    <property name="configurationClass" value="org.hibernate.cfg.AnnotationConfiguration"/>
    <property name="hibernateProperties">
      <props>
        <prop key="hibernate.dialect">org.openehealth.ipf.commons.flow.derby.DerbyDialect</prop>
        <prop key="hibernate.hbm2ddl.auto">update</prop>
        <prop key="hibernate.show_sql">false</prop>
        <prop key="hibernate.format_sql">false</prop>
        <prop key="hibernate.search.autoregister_listeners">false</prop>
        <!--
            THESE SETTINGS ARE PRELIMINARY. CLUSTER-SUPPORT FOR LUCENE INDEX TO FOLLOW.
         -->
        <prop key="hibernate.search.default.directory_provider">org.hibernate.search.store.FSDirectoryProvider</prop>
        <prop key="hibernate.search.default.indexBase">./lucene</prop>
      </props>
    </property>
    <!--
         Enables rendered message encryption and full text indexing:
         * To use rendering without both encryption and full text indexing,
           delete the 'eventListeners' property.
         * To use rendering with encryption only, delete the
           post-insert, post-update and post-delete entries.
         * To use rendering with full text indexing only, delete the bean
           references to textDecryptorEventListener in the post-insert and
           post-update event listeners. Delete the pre-update, pre-insert
           and post-load entries as well.
     -->
    <property name="eventListeners">
      <map>
        <entry key="pre-update" value-ref="textEncryptorEventListener"/>
        <entry key="post-update">
          <list>
            <ref bean="textDecryptorEventListener"/>
            <ref bean="textIndexEventListener"/>
          </list>
        </entry>
        <entry key="pre-insert" value-ref="textEncryptorEventListener"/>
        <entry key="post-insert">
          <list>
            <ref bean="textDecryptorEventListener"/>
            <ref bean="textIndexEventListener"/>
          </list>
        </entry>
        <entry key="post-delete" value-ref="textIndexEventListener"/>
        <entry key="post-load" value-ref="textDecryptorEventListener"/>
      </map>
    </property>
  </bean>

  <bean id="hibernateTemplate"
    class="org.springframework.orm.hibernate3.HibernateTemplate">
    <property name="sessionFactory" ref="hibernateSessionFactory" />
  </bean>

  <bean id="hibernateTransactionManager"
    class="org.springframework.orm.hibernate3.HibernateTransactionManager">
    <property name="sessionFactory" ref="hibernateSessionFactory"/>
  </bean>

  <!-- ================================================================= -->
  <!--  Datasource Setup                                                 -->
  <!-- ================================================================= -->

  <bean id="flowDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="org.apache.derby.jdbc.ClientDriver" />
    <property name="url" value="jdbc:derby://localhost:1527/flowmgr;create=true" />
    <property name="defaultAutoCommit" value="false" />
    <property name="initialSize" value="10" />
    <property name="maxActive" value="50" />
  </bean>

  <!--<bean id="flowDataSource" class="org.apache.derby.jdbc.EmbeddedConnectionPoolDataSource">
    <property name="databaseName" value="flowmgr"/>
    <property name="createDatabase" value="create"/>
  </bean>-->

  <!-- ================================================================== -->
  <!--  JMX Setup                                                         -->
  <!-- ================================================================== -->

  <bean class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
    <property name="autodetect" value="false"/>
    <property name="assembler" ref="assembler"/>
    <property name="namingStrategy" ref="namingStrategy"/>
    <property name="beans">
      <map>
        <entry
            key="org.openehealth.ipf.platform:type=service,name=FlowManager"
            value-ref="flowManagerMBean" />
      </map>
    </property>
  </bean>

  <bean id="flowManagerMBean" class="org.openehealth.ipf.commons.flow.jmx.FlowManagerMBean">
    <property name="application" value="tutorial" />
  </bean>

  <!-- ================================================================= -->
  <!--  JMX Annotation Support                                           -->
  <!-- ================================================================= -->

  <bean id="jmxAttributeSource"
      class="org.springframework.jmx.export.annotation.AnnotationJmxAttributeSource"/>

  <bean id="assembler"
      class="org.springframework.jmx.export.assembler.MetadataMBeanInfoAssembler">
      <property name="attributeSource" ref="jmxAttributeSource"/>
  </bean>

  <bean id="namingStrategy"
      class="org.springframework.jmx.export.naming.MetadataNamingStrategy">
      <property name="attributeSource" ref="jmxAttributeSource"/>
  </bean>

  <!-- ================================================================= -->
  <!--  Encryption setup                                                 -->
  <!-- ================================================================= -->

  <bean id="stringEncryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
    <property name="password">
      <value>test_password</value>
    </property>
  </bean>

  <bean id="textEncryptorEventListener"
    class="org.openehealth.ipf.commons.flow.hibernate.RenderedMessageEncryptEventListener">
    <property name="stringEncryptor" ref="stringEncryptor" />
  </bean>

  <bean id="textDecryptorEventListener"
    class="org.openehealth.ipf.commons.flow.hibernate.RenderedMessageDecryptEventListener">
    <property name="stringEncryptor" ref="stringEncryptor"/>
  </bean>

  <!-- ================================================================= -->
  <!--  Fulltext indexing/search setup                                   -->
  <!-- ================================================================= -->

  <bean id="textIndexEventListener" class="org.hibernate.search.event.FullTextIndexEventListener"/>
  <bean id="flowSearchCallback" class="org.openehealth.ipf.commons.flow.repository.search.DefaultSearchCallback" />

  <!-- ================================================================== -->
  <!--  Message renderer                                                  -->
  <!-- ================================================================== -->

  <bean id="messageRenderer"
    class="org.openehealth.ipf.platform.camel.flow.render.SimpleMessageRenderer">
  </bean>

</beans>

The beans contained in this configuration file have been grouped into sections. These are further explained in the following table.

Section Description
Declarative Transaction Management For transaction management we use Spring's transaction management infrastructure and a Hibernate transaction manager (see config section Hibernate Setup). Transaction boundaries are defined with @Transactional annotations on the FlowManager interface.
Flow Manager The Camel-specific flow manager implementation provided by the platform-camel-flow component.
Flow Processors This section defines the interceptors that create, acknowledge and negatively acknowledge flows in the flow manager database. These beans have prototype scope because several instances of a certain interceptor type can be configured differently in a route builder. A FlowManager instance and a CamelContext are injected into these interceptors either via auto-wiring (as in our example) or explicitly using setters. You can also inject message renderers (see below) via explicit setters as in our example. All instances created from a certain prototype share the injected message renderer. If you want to configure message renderers for individual instances you can do so via the flow management DSL extensions. As of IPF 1.6.0 the definition of flow processor beans is optional. If the bean definitions are omitted, the flow processors are created by the flow DSL model classes.
Dedupe A Predicate implementation used in combination with a Camel filter to filter out flow duplicates. As of IPF 1.6.0 definition of flow dedupe is optional.
Repositories This section defines the data access objects (DAOs) for flows, configurations and sequence numbers.
Hibernate Setup This section defines Hibernate-specific beans. The hibernateSessionFactory bean is configured to use a Derby-specific DataSource and a mapping configuration based on annotations on persistent domain objects. The referenced hibernate-flow.xml configuration file is contained in the commons-flow component and needs not be created by the application developer. The hibernateProperties contain settings for the Derby database and the Lucene index. The eventListeners property configures event listeners for fulltext indexing and encryption of rendered messages. The hibernateTemplate is required by the data access objects defined in the Repositories section. The hibernateTransactionManager bean is the local database transaction manager used by Spring's transaction management infrastructure.
Datasource Setup Defines the DataSource to use for the hibernateSessionFactory. Here we use a data source implementation that connects to a standalone Derby server. If you want to start an embedded Derby database (which makes sense for single-node deployments) use the second data source bean which is commented out in our example.
JMX Setup This section sets up the flow manager MBean and the MBean exporter. The MBean exporter is configured to use only the flowManagerMBean instead of scanning all beans in the application context.
JMX Annotation Support This section sets up the required infrastructure for processing JMX annotations on the FlowManagerMBean class.
Encryption setup Defines the Hibernate event listeners for encryption and decryption of stored messages. These listeners use a password-based encryptor. The password used for crypto operations is set via the password property of the stringEncryptor bean.
Fulltext indexing/search setup Defines the Hibernate event listener for fulltext indexing (textIndexEventListener bean) and a flowSearchCallback bean that is auto-injected into the flowRepository bean. The flowSearchCallback bean is required to perform fulltext searches and can be customized via its inboundTextAnalyzer and outboundTextAnalyzer properties. The type of these properties is org.apache.lucene.analysis.Analyzer and their default values are org.apache.lucene.analysis.standard.StandardAnalyzer instances. You might want to change these property values if you want to have different analyzers for inbound and outbound message text analysis.
Message renderer Defines one or more message renderer beans. These are either directly injected into flow interceptors or referenced via the flow management DSL extensions.

Message renderers must implement the org.openehealth.ipf.platform.camel.flow.PlatformMessageRenderer interface. IPF provides a trivial message renderer implementation (org.openehealth.ipf.platform.camel.flow.render.SimpleMessageRenderer) that is also used in the reference application. It simply creates a string representation from the in-message body of a Camel exchange.

SimpleMessageRenderer.java
package org.openehealth.ipf.platform.camel.flow.render;

import org.openehealth.ipf.platform.camel.flow.PlatformMessage;
import org.openehealth.ipf.platform.camel.flow.PlatformMessageRenderer;

public class SimpleMessageRenderer implements PlatformMessageRenderer {

    @Override
    public String render(PlatformMessage message) {
        return message.getExchange().getIn().getBody(String.class);
    }

}

Clients must only use the following interfaces or classes to interact with the flow manager.

  • org.openehealth.ipf.commons.flow.FlowManager interface for Java clients.
  • org.openehealth.ipf.commons.flow.jmx.FlowManagerMBean for JMX clients.
  • A RESTful HTTP service interface is currently work in progress.

Derby database

Usage of Derby for persisting flow management information is described in the previous section.

Oracle Database

This section describes the configuration steps necessary to run the flow manager with an Oracle database. We tested the configuration with an installation of Oracle Database 10g Enterprise Edition Release 10.2.0.3.0. Please note that the ojdbc jar file cannot be downloaded from a central Maven repository because its license does not permit redistribution by anyone but the vendor. The jar file therefore has to be installed manually into your local Maven repository. Since there are differences between driver releases, e.g. between the ojdbc14.jar for Oracle 10.2.0.3 and the one for 10.2.0.4, it is a good idea to reference the jar via a version property like this:

pom.xml
<properties>
    <oracle-rdbms-version>...</oracle-rdbms-version>
</properties>

<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc14</artifactId>
    <version>${oracle-rdbms-version}</version>
</dependency>

From the Oracle download site you can pick the ojdbc driver you need:

http://www.oracle.com/technology/software/tech/java/sqlj_jdbc/index.html

If you don't know which Oracle edition or release your system is running, you can find out with the following SQL statement:

SELECT * from v$version;

The output may look like this:

After downloading the file manually, you should rename it to include the version, for the sake of consistency in your Maven repository, as we did in our example:

Win: ren ojdbc14.jar ojdbc14-10.2.0.3.jar
Linux: mv ojdbc14.jar ojdbc14-10.2.0.3.jar

and then install it into your local Maven repository using the command:

mvn install:install-file -DgroupId=com.oracle -DartifactId=ojdbc14 -Dversion=10.2.0.3 -Dpackaging=jar -Dfile=<path-to-drivers-folder>/ojdbc14-10.2.0.3.jar

If the Maven command was successful you can now reference this driver from your pom.xml as follows:

<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc14</artifactId>
    <version>10.2.0.3</version>
</dependency>

For Oracle 11g releases you should use ojdbc5.jar or ojdbc6.jar. Additionally you will need a dependency on commons-dbcp:

pom.xml
<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>${commons-dbcp-version}</version>
</dependency>

Prerequisites:

To provide the corresponding database schema and user, a user with sufficient privileges should run the following two statements on your Oracle instance:

CREATE USER platform IDENTIFIED BY <your_password> DEFAULT TABLESPACE users TEMPORARY TABLESPACE temp;
GRANT connect, resource TO platform IDENTIFIED BY <your_password>;

Here are the Oracle-specific bean definitions in the Spring application context:

context-flowmgr.xml
  ...

  <ctx:property-placeholder location="/path/to/context-flowmgr.properties" />

  <!--================================== -->
  <!--  Hibernate Setup                  -->
  <!--================================== -->

  <bean id="hibernateSessionFactory" class="org.springframework.orm.hibernate3.LocalSessionFactoryBean">
    <property name="dataSource" ref="flowDataSource"/>
    <property name="configLocation" value="/hibernate-flow.xml"/>
    <property name="configurationClass" value="org.hibernate.cfg.AnnotationConfiguration"/>
    <property name="hibernateProperties">
      <props>
        <!-- use the appropriate dialect -->
        <prop key="hibernate.dialect">org.hibernate.dialect.Oracle10gDialect</prop>
        <prop key="hibernate.hbm2ddl.auto">update</prop>
        <prop key="hibernate.show_sql">false</prop>
        <prop key="hibernate.format_sql">false</prop>
        <prop key="hibernate.search.autoregister_listeners">false</prop>
      </props>
    </property>
  </bean>

  <!--================================== -->
  <!--  Data Source                      -->
  <!--================================== -->

  <bean id="flowDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
    <property name="url" value="jdbc:oracle:thin:@${flowmgr.database.host}:${flowmgr.database.port}:${flowmgr.database.name}"/>
    <property name="username" value="${flowmgr.database.username}" />
    <property name="password" value="${flowmgr.database.password}" />
    <property name="defaultAutoCommit" value="false" />
    <property name="initialSize" value="10" />
    <property name="maxActive" value="50" />
  </bean>

  ...

And finally the snippet from the corresponding properties file:

context-flowmgr.properties
flowmgr.database.name=<oracle_DB_SID>
flowmgr.database.host=<oracle_DB_hostname>
flowmgr.database.port=1521
flowmgr.database.username=platform
flowmgr.database.password=<your_password>
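
The property placeholders in the url property of the flowDataSource bean resolve to an Oracle thin-driver URL in host:port:SID form. The following is a minimal sketch of that substitution, useful for checking the values before starting the application. The class FlowJdbcUrlSketch and the example host and SID are assumptions made for illustration. In the real application Spring performs this substitution.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of IPF: mimics how the property placeholder
// fills in the "url" property of the flowDataSource bean.
class FlowJdbcUrlSketch {

    // Builds an Oracle thin-driver URL in host:port:SID form.
    static String buildUrl(Properties props) {
        return "jdbc:oracle:thin:@"
                + require(props, "flowmgr.database.host") + ":"
                + require(props, "flowmgr.database.port") + ":"
                + require(props, "flowmgr.database.name");
    }

    // Fails early when a key is missing instead of writing "null" into the URL.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Example values are assumptions; use your own host and SID.
        String text = "flowmgr.database.name=ORCL\n"
                + "flowmgr.database.host=dbhost.example.org\n"
                + "flowmgr.database.port=1521\n";
        Properties props = new Properties();
        props.load(new StringReader(text));
        System.out.println(buildUrl(props));
    }
}
```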

DSL extensions

The Flow Management DSL extensions are defined in the org.openehealth.ipf.platform.camel.flow.extend.FlowModelExtension.groovy class. Their purpose is to make the flow management functionality available in Camel routes. The DSL extensions are summarized in the next table and explained in detail in the following subsections.

DSL extension | Description | Model class
initFlow | Starts recording a message flow. | org.openehealth.ipf.platform.camel.flow.model.FlowBeginProcessorType
ackFlow | Logs the successful delivery of a message (i.e. flow part). | org.openehealth.ipf.platform.camel.flow.model.FlowEndProcessorType
nakFlow | Logs the erroneous processing of a message (i.e. flow part). | org.openehealth.ipf.platform.camel.flow.model.FlowErrorProcessorType
dedupeFlow | Filters messages during flow replay operations to avoid delivery of duplicates. | org.openehealth.ipf.platform.camel.flow.model.DedupeType

The Model Class column contains the DSL model classes that are created by these DSL extensions. These model classes allow further parameterization of the DSL extensions.

The initFlow DSL extension

With the initFlow DSL extension you define the location in your route where flow management should start. For example

from('jetty:http://localhost:8080/example')
  .initFlow(myUniqueId)
  .to('file:...')

creates a flow interceptor directly after the jetty endpoint. When a message arrives on this endpoint it will be stored in the flow management database. The argument to initFlow must be a unique identifier. This identifier is needed during message replay because the flow manager must know from which flow interceptor to start the replay.

Parameterization of initFlow

Let's start with an example.

from('direct:example')
    .initFlow('test')
        .application('test')
        .expectedAckCount(1)
    .process { exchange -> ...}
    ...

Here we assign the flow interceptor the unique id test (probably not the best choice for production use). Each flow is stored in the context of an application name, which is test in our case. When you search for flows in the JMX console or via the platform manager you refer to this name. With expectedAckCount you define how many acknowledgements you expect for a flow. Defining this expectation is optional but enables optimizations such as cleanup of the flow message content after successful delivery. Flow cleanup is primarily used to save disk space. The full list of initFlow parameters is given in the next table.

Parameter | Description
application(String) | Assigns created flows an application name. When you search for flows in the JMX console or via the platform manager you refer to this name.
expectedAckCount(int) | Defines how many acknowledgements you expect for a flow. Defining this expectation is optional but enables optimizations such as cleanup of the flow message content after successful delivery.
replayErrorHandler(String) | Defines the endpoint URI for error messages in case a flow replay fails.
inType(Class) | For an incoming message, the flow manager tries to convert the in-message body into a byte array. This doesn't work for all body types, so you first have to convert the body into a type from which the flow manager can create a byte array. The byte array is stored as a BLOB in the flow management database.
outType(Class) | For an outgoing message, the flow manager by default tries to convert the previously created byte array from the input message to an InputStream object. This will be sent to subsequent processors in the route. If you want a different type instead, you can define it here. Please note that this setting is ignored if outConversion is set to false.
inFormat(org.apache.camel.spi.DataFormat) | Serves the same purpose as inType except that you marshal the in-message body to a stream using a DataFormat. If you specify both inType and inFormat then inType will be ignored.
outFormat(org.apache.camel.spi.DataFormat) | Serves the same purpose as outType except that you unmarshal the in-message body from a stream using a DataFormat. If you specify both outType and outFormat then outType will be ignored.
outConversion(boolean) | If set to false then no operation using outType or outFormat will be performed. The in-message body will contain the original body. If inType is defined it will contain a body of that type. This setting is ignored during replay of messages because in this case the flow manager must reconstruct the message from a byte array.
renderer(String) | Sets the message renderer for this interceptor. The argument is the name of the message renderer bean defined in the Spring application context.
renderer(org.openehealth.ipf.platform.camel.flow.PlatformMessageRenderer) | Sets the message renderer for this interceptor. The argument is a message renderer instance.
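
To illustrate why expectedAckCount enables cleanup, here is a minimal sketch of the bookkeeping involved: once the number of recorded acknowledgements reaches the expected count, the stored message content is no longer needed for delivery and can be dropped. The class FlowAckSketch and its methods are hypothetical and only model the idea. They are not the flow manager's actual classes, and the real cleanup logic may differ.

```java
// Hypothetical model, not the IPF flow class: shows how an expected ACK
// count can decide when the stored message content may be cleaned up.
class FlowAckSketch {

    private final int expectedAckCount; // 0 or less: no expectation defined
    private int ackCount;
    private byte[] content;             // stands in for the BLOB in the database

    FlowAckSketch(byte[] content, int expectedAckCount) {
        this.content = content;
        this.expectedAckCount = expectedAckCount;
    }

    // Called once per successful delivery (one ackFlow in a route).
    void ack() {
        ackCount++;
    }

    // A flow without a defined expectation is never considered complete here.
    boolean isComplete() {
        return expectedAckCount > 0 && ackCount >= expectedAckCount;
    }

    // Drops the content only when all expected ACKs have arrived.
    boolean cleanupIfComplete() {
        if (!isComplete()) {
            return false;
        }
        content = null;
        return true;
    }

    boolean hasContent() {
        return content != null;
    }
}
```

In this sketch a flow created with an expected count of 2 keeps its content after the first ack() and releases it after the second.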

The ackFlow DSL extension

With the ackFlow DSL extension you define the location in the route where the successful delivery of a message should be confirmed. This is typically done after sending the message to a target system via an outbound endpoint. The confirmation is then written to the flow management database. Here's an example

...
.to('http://www.example.org/webapp/path')
.ackFlow()

After the message is successfully delivered to http://www.example.org/webapp/path the corresponding flow object is updated in the database. If an error occurs during delivery ackFlow won't process the message.

Parameterization of ackFlow

Parameter | Description
renderer(String) | Sets the message renderer for this interceptor. The argument is the name of the message renderer bean defined in the Spring application context.
renderer(org.openehealth.ipf.platform.camel.flow.PlatformMessageRenderer) | Sets the message renderer for this interceptor. The argument is a message renderer instance.

The nakFlow DSL extension

The nakFlow DSL extension is usually used within routes for error handling. Here's an example

onException(MyException.class).to('direct:error')

from('direct:example')
    .initFlow(...)
    .process { ... }  // throws MyException

from('direct:error')
    ...
    .nakFlow()

First we define an error handler specific to the MyException class. When the exception is thrown in the first route, the error handler routes the message to the direct:error endpoint. There we install the flow error interceptor with the nakFlow DSL extension. This interceptor logs an error for the flow part that corresponds to the message that caused the error.

Parameterization of nakFlow

Parameter | Description
renderer(String) | Sets the message renderer for this interceptor. The argument is the name of the message renderer bean defined in the Spring application context.
renderer(org.openehealth.ipf.platform.camel.flow.PlatformMessageRenderer) | Sets the message renderer for this interceptor. The argument is a message renderer instance.

The dedupe DSL extension

The dedupe DSL extension is used to filter duplicates during replay of flows. In the following example the dedupe checks whether a certain message has already been sent via the outbound http endpoint.

from('direct:example')
    .initFlow(...)
    ...
    .dedupe()
    .to('http://...')

You can enable and disable dedupes at runtime via the flow management JMX interface.
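
The decision a duplicate filter makes can be sketched as follows: a message passes if the filter is disabled, if it is not part of a replay, or if no acknowledgement has been recorded yet for its flow part. The class ReplayDedupeSketch, the numeric flow id and the path string are assumptions made for illustration. They do not reflect the flow manager's actual data model or API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not IPF's implementation: models the decision of a
// duplicate filter placed in front of an outbound endpoint.
class ReplayDedupeSketch {

    // Keys of flow parts that have already been acknowledged.
    private final Set<String> acknowledged = ConcurrentHashMap.newKeySet();

    // Can be switched at runtime, comparable to the JMX toggle.
    private volatile boolean enabled = true;

    void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    // Records a successful delivery for one flow part.
    void recordAck(long flowId, String path) {
        acknowledged.add(key(flowId, path));
    }

    // Initial deliveries always pass; replays pass only if not yet ACKed.
    boolean shouldDeliver(long flowId, String path, boolean replay) {
        if (!enabled || !replay) {
            return true;
        }
        return !acknowledged.contains(key(flowId, path));
    }

    private static String key(long flowId, String path) {
        return flowId + "/" + path;
    }
}
```

With the filter enabled, a replay of an already acknowledged flow part is dropped, while a flow part that was NAKed or never acknowledged is delivered again.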

Splits and multicasts

IPF version <= 1.6.0

Documentation in this subsection only applies to IPF version 1.6.0 or earlier. For later versions, see next section.

When you use recipient lists (multicast) and splitters in your routes, the copies of an incoming message must contain different flow path information. This information must be updated by multicast and split processors. Since Camel's multicast and split processors don't know about the flow manager, we applied additional logic to these processors using AOP with AspectJ. During the build of the platform-camel-core component we weave flow-management-specific aspects into camel-core classes. The woven classes together with all other camel-core classes are then packaged with platform-camel-core. It is therefore important to remove the original camel-core jar file from the classpath when you include platform-camel-core. Otherwise, flow management won't work correctly when using recipient lists or splitters in your routes.

To exclude camel-core from Maven's dependency calculation define an exclusion list for each dependency that directly or transitively depends on camel-core. In the following example the version numbers of the dependencies are omitted because they've been declared separately in a <dependencyManagement> section (not shown here).

pom.xml - dependencies with camel-core exclusions
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>org.openehealth.ipf.platform-camel</groupId>
            <artifactId>platform-camel-flow</artifactId>
            <version>${pom.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.camel</groupId>
                    <artifactId>camel-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jetty</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.camel</groupId>
                    <artifactId>camel-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.camel</groupId>
                    <artifactId>camel-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.camel</groupId>
                    <artifactId>camel-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        ...
    </dependencies>
    ...

IPF version > 1.6.0

Documentation in this subsection only applies to IPF versions later than 1.6.0.

Beginning with IPF 1.7.x development, weaving of flow management aspects was changed from compile-time weaving to load-time weaving. If you use multicasts or splitters in your routes then you must do the following additional configuration:

  • Add a context:load-time-weaver element in your Spring application context.
  • Put a spring-agent-<version>.jar file into a folder of your project, where <version> must be replaced by a released Spring Framework version.
  • Start the application JVM with the additional option -javaagent:<path>/spring-agent-<version>.jar where <path> must be replaced by the path where you put the jar file.
  • The camel-core-<version>.jar must be on the classpath in any case.

For example, when you put spring-agent-2.5.6.jar in the lib folder of your project, you need to start the JVM (on Windows) with

%JAVA_HOME%\bin\java.exe -javaagent:lib\spring-agent-2.5.6.jar ...

The Spring application context must also contain the context:load-time-weaver element.

context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd">

  <context:load-time-weaver/>


  ...

Refer to the IPF reference application source code for a running example of AspectJ load-time weaving.

Heap space

Load-time weaving requires some additional heap space, so you may need to increase the value of the -Xmx JVM option.
