花生肉泥 Blog

进程模型

2016/09/26 Share

1、UpdateListener

是Esper提供的一个借口,用于监听某个EPL在引擎中的运行情况,即事件进入并产生结果后会通知UpdateListener

1
2
3
4
5
6
7
8
9
import com.espertech.esper.client.EventBean;

public interface UpdateListener
{
public void update(EventBean[] newEvents, EventBean[] oldEvents)
{
/*code...*/
}
}

接口就一个update方法,包括两个EventBean数组。EventBean中有一个最常用的get方法,用来得到EPL中某个字段的值。

1
2
3
4
5
6
7
8
EPL:select name from User
// 假设newEvents长度为一
newEvents[0].get("name") // 能得到进入User时间的name的属性值

EPL: select count(*) from user.win:time(5 sec)
// 假设newEvents长度为一
newEvents[0].get("count(*)") // 能得到5秒内进入引擎的User事件数量有多少
get方法最常用,此外还有geyUnderlying等方法

2、Insert and Remove Stream

insert表示进入引擎,remove表示移出引擎,事件在Esper中会因为某类EPL才会经历这两种状态,对应于UpdateListener借口就是newEvents和oldEvents,因为处于这两种状态的事件不一定只有一个,所以newEvents和oldEvents就是数组形式

1
EPL: select * from User

paste image
从图可以看出,随着时间推移,每个进入引擎的事件W都是newEvents,即Insert Stream(事件W后括号里的值为属性值 )
那为什么oldEvents什么都没有,因为EPL的关系

1
EOL:select * from User.win:length(5)

paste image
注:win:length(5)是个view,大概意思是Esper开放一个空间并最多可同时存放5个事件(此空间就是大小为5的数组)
由图可知,length window可存放w1,w2等事件,在W6事件进入之前,每个事件进入都属于newEvents,知道W6进入后,length window不能容纳w1-w6的事件,必须把w1事件移出,即w1为oldEvents。length window就像一个队列,每当事件进入队列时,就会触发UpdateListener并告知有新事件进入。当队列满了,再进入一个新事件时,Esper会触发updateListener告知所有新事件进入并且有旧事件移出。

实际上EPL触发监听器都只能看到newEvents,看不到oldEvents。如果想看,EPL要改一下:

1
EPL:select irstream * from User.win:length(5)

默认情况下,Esper认为你只想让newEvents触发监听器,即istream(insert stream)。如果想让oldEvents触发监听器,那么为rstream(remove stream)。如果两个都想,那么为irstream。当然这个默认情况是可以配置的,以后会说到这个问题。

3、Filter and Where-Clause

EPL有两种过滤事件的方式,一种是过滤事件进去view,即Filter。另一种是让事件进入view,但不触发UpdateListener,即where字句。
Filter:

1
2
// Apple事件进入Esper,只有amount大于200才能进入win:length,并且length长度为5
EPL:select * from Apple(amount>200).win:length(5)

paste image
从上图看出,只有amount大于200,Esper才允许Apple事件进入view,并且作为一个newEvents触发UpdateListener

Where-Clause:

1
2
// Apple事件进入Esper并进入win:length(5),但只有amount大于200才能触发UpdateListener
EPL:select * from Apple.win:length(5) where amount>200

paste image

4、Aggregation and Grouping

EPL是类SQL语句,所以也有聚合和分组功能,语法和SQL基本一样

1
2
3
4
5
6
7
8
// 统计进入的5个Apple事件,amount的总数是多少
select sum(amount) from Apple.win:length_batch(5)

// 统计进入的5个Apple事件,amount的总数是多少,并按照price分组
select price,sum(amount) from Apple.win:length_batch(5) group by price

// 统计进入的5个Apple事件,amount的总数和name,并按照price分组
select price,name,sum(amount) from Apple.win:length_batch(5) group by price

CATALOG
  1. 1. 1、UpdateListener
  2. 2. 2、Insert and Remove Stream
  3. 3. 3、Filter and Where-Clause
  4. 4. 4、Aggregation and Grouping