Esper primer

Recently, feeling the need for more complex (no pun intended) event processing than what a simple enterprise integration patterns implementation such as Spring Integration provides, I started looking at Esper for more insight into CEP and ESP but from an open source perspective.

Here I reproduce an example that I followed from the Esper documentation. Note that this is the exact example from the Esper tutorial with javadoc also taken from the Esper documentation which I take no credit for. However I reproduce it here as a complete and coherent example for my own reference and anyone else’s and also with one or two fixes to mistakes in the Esper documentation.

Create an event using a java pojo class.

package esper.example;

import java.text.MessageFormat;

/**
 * Java classes are a good choice for representing events, however Map-based or
 * XML event representations can also be good choices depending on your
 * architectural requirements.
 *
 * A sample Java class that represents an order event is shown below. A simple
 * plain-old Java class that provides getter-methods for access to event
 * properties works best:
 */
public class OrderEvent {

    private String itemName;
    private double price;

    public OrderEvent(String itemName, double price) {
        this.itemName = itemName;
        this.price = price;
    }

    public String getItemName() {
        return itemName;
    }

    public double getPrice() {
        return price;
    }

    public String toString() {
        return MessageFormat.format("OrderEvent [itemName={0}, price={1}]", new Object[] {
                itemName, new Double(price) });
    }

}

Create a main method that will invoke the application by creating a statement, assigning a listener to that statement and publishing some events.

package esper.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;

public class Main {

    final static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {

        /*
         * Esper runs out of the box and no configuration is required. However
         * configuration can help make statements more readable and provides the
         * opportunity to plug-in extensions and to configure relational
         * database access.
         *
         * One useful configuration item specifies Java package names from which
         * to take event classes.
         *
         * This snippet of using the configuration API makes the Java package of
         * the OrderEvent class known to an engine instance:
         */
        Configuration config = new Configuration();
        config.addEventTypeAutoName(OrderEvent.class.getPackage().getName());

        /*
         * A statement is a continuous query registered with an Esper engine
         * instance that provides results to listeners as new data arrives, in
         * real-time, or by demand via the iterator (pull) API.
         *
         * The next code snippet obtains an engine instance and registers a
         * continuous query. The query returns the average price over all
         * OrderEvent events that arrived in the last 30 seconds:
         */
        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
        String expression = "select itemName, avg(price) from OrderEvent.win:time(30 sec)";
        EPStatement statement = epService.getEPAdministrator().createEPL(expression);

        /*
         * By attaching the listener to the statement the engine provides the
         * statement's results to the listener:
         */
        MyListener listener = new MyListener();
        statement.addListener(listener);

        /*
         * The runtime API accepts events for processing. As a statement's
         * results change, the engine indicates the new results to listeners
         * right when the events are processed by the engine.
         *
         * Sending events is straightforward as well:
         */
        OrderEvent event1 = new OrderEvent("a", 1);
        OrderEvent event2 = new OrderEvent("b", 2);
        OrderEvent event3 = new OrderEvent("c", 3);
        OrderEvent event4 = new OrderEvent("d", 4);

        EPRuntime epRuntime = epService.getEPRuntime();
        epRuntime.sendEvent(event1);
        epRuntime.sendEvent(event2);
        epRuntime.sendEvent(event3);
        epRuntime.sendEvent(event4);

    }

}

Create a listener that will receive events in real time.

package esper.example;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
 * Listeners are invoked by the engine in response to one or more events that
 * change a statement's result set. Listeners implement the UpdateListener
 * interface and act on EventBean instances as the next code snippet outlines:
 */
public class MyListener implements UpdateListener {

    final static Logger logger = LoggerFactory.getLogger(UpdateListener.class);

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        EventBean event = newEvents[0];
        Object average = event.get("avg(price)");
        Object itemName = ((Map) event.getUnderlying()).get("itemName");
        logger.info("upon arrival of {} the average stood at {}", itemName, average);
    }

}

This produces the output below as expected.

