(1)flume自带拦截器。
flume中的拦截器(interceptor),是source的拦截器,主要的作用就是对于一个source可以指定一个或者多个interceptors按先后的顺序对数据进行处理。
用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,比如在收集数据的event的hander中加入处理的时间戳,agent的主机或者IP,固定的key-value等,对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用,Flume目前提供了以下拦截器:
timestamp拦截器:在Event Header中添加时间戳。
Host拦截器:在Event Header中添加agent运行机器的Host或IP。
Static拦截器:在Event Header中添加自定义静态属性。
Remove Header拦截器:可移除Event Header中指定属性。
UUID拦截器:在Event Header中添加全局唯一UUID。
Search and Replace拦截器:基于正则搜索和替换字符串等。
Regex Filtering拦截器:基于正则过滤或反向过滤Event。
Regex Extractor拦截器:基于正则在Event Header添加指定的Key,并将匹配到的内容作为对应的Value。
(2)flume Interceptor接口。
面对实际复杂应用时,flume自带拦截器不能满足其需求。例如:使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。此时,需要进行高阶开发自定义拦截器。在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。可利用Flume拓扑结构中的Multiplexing结构,将不同的event发送到不同的Channel中。Multiplexing的原理是根据event中Header的某个key的值,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。官方提供了自定义Interceptor的接口(https://flume.apache.org/FlumeUserGuide.html#flume-interceptors)。常用于实现自定义Interceptor的方法如下:
public void initialize()//运行前的初始化,一般不需要实现;
public Event intercept(Event event)//处理单个event;
public List intercept(List events)//批量处理event,内部调用单个的event逻辑即可;
public void close()//程序推出释放工作
public interface Builder extends Configurable//构建Interceptor对象,外部使用这个Builder来获取Interceptor对象。
(3)自定义拦截器实现。
本节我们以案例的形式说明如何实现自定义拦截器。
任务要求:以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。如图7-30所示。
图7-30 拦截器分发框架
具体实现步骤如下。
1)创建maven工程(maven工程创建过程见8.2.3节)。
2)导入pom依赖。
pom.xml的dependencies列表列出了我们的项目需要构建的所有外部依赖项。要添加依赖项,一般是先在src文件夹下添加lib文件夹,然后将工程需要的jar文件复制到lib文件夹下。这里我们使用的是org.apache.maven.plugins.jar包。pom.xml文件的完整代码如下。
3)编写拦截器类。
4)项目打包。
使用maven对工程进行打包,需要将mysql的依赖包一起打到jar包里,然后将打包好的jar包放到flume的lib目录下,如图7-31和图7-32所示。
图7-31 打包依赖包
图7-32 复制jar包
5)编写flume配置文件。
①flume1配置文件。
配置1个netcat source,1个sink group(2个avro sink),并配置相应的ChannelSelector和interceptor。具体信息如下:
# Name the components on this agent
a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2
# Describe/configure the source
a1.sources.r1.type=netcat
a1.sources.r1.bind=127.0.0.1
a1.sources.r1.port=4444
②flume2配置文件。
配置一个avro source和一个logger sink。
#拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=interceptor.CustomInterceptor$Builder(www.xing528.com)
#选择器
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=type
#与自定义拦截器中设置的头信息对应
a1.sources.r1.selector.mapping.letter=c1
a1.sources.r1.selector.mapping.number=c2
# Describe the sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=127.0.0.1
a1.sinks.k1.port=4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=127.0.0.1
a1.sinks.k2.port=4242
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
③flume3配置文件。
配置一个avro source和一个logger sink。
a3.sources=r1
a3.sinks=k1
a3.channels=c1
a3.sources.r1.type=avro
a3.sources.r1.bind=127.0.0.1
a3.sources.r1.port=4242
a3.sinks.k1.type=logger
a3.channels.c1.type=memory
a3.channels.c1.capacity=1000
a3.channels.c1.transactionCapacity=100
a3.sinks.k1.channel=c1
a3.sources.r1.channels=c1
6)测试。
①flume1需要连接flume2和flume3,因此,先启动flume2和flume3,若先启动flume1会报连接不上。执行下列shell命令,进行功能测试。
1.cd/usr/local/flume/usr/local/flume/bin/
2./usr/local/flume/bin/flume-ng agent--conf./conf--conf-file./conf/
example.conf--name a3-Dflume.root.logger=INFO,console
3./usr/local/flume/bin/flume-ng agent--conf./conf--conf-file./conf/
example.conf--name a2-Dflume.root.logger=INFO,console
4./usr/local/flume/bin/flume-ng agent--conf./conf--conf-file./conf/
example.conf--name a1-Dflume.root.logger=INFO,console
②向监控端口发送数据。
1.nc 127.0.0.1 4444
2.qwer1234
③如图7-33所示,可以看到不同的内容被发送到不同的flume了,拦截器代码中只定义数字和小写字母,发送其他的内容不会被flume1转发。
图7-33 观察拦截结果
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。