流式任务与简单任务不同的地方是此任务的逻辑实现类要继承DataflowJob接口类,此接口类中包含两个方法:fetchData和processData。fetchData方法负责数据抓取,processData方法负责对抓取到的数据进行处理。当流式处理数据时,只有当fetchData方法返回null或空List时作业才停止,否则作业会一直执行下去。
流式任务比较适用于在某一时刻,利用流式任务不停执行的特性,进行不间断的大量数据处理的工作。下面编写一个例子,让流式任务在每天2点统计前一天每个用户的购买情况。
(1)时间及分片设置
在yml文件中添加如下内容。
使用此配置,设定任务总体分片数为3,每天2点执行任务。
(2)添加配置类
此类根据之前设置的配置项和新创建的定时任务对象启动调度执行。
(3)定义定时任务执行逻辑
SpringDataflowJob类定义了定时任务执行的逻辑,其中fetchData方法用于分批次从数据库中查询前一天的订单列表;processData方法根据查询到的数据进行统计,代码中的注释部分“todo sth;”表示可以把统计数据存入另外一个统计表中,这里囿于篇幅没有截取代码,可以到随书附带的工程中查看。processData方法最后调用dao.completeStatis(data)方法,用于对已经处理完的数据设置标记位,避免再次被fetchData方法查询出已经处理过的数据。
(4)数据查询及处理(https://www.xing528.com)
在OrderDaoImpl中添加如下方法,getStatisList方法负责查询用户的未统计订单,completeStatis方法用于对已经统计完的数据设置标记位。
㊀此mapper是使用Mybatis自动生成的mapper。
(5)自定义的mapper
在查询未统计订单时,使用了自定义的SQL查询方法,代码如下。
㊀此处用于演示,所以没有对订单的支付状态进行筛选,实际业务中的查询逻辑要比这个复杂;BaseResultMap的定义和Mybatis自动生成的相同,记得将Mybatis自动生成的BaseResultMap的定义引入进此文件。
自定义Mapper类映射的接口方法为:
public List<Long>getStatisUser(Map<Object,Object>map);
public List<OrderJob>getStatisOrder(Map<Object,Object>map);
至此,使用DataflowJob形式的定时任务编写完成,它会在每天2点统计前一天的业务数据,可以让你在第二天上班时看到前一天详细的业务情况,给业务的发展以数据的支持。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。
