Monday, July 15, 2013

Communicating with WSO2 CEP through Thrift in NodeJS + JAVA

In our final-year research project we use WSO2 CEP to analyze complex events arriving through a JSON stream. That JSON stream comes from a NodeJS application. In this article I describe how to send the JSON stream to WSO2 CEP using Thrift.

Initially I developed a JAVA Thrift client to send JSON events to WSO2 CEP. It can be found here: https://github.com/andunslg/Sith/tree/master/SithCEPPublisher.

SithCEPPublisher is the main communicator class,
package org.sith.cep.publisher;

import org.sith.cep.publisher.config.SithPerceptionPublisherStreamConfig;
import org.sith.cep.publisher.config.StreamConfig;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.exception.*;
import java.net.MalformedURLException;

public class SithCEPPublisher{

    private DataPublisher dataPublisher;
    private StreamConfig streamConfig;

    public SithCEPPublisher(String url, String userName, String password){

        //KeyStoreUtil.setTrustStoreParams();
        System.setProperty("javax.net.ssl.trustStore","wso2carbon.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

        System.out.println("Key Store is set..");
        //according to the convention the authentication port will be 7611+100= 7711 and its host will be the same

        try{
            dataPublisher = new DataPublisher(url, userName, password);
        }catch(MalformedURLException e){
            e.printStackTrace();
        }catch(AgentException e){
            e.printStackTrace();
        }catch(AuthenticationException e){
            e.printStackTrace();
        }catch(TransportException e){
            e.printStackTrace();
        }

        System.out.println("Logged in..");        
        this.streamConfig=new SithPerceptionPublisherStreamConfig();
        System.out.println("Default Stream Config Added..");
    }

    public SithCEPPublisher(String url, String userName, String password,String streamName, String streamVersion, String streamDefinition){

        //KeyStoreUtil.setTrustStoreParams();
        System.setProperty("javax.net.ssl.trustStore","wso2carbon.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

        System.out.println("Key Store is set..");
        //according to the convention the authentication port will be 7611+100= 7711 and its host will be the same

        try{
            dataPublisher = new DataPublisher(url, userName, password);
        }catch(MalformedURLException e){
            e.printStackTrace();
        }catch(AgentException e){
            e.printStackTrace();
        }catch(AuthenticationException e){
            e.printStackTrace();
        }catch(TransportException e){
            e.printStackTrace();
        }

        System.out.println("Logged in..");

        this.streamConfig=new StreamConfig(streamName,streamVersion,streamDefinition);
        System.out.println("Stream Config Added..");
    }

    public String publishToCEP(Object[] metaDataArray,Object[] payloadArray) {

        System.out.println("Starting publishing process..");
        String streamId;

        try {
            streamId = dataPublisher.findStream(streamConfig.getStreamName(), streamConfig.getStreamVersion());
        } catch (NoStreamDefinitionExistException e) {
            try{
                streamId = dataPublisher.defineStream(streamConfig.getStreamDefinition());
                System.out.println("Stream Definition created..");
            }catch(AgentException e1){
                e1.printStackTrace();
                return "Failed";
            }catch(MalformedStreamDefinitionException e1){
                e1.printStackTrace();
                return "Failed";
            }catch(StreamDefinitionException e1){
                e1.printStackTrace();
                return "Failed";
            }catch(DifferentStreamDefinitionAlreadyDefinedException e1){
                e1.printStackTrace();
                return "Failed";
            }

        }catch(StreamDefinitionException e){
            e.printStackTrace();
            return "Failed";
        }catch(AgentException e){
            e.printStackTrace();
            return "Failed";
        }

        System.out.println("Stream created..");

        try{
            //dataPublisher.publish(streamId, new Object[]{ipAddress}, null, new Object[]{eventID, userID, percetionValue});
            dataPublisher.publish(streamId, metaDataArray, null, payloadArray);
        }catch(AgentException e){
            e.printStackTrace();
            return "Failed";
        }
        System.out.println("Data published..");
        return "Done";
    }
}
Then I have two configuration classes:

StreamConfig class which has the configuration of JSON stream,
package org.sith.cep.publisher.config;

/**
 * Mutable holder for the three values that describe an event stream:
 * its name, its version and its definition string.
 */
public class StreamConfig{

    private String streamName;
    private String streamVersion;
    private String streamDefinition;

    /**
     * Creates a configuration with all three values set.
     *
     * @param streamName       name of the stream
     * @param streamVersion    version of the stream
     * @param streamDefinition definition of the stream
     */
    public StreamConfig(String streamName, String streamVersion, String streamDefinition){
        this.streamName = streamName;
        this.streamVersion = streamVersion;
        this.streamDefinition = streamDefinition;
    }

    // ---- accessors ----

    /** @return the stream name */
    public String getStreamName(){
        return this.streamName;
    }

    /** @return the stream version */
    public String getStreamVersion(){
        return this.streamVersion;
    }

    /** @return the stream definition */
    public String getStreamDefinition(){
        return this.streamDefinition;
    }

    // ---- mutators ----

    /** @param streamName new stream name */
    public void setStreamName(String streamName){
        this.streamName = streamName;
    }

    /** @param streamVersion new stream version */
    public void setStreamVersion(String streamVersion){
        this.streamVersion = streamVersion;
    }

    /** @param streamDefinition new stream definition */
    public void setStreamDefinition(String streamDefinition){
        this.streamDefinition = streamDefinition;
    }
}


SithPerceptionPublisherStreamConfig class which holds the stream definition,

package org.sith.cep.publisher.config;

public class SithPerceptionPublisherStreamConfig extends StreamConfig{
    private static String sithPerceptionStreamName="sith_Perception_Analytic";
    private static String sithPerceptionStreamVersion="1.0.0";
    private static String sithPerceptionStreamDefinition=

                    "{"+
                    "    'name':'sith_Perception_Analytics',"+
                    "    'version':'1.0.0',"+
                    "    'nickName': 'Sith Analytics',"+
                    "    'description': 'Sith_Perception_Analytics',"+
                    "    'metaData':["+
                    "        {"+
                    "            'name':'ipAdd','type':'STRING'\n"+
                    "        }"+
                    "    ],"+
                    "    'payloadData':["+
                    "        {"+
                    "            'name':'eventID','type':'STRING'"+
                    "        },"+
                    "        {"+
                    "            'name':'userID','type':'STRING'"+
                    "        },"+
                    "        {"+
                    "            'name':'perceptionValue','type':'STRING'"+
                    "        },"+
                    "        {"+
                    "            'name':'comment','type':'STRING'"+
                    "        }"+
                    "    ]"+
                    "}";

    public SithPerceptionPublisherStreamConfig(){
        super(sithPerceptionStreamName,sithPerceptionStreamVersion,sithPerceptionStreamDefinition);
    }

}

What I did next was invoke this JAVA code from a NodeJS application. To do that I used the node-java module, which provides cross-language integration: https://github.com/nearinfinity/node-java. The NodeJS application is given below,
exports.publishComment = function(req,res){
    percepManager.publishToCEP(req.body.userID , req.body.eventID , req.body.perceptionValue,req.body.text);
    res.writeHead(200, {'Content-Type': 'application/json'});
      var result = JSON.stringify({response: true });
    res.write(result);
    res.end();
}

// Load the Java bridge module (installed via `npm install java`).
var java = require("java");
// Add the publisher jar to the bridge's classpath before any Java object is created.
java.classpath.push("cep-publisher-1.0.jar");
// Despite its name, jClass holds an INSTANCE of SithCEPPublisher, created once
// when this module is loaded, using the (url, userName, password) constructor.
// NOTE(review): server address and credentials are hard-coded here — consider moving them to configuration.
var jClass = java.newInstanceSync("org.sith.cep.publisher.SithCEPPublisher","tcp://192.248.8.246:7611","admin","apst@sith");

exports.publishToCEP = function(userID,eventID,perceptionVal,comment) {
    var metaDataArray = java.newArray("java.lang.Object", ["192.248.8.246"]);
    var payloadArray = java.newArray("java.lang.Object", [eventID,userID,perceptionVal,comment]);
    var result=jClass.publishToCEPSync(metaDataArray,payloadArray);
    console.log("Returned data - "+result);
} 
To run this application, run npm install java and then run the node app command. After that you can call this method to send the defined JSON stream to CEP.