(1)背景介绍
Flume 支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 能够对数据进行简单处理,并写到各自数据接收方。Flume 有各种自带的拦截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor 等,通过使用不同的拦截器,实现不同的功能。但是,以上这些拦截器,不能改变原有日志数据的内容,或者对日志信息添加一定的处理逻辑。
(2)自定义拦截器
根据实际业务的需求,为了更好地满足数据在应用层的处理,通过自定义Flume 拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。
(3)案例实现
1)自定义拦截器
编写Java 代码,其内容包括:
①定义一个类CustomParameterInterceptor 实现Interceptor 接口。在CustomParameterInterceptor 类中定义变量,这些变量是需要到Flume 的配置文件中进行配置使用的。
②添加CustomParameterInterceptor 的有参构造方法,并对相应的变量进行处理。将配置文件中传过来的unicode 编码转换为字符串。(www.xing528.com)
③写具体的要处理的逻辑Intercept()方法,一个是单个处理,另一个是批量处理。
④接口中定义了一个内部接口Builder,在configure 方法中,进行一些参数配置。通过其Builder 方法,返回一个CustomParameterInterceptor 对象。
⑤定义一个静态类,类中封装MD5 加密方法。
⑥通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar 放到Flume 的根目录下的lib 中运行即可。
2)修改Flume 的配置信息
①新增配置文件“spool-interceptor-hdfs.conf”,其内容为:
②服务器启动Flume 服务,其方法如下。
②服务器启动Flume 服务,其方法如下。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。