22:34:21.296 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of a the average stood at 1.0
22:34:21.300 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of b the average stood at 1.5
22:34:21.301 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of c the average stood at 2.0
22:34:21.301 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of d the average stood at 2.5

As a bonus if you want to load events from a csv file that can be done as follows.

Create a csv file with column headings and call it, let’s say, simulation.csv.

itemName,price
a,1
b,2
c,3
d,4

Load and publish events directly from the csv file. Awesome.

package esper.example;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esperio.AdapterInputSource;
import com.espertech.esperio.InputAdapter;
import com.espertech.esperio.csv.CSVInputAdapter;
import com.espertech.esperio.csv.CSVInputAdapterSpec;

public class Main {

    final static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {

        Configuration config = new Configuration();
        config.addEventTypeAutoName("esper.example");

        /*
         * declare event type map
         */
        Map typeMap = new HashMap();
        typeMap.put("itemName", String.class);
        typeMap.put("price", double.class);
        config.addEventType("OrderEvent", typeMap);

        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
        String expression = "select itemName, avg(price) from OrderEvent.win:time(30 sec)";
        EPStatement statement = epService.getEPAdministrator().createEPL(expression);

        MyListener listener = new MyListener();
        statement.addListener(listener);

        /*
         * publish events from csv file
         */
        AdapterInputSource adapterInputSource = new AdapterInputSource("simulation.csv");
        (new CSVInputAdapter(epService, adapterInputSource, "OrderEvent")).start();

    }

}

This gets a lot more sophisticated with looping and throttling of the adapter source.

Esper, on first impressions, seems like a very powerful tool for event processing with its event processing language (EPL) modelled on SQL and also very lightweight in that it is simply a jar that can be deployed within your application without any further infrastructure. Being new to CEP and Esper I’m very excited indeed by the prospects it opens up for me and I definitely intend to continue looking further into CEP and ESP academically using Esper to see what else is possible.

What will also be interesting going forward is the experience of gradually porting the more complex use cases I had for Spring Integration to using Esper – although I do understand that they are not an exact match – one being a CEP/ESP solution and another being an EIP/inversion of concurrency solution.

