Dans cet article j'ai décidé de regrouper un petit ensemble de trucs et astuces à connaître lors du développement d'applications Camel.
Et pour une fois, je vais rédiger ce contenu en anglais afin de le rendre accessible au plus grand nombre.
Edit : This article is viewable at (thanks to them) :
Polling an empty directory (and send an empty message with null body) :
from("file://temp?sendEmptyMessageWhenIdle=true") |
Stop a route :
.process(new Processor() { public void process(Exchange exchange) throws Exception { getContext().stopRoute("ROUTE_ID"); } }) |
Access a property of the object in the body : admitting the object has a method named "getMydata()" : new ValueBuilder(simple("${body.mydata}")).isEqualTo(...) |
Define an aggregator :
.aggregate(simple("${header.id}.substring(0,15)"), genericAggregationStrategy)
public class GenericAggregationStrategy implements AggregationStrategy { @SuppressWarnings("unchecked") public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { ArrayList<Object> list = new ArrayList<Object>(); list.add(newExchange.getIn().getBody()); newExchange.getIn().setBody(list); return newExchange; } else { Object oldIn = oldExchange.getIn().getBody(); ArrayList<Object> list = null; if(oldIn instanceof ArrayList) { list = (ArrayList<Object>) oldIn; } else { list = new ArrayList<Object>(); list.add(oldIn); } list.add(newExchange.getIn().getBody()); newExchange.getIn().setBody(list); return newExchange; } } } |
Manually trigger an aggregation's completion (whatever it is) : Send a message with the header Exchange.AGGREGATION_COMPLETE_ALL_GROUPS = true |
It is possible to do from("bean:...") , knowing that the bean will be polled permanently (like with "file") and re-instanciated each time. |
Modify the message's body on a route, using :
.transform(myExpression)with myExpression :
public class MyExpression implements Expression { public <T> T evaluate(Exchange exchange, Class<T> type) { MyBean newData = ...; return exchange.getContext().getTypeConverter() .convertTo(type, newData); } } |
Using JaxB :
// indicate to Jaxb to not write XML prolog : JaxbDataFormat jaxbDataFormat = new JaxbDataFormat("my.business_classes.package"); jaxb.setFragment(true); |
General concepts for threads management :
|
Define a shutdown strategy : getContext().setShutdownStrategy(new MyShutdownStrategy(getContext()));With : public class MyShutdownStrategy extends DefaultShutdownStrategy { protected CamelContext camelContext; private long timeout = 1; private TimeUnit timeUnit = TimeUnit.SECONDS; public SpiralShutdownStrategy(CamelContext camelContext) { this.camelContext = camelContext; } @Override public long getTimeout() { return this.timeout; } @Override public TimeUnit getTimeUnit() { return this.timeUnit; } @Override public CamelContext getCamelContext() { return this.camelContext; } /** * To ensure shutdown * */ @Override public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception { doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, false); } /** * To ensure shutdown * */ @Override public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception { doShutdown(context, routes, this.timeout, this.timeUnit, false, false, false); } /** * To ensure shutdown * */ @Override public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { super.shutdown(context, route, this.timeout, this.timeUnit, false); return true; } } |
Stop a batch : .process(new Processor() { public void process(Exchange exchange) throws Exception { context.stop(); } }); |
Calling a method of a bean from a route :
|
Exceptions management on routes :
|
For writing file, only the header Exchange.FILE_NAME is necessary. |
Reorder messages with component .resequence :
|
Split the body with a token :
.split(body().tokenize("TOKEN"))Knowing that the TOKEN will be deleted from content. For example, if receiving a message containing : "data1TOKENdata2TOKENdata3", messages created will be : "data1", "data2, "data3". So avoid this when treating XML data, prefer "tokenizeXML()". |
Dynamic access to body's data :
|
Sending mails :
from("direct:mail") .setHeader("To", constant(mailTo)) .setHeader("From", constant(mailFrom)) .setHeader("Subject", constant(mailSubject)) .to("smtp://${user}@${server}:${port}?password=${password}");With attachment : .beanRef(MAIL_ATTACHER, "attachLog"); //with public class MailAttacher { public void attachLog(Exchange exc) throws Exception { File toAttach = ...; exc.getIn().addAttachment(toAttach.getName(), new DataHandler(new FileDataSource(toAttach))); // if needed exc.setProperty(Exchange.CHARSET_NAME, "UTF-8"); } } |
Useful Exchange's properties :
|
Loop a route :
from("direct:...") .loop(countExpression) .to("direct:insideLoop") .end() Where "countExpression" is an Expression used to dynamically compute the loop count (evaluated entering the loop) It is preferable to move the loop's code in a new route if the process is complex. |
Headers management :
Message's headers are defined at its creation. When using a ".split()", all subsequent messages will have the same headers from the original message (so be careful when managing files). In an aggregation, custom headers will have to be managed manually to be preserved in the rest of the route. |
Intercept a message and execute a route parallely (to be declared before routes) :
interceptSendToEndpoint("ENDPOINT_TO_INTERSEPT").to(...)... |
Send an exchange from a Java class to a route :
|
Definie a maximum inflight messages with a route policy :
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(999); policy.setScope(ThrottlingInflightRoutePolicy.ThrottlingScope.Route); from("seda:route").routePolicy(policy) |
Best pratices to combine Split and aggregator : It is possible to use : .split(body()) (...) .aggregate(simple("${header.id}.substring(0,15)"), simpleAggregationStrategy) .completionPredicate(header(Exchange.SPLIT_COMPLETE).isEqualTo(Boolean.TRUE)) (... end of route ...)But with an empty list (null body or list size to 0), split behavior must be confusing and might shortcut the end of the route. Safer use is : .split(body(), simpleAggregationStrategy) (...) .end()The end() instruction clearly isolates the split instructions block and simpleAggregationStrategy is still triggered at SPLIT_COMPLETE
|
To be continued, don't hesitate to participate!
P.S. : un grand merci à l'équipe de "Camel masters", sans qui rien de tout cela n'aurait été possible! Daminou, Gaël et Gros ;-) kiss, love.