April 07, 2017 (last updated: April 06, 2017)
jboss-fuse
data-grid
hot-rod
protobuf
lucene
https://github.com/alainpham/techlab-fuse-jdg-simple-hotrod
Building an integration and services platform with JBoss Fuse is great. It is even better when you add a distributed in-memory database solution such as JBoss Data Grid to the mix. This article shows how to make both technologies work together using the camel-jbossdatagrid component. We will go through the setup of a JBoss Data Grid server with persistence and see how to use it in a JBoss Fuse application through the remote Hot Rod client. Furthermore, we will see how to take advantage of Protocol Buffers and Lucene to index data and perform content-based queries.
In the example, we will build REST services to store and retrieve some business events.
In summary, the steps in this tutorial are the following:
JBoss Data Grid can be accessed in two different modes: Library Mode and Server Mode.
In Server Mode, Data Grid is installed as a remote server and Fuse accesses the objects through the Hot Rod client.
In Library Mode (or embedded mode), the Fuse engine uses its own JVM memory to hold objects that are part of the Data Grid. In other words, the Fuse engine acts as a node of the Data Grid cluster. This mode gives access to advanced features such as transactions and locking and is actually quite easy to set up. The downside is that it is not possible to perform queries with the infinispan-embedded-query library when Fuse runs in a Karaf container. The main reason is that infinispan-embedded-query bundles its full dependencies on libraries such as Hibernate, so it is not suited to an OSGi container such as Karaf.
In this article we will explore the possibilities of Server Mode with the Hot Rod client. Library Mode will be discussed in another article.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jbossdatagrid</artifactId>
<version>6.5.1.Final-redhat-1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-netty4-http</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-swagger-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jackson</artifactId>
</dependency>
<server xmlns="urn:jboss:domain:1.6">
...
<subsystem xmlns="urn:jboss:domain:datasources:1.2">
<datasources>
<datasource jndi-name="java:jboss/datasources/JdbcDS" pool-name="JdbcDS" enabled="true" use-java-context="true">
<connection-url>jdbc:h2:tcp://localhost:8942/dgdb</connection-url>
<driver>h2</driver>
<security>
<user-name>sa</user-name>
<password />
</security>
</datasource>
<drivers>
<driver name="h2" module="com.h2database.h2">
<xa-datasource-class>org.h2.jdbcx.JdbcDataSource</xa-datasource-class>
</driver>
</drivers>
</datasources>
</subsystem>
<subsystem xmlns="urn:infinispan:server:core:6.3" default-cache-container="local">
<cache-container name="local" default-cache="default" statistics="true">
...
<local-cache name="event">
<eviction strategy="LRU" max-entries="50" /> <!-- At most we have 50 entries in the cache by evicting [l]east-[r]ecently-[u]sed-->
<indexing index="LOCAL"> <!-- Indexing is activated and the index is stored locally only -->
<property name="default.directory_provider">filesystem</property>
<property name="default.indexBase">ispn_index</property>
</indexing>
<!-- A mixed store is created to be able to contain entries with string keys and also binary keys -->
<mixed-keyed-jdbc-store datasource="java:jboss/datasources/JdbcDS" passivation="false" preload="true" purge="false">
<binary-keyed-table prefix="ISPN_MIX_BKT" create-on-start="true" drop-on-exit="false">
<id-column name="id" type="VARCHAR" />
<data-column name="datum" type="BINARY" />
<timestamp-column name="version" type="BIGINT" />
</binary-keyed-table>
<string-keyed-table prefix="ISPN_MIX_STR" create-on-start="true" drop-on-exit="false">
<id-column name="id" type="VARCHAR" />
<data-column name="datum" type="BINARY" />
<timestamp-column name="version" type="BIGINT" />
</string-keyed-table>
</mixed-keyed-jdbc-store>
</local-cache>
</cache-container>
<cache-container name="security" />
</subsystem>
...
</server>
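The `<eviction strategy="LRU" max-entries="50" />` element above bounds the number of entries kept in memory. As a rough illustration of what a least-recently-used policy does (this is not how Data Grid implements it, and the class name `LruSketch` is made up for this example), a plain `LinkedHashMap` in access order behaves in a similar way:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a bounded map that evicts the least recently used entry.
// It mimics the policy named in the <eviction> element; it is not Data Grid code.
final class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    LruSketch(int maxEntries) {
        // accessOrder = true: iteration goes from least to most recently accessed
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Invoked after each insertion; returning true drops the eldest entry
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        LruSketch<String, String> cache = new LruSketch<>(2);
        cache.put("1", "start");
        cache.put("2", "incident");
        cache.get("1");          // touch "1" so that "2" becomes the eldest entry
        cache.put("3", "end");   // exceeds the limit, so "2" is evicted
        System.out.println(cache.keySet()); // prints [1, 3]
    }
}
```

In the server configuration, passivation is disabled on the JDBC store, so an entry evicted from memory should still be readable from the database; the sketch only models the in-memory part.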
Download the H2 database here:
https://storage.googleapis.com/google-code-archive-downloads/v2/code.google.com/h2database/h2-2012-07-13.zip
Run the database with the following parameters:
java -cp h2*.jar org.h2.tools.Server -tcp -tcpAllowOthers -tcpPort 8942 -baseDir ./h2dbstore -web -webAllowOthers -webPort 11112
Now we can start the JBoss Data Grid server by going to its installation folder and running:
bin/standalone.sh
Create a class and annotate it so that it is indexed by Data Grid. To enable indexing through Hot Rod, POJOs need to be serialized as Protocol Buffers.
package techlab.model;
import java.io.Serializable;
import java.util.Date;
import org.infinispan.protostream.annotations.ProtoDoc;
import org.infinispan.protostream.annotations.ProtoField;
@ProtoDoc("@Indexed")
public class Event implements Serializable{
private static final long serialVersionUID = 1L;
private String uid;
private Date timestmp;
private String name;
private String content;
@ProtoField(number = 1)
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
@ProtoDoc("@IndexedField(index = true, store = false)")
@ProtoField(number = 2)
public Date getTimestmp() {
return timestmp;
}
public void setTimestmp(Date timestmp) {
this.timestmp = timestmp;
}
@ProtoDoc("@IndexedField(index = true, store = false)")
@ProtoField(number = 3)
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@ProtoField(number = 4)
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
Write a CacheManager factory. Note that Protobuf schemas can be generated from the annotated class and then registered with the Data Grid server in its reserved Protobuf metadata cache.
package techlab.factory;
import java.io.IOException;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.protostream.SerializationContext;
import org.infinispan.protostream.annotations.ProtoSchemaBuilder;
import org.infinispan.protostream.annotations.ProtoSchemaBuilderException;
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;
import techlab.model.Event;
public class RemoteCacheManagerFactory {
ConfigurationBuilder clientBuilder;
public RemoteCacheManagerFactory(String hostname, int port) {
clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
.host(hostname)
.port(port)
.marshaller(new ProtoStreamMarshaller());
}
public RemoteCacheManager newRemoteCacheManager() throws ProtoSchemaBuilderException, IOException {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(clientBuilder.build());
SerializationContext ctx = ProtoStreamMarshaller.getSerializationContext(remoteCacheManager);
ProtoSchemaBuilder protoSchemaBuilder = new ProtoSchemaBuilder();
//create a protobuf schema file from the annotated class. Protobuf marshallers and unmarshallers are generated automatically
String eventSchema = protoSchemaBuilder
.fileName("event.proto")
.packageName("techlab")
.addClass(Event.class)
.build(ctx);
//register the protobuf schema in the remote cache
RemoteCache<String, String> metadataCache = remoteCacheManager.getCache(ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME);
metadataCache.put("event.proto", eventSchema);
//check if there is an error with the schemas
String errors = metadataCache.get(ProtobufMetadataManagerConstants.ERRORS_KEY_SUFFIX);
if (errors != null) {
throw new IllegalStateException("Some Protobuf schema files contain errors:\n" + errors);
}
return remoteCacheManager;
}
}
Configure the factory in the Spring context
<beans>
...
<!-- ########################################################### -->
<!-- Definition of remote cache Manager -->
<!-- ########################################################### -->
<bean class="techlab.factory.RemoteCacheManagerFactory" id="remoteCacheManagerFactory">
<constructor-arg value="localhost" />
<constructor-arg value="11222" />
</bean>
<bean factory-bean="remoteCacheManagerFactory" factory-method="newRemoteCacheManager"
id="cacheManager" />
...
</beans>
Add a reusable endpoint to the Camel context
<camelContext id="techlab-fuse-jdg-library-mode" xmlns="http://camel.apache.org/schema/spring">
<!-- Data Grid endpoint -->
<endpoint id="datagrid" uri="infinispan://?cacheContainer=#cacheManager" />
</camelContext>
Use the REST DSL to create routes and expose services for the basic operations
<camelContext id="techlab-fuse-jdg-library-mode" xmlns="http://camel.apache.org/schema/spring">
<!-- Data Grid endpoint -->
<endpoint id="datagrid" uri="infinispan://?cacheContainer=#cacheManager" />
<restConfiguration bindingMode="json" component="netty4-http"
enableCORS="true" port="7123" apiContextPath="/api-doc">
<dataFormatProperty key="prettyPrint" value="true" />
</restConfiguration>
<rest id="svc" path="">
<get id="getOp" uri="{cacheName}/{uid}">
<description>Get an entry with an ID from a cache</description>
<to uri="direct:getOp" />
</get>
<put id="putOp" uri="{cacheName}/{uid}" type="techlab.model.Event">
<description>Inserts an entry with the given ID and content in a cache</description>
<to uri="direct:putOp" />
</put>
</rest>
<!-- rest service to get an entry with the key -->
<route id="getOpRoute">
<from id="getOpStarter" uri="direct:getOp" />
<setHeader headerName="CamelInfinispanKey" id="getOpRouteSetKey">
<simple>${headers.uid}</simple>
</setHeader>
<setHeader headerName="CamelInfinispanCacheName" id="getOpRouteSetCacheName">
<simple>${headers.cacheName}</simple>
</setHeader>
<setHeader headerName="CamelInfinispanOperation" id="getOpRouteSetOperation">
<constant>CamelInfinispanOperationGet</constant>
</setHeader>
<to id="getOpRouteToDataGrid" uri="ref:datagrid" />
<setBody id="getOpRouteSetResponse">
<simple>${header.CamelInfinispanOperationResult}</simple>
</setBody>
</route>
<!-- rest service to put entries into a cache -->
<route id="putOpRoute">
<from id="putOpStarter" uri="direct:putOp" />
<setHeader headerName="CamelInfinispanKey" id="putOpRouteSetKey">
<simple>${headers.uid}</simple>
</setHeader>
<setHeader headerName="CamelInfinispanCacheName" id="putOpRouteSetCacheName">
<simple>${headers.cacheName}</simple>
</setHeader>
<setHeader headerName="CamelInfinispanOperation" id="putOpRouteSetOperation">
<constant>CamelInfinispanOperationPut</constant>
</setHeader>
<setHeader headerName="CamelInfinispanValue" id="putOpRouteSetValue">
<simple>${body}</simple>
</setHeader>
<to id="putOpRouteToDataGrid" uri="ref:datagrid" />
<setBody id="putOpRouteSetResponse">
<simple>Value inserted</simple>
</setBody>
</route>
</camelContext>
Define a REST service that accepts arbitrary HTTP query parameters
(e.g. http://localhost:7123/query/event/techlab.model.Event?timestmp=1462208399999&name=ended)
<get id="queryOp" uri="query/{cacheName}/{type}">
<description>Allows to query based on object fields using lucene search engine</description>
<to uri="direct:queryOp" />
</get>
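For callers of this service, the URL follows the pattern `query/{cacheName}/{type}` followed by one query parameter per search criterion. The following is a minimal client-side sketch of how such a URL could be assembled; the class `QueryUrlSketch` and the method `buildQueryUrl` are hypothetical helpers for this example and are not part of the project.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the project: assembles a URL for the query service.
final class QueryUrlSketch {

    static String buildQueryUrl(String baseUrl, String cacheName, String type,
                                Map<String, String> criteria) {
        StringBuilder url = new StringBuilder(baseUrl)
                .append("/query/").append(cacheName)
                .append('/').append(type);
        char separator = '?';
        for (Map.Entry<String, String> criterion : criteria.entrySet()) {
            url.append(separator)
               .append(encode(criterion.getKey()))
               .append('=')
               .append(encode(criterion.getValue()));
            separator = '&'; // every criterion after the first is joined with '&'
        }
        return url.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> criteria = new LinkedHashMap<>();
        criteria.put("timestmp", "1462208399999");
        criteria.put("name", "ended");
        System.out.println(buildQueryUrl("http://localhost:7123", "event",
                "techlab.model.Event", criteria));
    }
}
```

Each parameter name is expected to match a property of the queried class; parameters that do not match any property are simply ignored by the query builder shown in this article.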
Create classes that generate Data Grid queries. Note that these classes are fairly generic: they use reflection and are therefore suitable for any data model.
package techlab.dg;
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Date;
import java.util.Map;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
import org.infinispan.query.dsl.FilterConditionContext;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryBuilder;
import org.infinispan.query.dsl.QueryFactory;
public class GenericQuery implements InfinispanQueryBuilder {
private Map<String,Object> params;
private BeanInfo info;
private Class<?> type;
public GenericQuery(String typeName, Map<String, Object> params) throws ClassNotFoundException, IntrospectionException {
super();
//inspect the searched class in order to get the fields that can be queried
type = Class.forName(typeName);
info = Introspector.getBeanInfo( type,Object.class);
this.params = params;
}
@Override
public Query build(QueryFactory<Query> queryFactory) {
QueryBuilder<Query> qb = queryFactory.from(type);
FilterConditionContext ctx=null;
// for each property of the class we look if a parameter has been set
for ( PropertyDescriptor pd : info.getPropertyDescriptors() ){
Object searchValue = this.params.get(pd.getName());
//only add a search criterion when a header with the same name as the property has been set
//note: no check is made here that the property is actually indexed
if (searchValue!=null){
//if field is a date convert the type explicitly
if (pd.getPropertyType().equals(Date.class)){
searchValue = new Date(Long.parseLong((String)searchValue));
}
if (ctx==null){ //first condition
ctx = qb.having(pd.getName()).eq(searchValue);
}else{ //additional conditions with and operator
ctx.and().having(pd.getName()).eq(searchValue);
}
}
}
return qb.build();
}
}
package techlab.dg;
import java.beans.IntrospectionException;
import org.apache.camel.Exchange;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
public class GenerateQuery {
public InfinispanQueryBuilder getBuilder(Exchange ex) throws ClassNotFoundException, IntrospectionException {
InfinispanQueryBuilder qb = new GenericQuery(ex.getIn().getHeader("type",String.class),ex.getIn().getHeaders());
return qb;
}
}
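To make the reflection step easier to follow in isolation, here is a small sketch that uses only the JDK. It assumes a reduced stand-in bean (`SampleEvent`, hypothetical) instead of `techlab.model.Event`, and collects the matching criteria into a map rather than building a Data Grid query; `CriteriaSketch` and `extractCriteria` are names invented for this example.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the matching logic used by GenericQuery, without any Data Grid dependency.
final class CriteriaSketch {

    // Hypothetical stand-in for techlab.model.Event, reduced to two properties
    public static class SampleEvent {
        private Date timestmp;
        private String name;
        public Date getTimestmp() { return timestmp; }
        public void setTimestmp(Date timestmp) { this.timestmp = timestmp; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Keeps only the headers whose name matches a bean property of the given type
    static Map<String, Object> extractCriteria(Class<?> type, Map<String, Object> headers) {
        Map<String, Object> criteria = new LinkedHashMap<>();
        try {
            // Stop at Object.class so that getClass() is not reported as a property
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type, Object.class).getPropertyDescriptors()) {
                Object value = headers.get(pd.getName());
                if (value == null) {
                    continue; // no criterion supplied for this property
                }
                if (Date.class.equals(pd.getPropertyType())) {
                    // Dates arrive as epoch milliseconds in string form
                    value = new Date(Long.parseLong(value.toString()));
                }
                criteria.put(pd.getName(), value);
            }
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Cannot inspect " + type, e);
        }
        return criteria;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("cacheName", "event");          // not a property, ignored
        headers.put("name", "incident");
        headers.put("timestmp", "1491607080000");
        System.out.println(extractCriteria(SampleEvent.class, headers));
    }
}
```

Headers such as `cacheName` or `type` are skipped because they do not correspond to any property of the bean, which is why the whole header map can be handed to the query builder unfiltered.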
Declare the beans, service endpoint and route in the Camel context
<beans>
...
<bean id="generateQuery" class="techlab.dg.GenerateQuery" />
...
<camelContext id="techlab-fuse-jdg-library-mode" xmlns="http://camel.apache.org/schema/spring">
...
<rest id="svc" path="" >
...
<get id="queryOp" uri="query/{cacheName}/{type}">
<description>Allows to query based on object fields using lucene search engine</description>
<to uri="direct:queryOp" />
</get>
</rest>
...
<!-- rest service to query caches with any indexed field -->
<route id="queryOpRoute">
<from id="queryOpStarter" uri="direct:queryOp" />
<log message="Query headers : ${headers}"></log>
<setHeader headerName="CamelInfinispanCacheName" id="queryOpRouteSetCacheName">
<simple>${headers.cacheName}</simple>
</setHeader>
<setHeader headerName="CamelInfinispanOperation" id="queryOpRouteSetOperation">
<constant>CamelInfinispanOperationQuery</constant>
</setHeader>
<setHeader headerName="CamelInfinispanQueryBuilder" id="queryOpRouteSetBuilder">
<method ref="generateQuery" method="getBuilder" />
</setHeader>
<to id="queryOpRouteToDataGrid" uri="ref:datagrid" />
<setBody id="queryOpRouteSetResponse">
<simple>${header.CamelInfinispanOperationResult}</simple>
</setBody>
</route>
</camelContext>
</beans>
Run the Fuse project on your developer machine
mvn clean package camel:run
Insert a few entries by running a curl command
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "uid": "1", "timestmp": "2017-04-07T19:30:00.000Z", "name": "start", "content": "party started" }' 'http://localhost:7123/event/1'
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "uid": "2", "timestmp": "2017-04-07T22:15:00.000Z", "name": "incident", "content": "police arrived" }' 'http://localhost:7123/event/2'
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "uid": "3", "timestmp": "2017-04-07T23:18:00.000Z", "name": "incident", "content": "host arrested" }' 'http://localhost:7123/event/3'
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{ "uid": "4", "timestmp": "2017-04-07T23:20:00.000Z", "name": "end", "content": "party ended" }' 'http://localhost:7123/event/4'
List all events through a query without parameters
curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event'
[ {
"uid" : "1",
"timestmp" : 1491593400000,
"name" : "start",
"content" : "party started"
}, {
"uid" : "2",
"timestmp" : 1491603300000,
"name" : "incident",
"content" : "police arrived"
}, {
"uid" : "3",
"timestmp" : 1491607080000,
"name" : "incident",
"content" : "host arrested"
}, {
"uid" : "4",
"timestmp" : 1491607200000,
"name" : "end",
"content" : "party ended"
} ]
List all incidents through a query with a parameter
curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event?name=incident'
[ {
"uid" : "2",
"timestmp" : 1491603300000,
"name" : "incident",
"content" : "police arrived"
}, {
"uid" : "3",
"timestmp" : 1491607080000,
"name" : "incident",
"content" : "host arrested"
} ]
List all incidents at a certain hour with 2 parameters
curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event?name=incident&timestmp=1491607080000'
[ {
"uid" : "3",
"timestmp" : 1491607080000,
"name" : "incident",
"content" : "host arrested"
} ]
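Note that events are inserted with ISO-8601 timestamps while the `timestmp` query parameter is expressed in epoch milliseconds. A short sketch of the conversion, using `java.time` from Java 8 (the class name `TimestampSketch` is invented for this example):

```java
import java.time.Instant;

// Converts the ISO-8601 timestamps used in the PUT payloads into the
// epoch-millisecond values expected by the timestmp query parameter.
final class TimestampSketch {

    static long toEpochMillis(String iso8601) {
        return Instant.parse(iso8601).toEpochMilli();
    }

    public static void main(String[] args) {
        // Timestamp of event 3 ("host arrested")
        System.out.println(toEpochMillis("2017-04-07T23:18:00.000Z")); // prints 1491607080000
    }
}
```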
Alternatively you can also use swagger-ui to test the services
In the pom.xml file of the Fuse project, add a dynamic import block to the maven-bundle-plugin configuration.
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>${version.maven-bundle-plugin}</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>techlab-fuse-jdg-simple-hotrod</Bundle-SymbolicName>
<Bundle-Name>techlab-fuse-jdg-simple-hotrod</Bundle-Name>
<DynamicImport-Package>*</DynamicImport-Package>
</instructions>
</configuration>
</plugin>
Generate the bundle for deployment
mvn clean package
Connect to the Fuse console and run these commands to install the required dependencies
features:install camel-swagger-java camel-netty4-http camel-jackson
features:addurl mvn:org.apache.camel/camel-jbossdatagrid/6.5.1.Final-redhat-1/xml/features
features:install camel-jbossdatagrid
Install our Fuse project bundle
osgi:install -s file:<PATH_TO_PROJECT>/techlab-fuse-jdg-simple-hotrod/target/techlab-fuse-jdg-simple-hotrod-1.0.0-SNAPSHOT.jar
That's it. We now have a running Data Grid with persistence and indexing, and we can access it from a Fuse project.
Thanks for reading