21 thoughts on “Esper primer

  1. hi dhruba ! Thank you for your examples on Esper. I am tried your example; The first one worked fine. But when I use “simulation.csv” file I am getting a run – time exception like this:

    Exception in thread "main" com.espertech.esper.client.EPException: C:UserslincolnworkspacecsvInputbinespersimulation.csv not found
    	at com.espertech.esperio.AdapterInputSource.resolvePathAsStream(AdapterInputSource.java:198)
    	at com.espertech.esperio.AdapterInputSource.getAsStream(AdapterInputSource.java:157)
    	at com.espertech.esperio.csv.CSVSource.(CSVSource.java:36)
    	at com.espertech.esperio.csv.CSVReader.(CSVReader.java:53)
    	at com.espertech.esperio.csv.CSVInputAdapter.finishInitialization(CSVInputAdapter.java:213)
    	at com.espertech.esperio.csv.CSVInputAdapter.(CSVInputAdapter.java:64)
    	at com.espertech.esperio.csv.CSVInputAdapter.(CSVInputAdapter.java:76)
    	at esper.Lincoln.main(Lincoln.java:120)
    

    I tried changing the patch name and used absolute path name for the file, but still don’t work. It would be very nice if you can give me any suggestion regarding this.

  2. Dear dhruba and Md. Abdus Salam

    I am having issues with running first example. I tried to run from eclipse but it just get terminated.

    Can u pls assist me here.
    Many thanks &
    Regards
    Adnan

  3. hi adnan ! the first example worked well for me. I used eclipse. just copied and paste the code in respective files named on the class name. what sort of problem are u facing?

    1. Dear Abdus Salam

      Thanks for your kind reply. Yes my first one is working fine now. Did your program work when u tried to load from the csv file ?

      Regards
      Adnan

      1. Hi
        I am getting the same error as file not found. Tried few options by giving obsolute path but still having the same error. Any suggion would be greatly appreciated.

        log4j:WARN Please initialize the log4j system properly.
        Exception in thread "main" com.espertech.esper.client.EPException: test.csv not found
        	at com.espertech.esperio.AdapterInputSource.resolvePathAsStream(AdapterInputSource.java:198)
        	at com.espertech.esperio.AdapterInputSource.getAsStream(AdapterInputSource.java:157)
        	at com.espertech.esperio.csv.CSVSource.(CSVSource.java:36)
        	at com.espertech.esperio.csv.CSVReader.(CSVReader.java:53)
        	at com.espertech.esperio.csv.CSVInputAdapter.finishInitialization(CSVInputAdapter.java:213)
        	at com.espertech.esperio.csv.CSVInputAdapter.(CSVInputAdapter.java:64)
        	at com.espertech.esperio.csv.CSVInputAdapter.(CSVInputAdapter.java:76)
        
      2. hi!

        I did the following to overcome this problem

        File inFile = new File("test.csv");
        AdapterInputSource adapterInputSource = new AdapterInputSource(inFile);
        

        try it and I’m sure u will get output of the second example

  4. Thanks Abdus Salam. It worked .I tried using absolute path from root and also using your comment and it did work in both cases now.

    Once again appreciate your kind reply.

  5. Hi Abdus salam

    I hope u r fine. I am just wondering if u have tried to save the events in csv file. Any suggision how we can do that?

    Regards
    Adnan

  6. Hi Dhruba Bandopadhyay,

    I am using EPSER for CEP. I had done RD and able to upload CVS file in Adapter. I had written mulitple queries and they are firing against the data from CVS thru polling.

    The problem is Esper didnt gave examples to write some block of EPL queries using case statements or EPL blocks similar to PL/SQL block. I need to update multiple streams based on data in one particular stream. I am using custom Utility classes in queries to simply business logic. Is there any way that I can pass stream to the Custome classes so that i
    can modify the stream in Utility classes itself.

    Pl reply….

    Regards
    Sudhakar Reddy.

  7. Hello
    I am new to esper and ecllipse.
    Can you tell me how to run these programs whether on ecllipse or esper.
    When I simply compile your programs using java in command prompt it says an error because it cant find espertech.esper.client*
    So please throw some light on how to start running these programs from scratch.
    I will be highly obliged if you help me out.

    1. hi Anshul..
      even me too have same problem of how to run esper programs .even i tried in java ,but getting same error as it cant find espertech.esper.client.*
      it has been two years you have posted this,if you got solution for that problem,kindly help me in running those programs.waitng for your valuable reply.

  8. Hello
    I am new to esper and ecllipse.
    Can you tell me how to run these programs whether on ecllipse.
    So please throw some light on how to start running these programs from scratch.
    I will be highly obliged if you help me out.

  9. Hi! I’m trying to run the first part but I’m getting this type of error. Could you please help me?

    Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
    at com.espertech.esper.client.Configuration.(Configuration.java:39)
    at Main.main(Main.java:16)
    Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    … 2 more

    1. hello alex,
      Iam the beginner of esper.i dont know how to run esper programs in java.i simply runned them in command prompt using java.but iam getting the same errors which you have got.kindly help me in running esper programs pls.Mail me those who knows solution to this problem at sushma.030894@gmail.com .i will be highly obliged if any one helps me for this

  10. I had to import commons-logging.jar and now I have another error 😦
    Exception in thread “main” java.lang.NullPointerException
    at Main.main(Main.java:29)

  11. Very well described and elaborated. Thank you for your support and help. I tried with the same code and did same in the eclipse, after resolving dependencies,on execution I got this error,
    Failed to instantiate SLF4J LoggerFactory
    Reported exception:
    java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.slf4j.LoggerFactory.bind(LoggerFactory.java:128)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:107)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:295)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:269)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:281)
    at Main.(Main.java:17)
    Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    … 6 more
    Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.slf4j.LoggerFactory.bind(LoggerFactory.java:128)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:107)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:295)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:269)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:281)
    at Main.(Main.java:17)
    Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    … 6 more

    I need your help in resolving this issue what is this error and how to resolve it.Thank you and keen to hear from you.

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s