首页 理论教育 如何创建自定义拦截器?

如何创建自定义拦截器?

时间:2023-06-26 理论教育 版权反馈
【摘要】:flume中的拦截器,是source的拦截器,主要的作用就是对于一个source可以指定一个或者多个interceptors按先后的顺序对数据进行处理。UUID拦截器:在Event Header中添加全局唯一UUID。面对实际复杂应用时,flume自带拦截器不能满足其需求。此时,需要进行高阶开发自定义拦截器。本节我们以案例的形式说明如何实现自定义拦截器。图7-30拦截器分发框架具体实现步骤如下。3)编写拦截器类。图7-31打包依赖包图7-32复制jar包5)编写flume配置文件。

如何创建自定义拦截器?

(1)flume自带拦截器。

flume中的拦截器(interceptor),是source的拦截器,主要的作用就是对于一个source可以指定一个或者多个interceptors按先后的顺序对数据进行处理。

用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,比如在收集数据的event的hander中加入处理的时间戳,agent的主机或者IP,固定的key-value等,对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用,Flume目前提供了以下拦截器:

timestamp拦截器:在Event Header中添加时间戳。

Host拦截器:在Event Header中添加agent运行机器的Host或IP。

Static拦截器:在Event Header中添加自定义静态属性。

Remove Header拦截器:可移除Event Header中指定属性。

UUID拦截器:在Event Header中添加全局唯一UUID。

Search and Replace拦截器:基于正则搜索和替换字符串等。

Regex Filtering拦截器:基于正则过滤或反向过滤Event。

Regex Extractor拦截器:基于正则在Event Header添加指定的Key,并将匹配到的内容作为对应的Value。

(2)flume Interceptor接口

面对实际复杂应用时,flume自带拦截器不能满足其需求。例如:使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。此时,需要进行高阶开发自定义拦截器。在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。可利用Flume拓扑结构中的Multiplexing结构,将不同的event发送到不同的Channel中。Multiplexing的原理是根据event中Header的某个key的值,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。官方提供了自定义Interceptor的接口(https://flume.apache.org/FlumeUserGuide.html#flume-interceptors)。常用于实现自定义Interceptor的方法如下:

public void initialize()//运行前的初始化,一般不需要实现;

public Event intercept(Event event)//处理单个event;

public List intercept(List events)//批量处理event,内部调用单个的event逻辑即可;

public void close()//程序推出释放工作

public interface Builder extends Configurable//构建Interceptor对象,外部使用这个Builder来获取Interceptor对象。

(3)自定义拦截器实现。

本节我们以案例的形式说明如何实现自定义拦截器。

任务要求:以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。如图7-30所示。

图7-30 拦截器分发框架

具体实现步骤如下。

1)创建maven工程(maven工程创建过程见8.2.3节)。

2)导入pom依赖。

pom.xml的dependencies列表列出了我们的项目需要构建的所有外部依赖项。要添加依赖项,一般是先在src文件夹下添加lib文件夹,然后将工程需要的jar文件复制到lib文件夹下。这里我们使用的是org.apache.maven.plugins.jar包。pom.xml文件的完整代码如下。

3)编写拦截器类。

4)项目打包。

使用maven对工程进行打包,需要将mysql的依赖包一起打到jar包里,然后将打包好的jar包放到flume的lib目录下,如图7-31和图7-32所示。

图7-31 打包依赖包

图7-32 复制jar包

5)编写flume配置文件。

①flume1配置文件。

配置1个netcat source,1个sink group(2个avro sink),并配置相应的ChannelSelector和interceptor。具体信息如下:

# Name the components on this agent

a1.sources=r1

a1.sinks=k1 k2

a1.channels=c1 c2

# Describe/configure the source

a1.sources.r1.type=netcat

a1.sources.r1.bind=127.0.0.1

a1.sources.r1.port=4444

②flume2配置文件。

配置一个avro source和一个logger sink。

#拦截器

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=interceptor.CustomInterceptor$Builder(www.xing528.com)

#选择器

a1.sources.r1.selector.type=multiplexing

a1.sources.r1.selector.header=type

#与自定义拦截器中设置的头信息对应

a1.sources.r1.selector.mapping.letter=c1

a1.sources.r1.selector.mapping.number=c2

# Describe the sink

a1.sinks.k1.type=avro

a1.sinks.k1.hostname=127.0.0.1

a1.sinks.k1.port=4141

a1.sinks.k2.type=avro

a1.sinks.k2.hostname=127.0.0.1

a1.sinks.k2.port=4242

# Use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory

a1.channels.c2.capacity=1000

a1.channels.c2.transactionCapacity=100

# Bind the source and sink to the channel

a1.sources.r1.channels=c1 c2

a1.sinks.k1.channel=c1

a1.sinks.k2.channel=c2

③flume3配置文件。

配置一个avro source和一个logger sink。

a3.sources=r1

a3.sinks=k1

a3.channels=c1

a3.sources.r1.type=avro

a3.sources.r1.bind=127.0.0.1

a3.sources.r1.port=4242

a3.sinks.k1.type=logger

a3.channels.c1.type=memory

a3.channels.c1.capacity=1000

a3.channels.c1.transactionCapacity=100

a3.sinks.k1.channel=c1

a3.sources.r1.channels=c1

6)测试。

①flume1需要连接flume2和flume3,因此,先启动flume2和flume3,若先启动flume1会报连接不上。执行下列shell命令,进行功能测试

1.cd/usr/local/flume/usr/local/flume/bin/

2./usr/local/flume/bin/flume-ng agent--conf./conf--conf-file./conf/

example.conf--name a3-Dflume.root.logger=INFO,console

3./usr/local/flume/bin/flume-ng agent--conf./conf--conf-file./conf/

example.conf--name a2-Dflume.root.logger=INFO,console

4./usr/local/flume/bin/flume-ng agent--conf./conf--conf-file./conf/

example.conf--name a1-Dflume.root.logger=INFO,console

②向监控端口发送数据。

1.nc 127.0.0.1 4444

2.qwer1234

③如图7-33所示,可以看到不同的内容被发送到不同的flume了,拦截器代码中只定义数字和小写字母,发送其他的内容不会被flume1转发。

图7-33 观察拦截结果

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