Friday, November 21, 2014

Pin It


Get Gadget

Deploy and Schedule Hive Analytic using an API in WSO2 Business Activity Monitor

WSO2 Business Activity Monitor can be used to run Apache Hadoop map-reduce jobs using Apache Hive queries. If this process is integrated into your application's business logic, you need to create and run Hive queries on the fly. This can be done using an API. WSO2 Business Activity Monitor exposes an admin SOAP API to deploy Hive analytics and schedule them. In this post I explain how to use that service in a Java application.

The WSDL of the HiveScriptStoreService can be found at https://BAM_Host:9443/services/HiveScriptStoreService?wsdl. This is the service we use for our task. To view this WSDL, you need to make the admin service WSDLs publicly visible in carbon.xml (set HideAdminServiceWSDLs to false). In HiveScriptStoreService, we need to use the saveHiveScript method. The SOAP request looks like this:

<!--
  Request template for the saveHiveScript operation of HiveScriptStoreService.
  Replace each "?" placeholder before sending:
    scriptName    - name under which the script is stored
    scriptContent - the Hive query text
    cron          - cron-style schedule for executing the script
-->
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope" xmlns:xsd="http://org.apache.axis2/xsd">
   <soap:Header/>
   <soap:Body>
      <xsd:saveHiveScript>
         <xsd:scriptName>?</xsd:scriptName>
         <xsd:scriptContent>?</xsd:scriptContent>
         <xsd:cron>?</xsd:cron>
      </xsd:saveHiveScript>
   </soap:Body>
</soap:Envelope>
scriptContent is the actual Hive query. cron is the parameter that schedules the script's execution; it is given as a cron-style expression.

Let's see how to do this in Java. We use the org.wso2.carbon.analytics.hive.stub package to call the SOAP service. This stub ships with WSO2 Business Activity Monitor.

import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.transport.http.HttpTransportProperties;
import org.apache.axis2.AxisFault;
import org.wso2.carbon.analytics.hive.stub.HiveScriptStoreServiceHiveScriptStoreException;
import org.wso2.carbon.analytics.hive.stub.HiveScriptStoreServiceStub;
import java.rmi.RemoteException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SaveHiveScript {
    
    public static void main(String[] args) {
        CronExpressionBuilder cronBuilder=null;
        HiveScriptStoreServiceStub stub=null;
        HttpTransportProperties.Authenticator authenticator = null;
        Map properties=null;

        cronBuilder = CronExpressionBuilder.getInstance();
        String serviceURL = Config.bamURL + "/services/HiveScriptStoreService";
        try {
            stub = new HiveScriptStoreServiceStub(null, serviceURL);
            ServiceClient client = stub._getServiceClient();
            Options options = client.getOptions();

            authenticator = new HttpTransportProperties.Authenticator();
            authenticator.setUsername(Config.bamAdminUserName);
            authenticator.setPassword(Config.bamAdminPassword);

            properties = new Properties();
            properties.put(org.apache.axis2.transport.http.HTTPConstants.AUTHENTICATE, authenticator);

            options.setProperties(properties);
        } catch (AxisFault axisFault) {
            System.out.println(axisFault.getMessage());
            System.out.println(axisFault.getDetail().toString());
        }

        String yearSelected="All";
        String monthSelected="All";
        String selectDay="selectDayMonth";
        String dayMonthSelected="All";
        String dayWeekSelected="All";
        String hoursSelected="1";
        String minutesSelected="0";

        String scriptName="TestAnalytic";
        String scriptContent="ExampleQueryGoesHere";

            HashMap<String, String> cronVals = new HashMap<String, String>();
            cronVals.put(CronConstants.YEAR, yearSelected);
            cronVals.put(CronConstants.MONTH, monthSelected);
            if (selectDay.equalsIgnoreCase("selectDayMonth")) {
                cronVals.put(CronConstants.DAY_OF_MONTH, dayMonthSelected);
            } else {
                cronVals.put(CronConstants.DAY_OF_WEEK, dayWeekSelected);
            }
            cronVals.put(CronConstants.HOURS, hoursSelected);
            cronVals.put(CronConstants.MINUTES, minutesSelected);

            String cronExpression = cronBuilder.getCronExpression(cronVals);

            if(stub!=null){
                try {
                    stub.saveHiveScript(scriptName, scriptContent, cronExpression);
                } catch (HiveScriptStoreServiceHiveScriptStoreException e) {
                    System.out.println(e.getMessage());
                } catch (RemoteException e) {
                    System.out.println(e.getMessage());
                }
            }else{
                System.out.println("Internal Error - Stub Creation Failed");
            }     
    }
}

To build the cron expression, I used this class:

import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CronExpressionBuilder {

    private static CronExpressionBuilder instance = new CronExpressionBuilder();

    public static CronExpressionBuilder getInstance(){
        return instance;
    }

    public String getCronExpression(HashMap<String, String> cronValues) {
        String dateCron = getDate(cronValues);
        String timeCron = getTime(cronValues, dateCron);
        return timeCron + " " + dateCron;
    }

    private String getTime(HashMap<String, String> cronValues, String dateCron) {
        String cronMinute = getCronText(cronValues.get(CronConstants.MINUTES));
        String cronHour = getCronText(cronValues.get(CronConstants.HOURS));
        String tempCron = cronMinute + " " + cronHour + " " + dateCron;
        String cronSec = "";
        boolean isNumberExists = false;
        Pattern pattern = Pattern.compile("\\d");
        Matcher matcher = pattern.matcher(tempCron);
        if (matcher.find()){
            isNumberExists = true;
        }
        if (!isNumberExists) {
            cronSec = "1";
        } else {
            cronSec = "0";
        }
        return cronSec + " " + cronMinute + " " + cronHour;

    }

    private String getCronText(String text) {
        if (text.equalsIgnoreCase("All")) {
            return "*";
        }
        return text;
    }

    private String getDate(HashMap<String, String> cronValues) {
        String dayMonth = cronValues.get(CronConstants.DAY_OF_MONTH);
        String cronDayMonth = "";
        String cronDayWeek = "";
        String cronMonth = "";
        String cronYear = "";
        if (null != dayMonth && !dayMonth.equals("")) {
            cronDayMonth = getCronText(dayMonth);
            cronDayWeek = "?";
        } else {
            //dayWeek should be set here
            String dayWeek = cronValues.get(CronConstants.DAY_OF_WEEK);
            cronDayWeek = getCronText(dayWeek);
            cronDayMonth = "?";
        }
        cronMonth = getCronText(cronValues.get(CronConstants.MONTH));
        cronYear = getCronText(cronValues.get(CronConstants.YEAR));
        String dateCron = cronDayMonth + " " + cronMonth + " " + cronDayWeek + " " + cronYear;
        return dateCron;
    }
}


Using these classes you can easily deploy an analytic to WSO2 Business Activity Monitor.

No comments:

Post a Comment