花生肉泥 Blog

Esper Demo

2016/09/26 Share
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.zx;
import com.espertech.esper.client.*;
import java.util.Random;
import java.util.Date;

public class test1 {

/*一个javabean来描述给定时间的某个股票的价格
有三个属性: 股票代码、价格和时间戳
通过在实例化CEP引擎时,使用一个Configuration对象来实现*/
public static class Tick {
String symbol;
Double price;
Date timeStamp;
public Tick(String s, double p, long t) {
symbol = s;
price = p;
timeStamp = new Date(t);
}
// public double getPrice() {return price;}
// public String getSymbol() {return symbol;}
// public Date getTimeStamp() {return timeStamp;}

public String getSymbol() {
return symbol;
}

public Double getPrice() {
return price;
}

public Date getTimeStamp() {
return timeStamp;
}

@Override
public String toString() {
return "价格:" + price .toString() + " 时间:" + timeStamp.toString();
}
}

/*创建一个函数来随机生成数据,并丢到CEP引擎中去,我们把这个函数叫做"GennerateRandomTick",它把EPRuntime对象作为参数,这个对象用于把事件传递给CEP引擎*/
/*定义随机类*/
private static Random generator = new Random();

public static void GenerateRandomTick(EPRuntime cepRT) {

/*返回[0,10)集合中的整数,不包括10*/
double price = ( double) generator.nextInt(10);
/*毫秒为单位*/
long timeStamp = System. currentTimeMillis();
String symbol = "AAPL";
Tick tick = new Tick(symbol, price, timeStamp);
System. out.println( "发送tick:" + tick);
/*引擎在EPRuntime接口中通过SendEvent(DataMap map,String eventTypeName)方法来执行Map事件*/
cepRT.sendEvent(tick);

}

/*简单地把从引擎中接收到的对象打印出来*/
public static class CEPListener implements UpdateListener {

public void update(EventBean[] newData, EventBean[] oldData) {
System. out.println( "[触发事件: " + newData[0].getUnderlying() + "]");
}
}

public static void main(String[] args) {

/*配置初始化对象*/
Configuration cepConfig = new Configuration();
/*使用AddEventType方法配置在初始化或运行时定义的类型*/
cepConfig.addEventType( "StockTick", Tick. class.getName());

EPServiceProvider cep = EPServiceProviderManager.getProvider( "myCEPEngine", cepConfig);
/*获取Esper运行时*/
EPRuntime cepRT = cep.getEPRuntime();


/*现在,我们有了一个可以工作的CEP引擎,和不断输入的虚假的数据,是时候来创建我
* 们的第一条规则了,用 Esper的说法,我们的第一条EPL语句。要这么做,我们需要请
* 引擎的管理员记录我们的语句。然后,CEP引擎会根据EPL语句的定义,过滤它收到的
* 数据,当数据满足语句中的选择条件或者模式时,触发事件。*/

/*注册引擎*/
EPAdministrator cepAdm = cep.getEPAdministrator();
/*创建EPL语句
* 每当最近的2次数据的平均值大于6.0时,触发事件。*/
EPStatement cepStatement = cepAdm.createEPL( "select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0" );

/*创建一个监听者并把它和我们选择规则产生的时间联系起来*/
cepStatement.addListener( new CEPListener());

// 生成一些数据
for (int i = 0; i < 20; i++) {
GenerateRandomTick(cepRT);
}
}
}
CATALOG