Getting to Know Flume
1. Flume overview: Flume is a distributed, highly reliable, highly available service from Cloudera for efficiently collecting, aggregating, and moving large amounts of log data.
Official description: Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
2. Business scenarios where Flume applies
3. Flume has three core components:
1. Source — collects the data
2. Channel — buffers events between source and sink
3. Sink — writes the data out
(Figure: the Source → Channel → Sink data flow inside a Flume agent)
Combination patterns:
When data flows between multiple agent nodes, the upstream node's Sink and the downstream node's Source must both be of type avro, with a matching hostname and port (see the configuration later in this article).
When there is a large volume of logs across many production servers, you configure an agent on each server to collect its local log data and send it to a single consolidating agent, which then writes everything to HDFS or another file system.
Flume also supports multiplexing an event flow out to one or more destinations: a source can write the events it collects into multiple channels, and multiple sinks can then deliver them to different systems.
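A minimal sketch of such a fan-out: one source copies every event into two channels, and each channel is drained by a different sink. The second sink's hostname and port below are placeholder values, not part of this article's setup:

```properties
# Illustrative fan-out agent: one source, two channels, two sinks
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# The replicating selector (the default) copies each event into every listed channel
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

a1.channels.c1.type = memory
a1.channels.c2.type = memory

# k1 logs locally; k2 forwards to another system over avro (placeholder address)
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = collector.example.com
a1.sinks.k2.port = 4545
```

For content-based routing instead of copying, Flume also provides a multiplexing selector that picks a channel based on an event header.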
Below we set up an environment and walk through a simple example.
As is well known, Nginx records every request in its access.log file. We will use two Flume instances: the first collects new entries appended to Nginx's access.log and sends them to a second Flume server, which prints the received log events to its console.
1. Inspect the Nginx access.log
2. Deploy Flume on the server running Nginx
To simplify later integration with Kafka and Spark, I use the CDH build of Flume (CDH 5.7.0); downloading the official release works the same way.
CDH downloads: http://archive.cloudera.com/cdh5/cdh/5/
Official downloads: http://flume.apache.org/download.html
# Go to your software directory and download the CDH build of Flume
wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0.tar.gz
# Extract the archive
tar -zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz
Next, set the JDK path in the configuration. Go into Flume's conf directory, copy the template to flume-env.sh, and edit it:
cd flume-ng-1.6.0/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
Change this line to your actual JDK path and save:
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.
# Enviroment variables can be set here.
export JAVA_HOME=/usr/java/jdk1.8.0_141
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
Go into Flume's bin directory and run the following command to print the version, verifying that the installation succeeded:
cd ../bin
./flume-ng version
It prints the Flume version, source repository, and build details:
Flume 1.6.0-cdh5.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 8f5f5143ae30802fe79f9ab96f893e6c54a105d1
Compiled by jenkins on Wed Mar 23 11:38:48 PDT 2016
From source with checksum 50b533f0ffc32db9246405ac4431872e
Flume is now installed. Because log-collection tools like this do exactly what their configuration tells them to, the next step is to understand the configuration file.
Here is the configuration file given in the official documentation; let's walk through it:
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1 is the name of the Flume agent launched with this configuration file; r1 is its source, c1 its channel, and k1 its sink.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
These three lines declare that agent a1 has source r1, sink k1, and channel c1.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
These three lines set the type of a1's source r1 to netcat (it reads data from the network), listening on localhost port 44444.
a1.sinks.k1.type = logger
This line sets the type of a1's sink k1 to logger (events are written to the console as log output).
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
These three lines set the type of a1's channel c1 to memory (an in-memory channel) that buffers at most 1000 events, handling up to 100 events per transaction.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
These two lines are the most important: they wire the agent's three components together.
Source r1 of agent a1 writes the data it collects into channel c1.
Sink k1 of agent a1 reads data back out of channel c1.
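As a rough mental model (plain Python, not Flume code), this wiring behaves like a producer and a consumer sharing one bounded queue:

```python
from queue import Queue

# The "channel": a bounded in-memory buffer, loosely analogous to
# a1.channels.c1 with type = memory and a fixed capacity
channel = Queue(maxsize=1000)

def source(events):
    """Like source r1: write collected events into the channel."""
    for event in events:
        channel.put(event)

def sink():
    """Like sink k1: read events back out of the channel, in order."""
    drained = []
    while not channel.empty():
        drained.append(channel.get())
    return drained

source(["GET / 200", "GET /favicon.ico 404"])
print(sink())  # -> ['GET / 200', 'GET /favicon.ico 404']
```

The queue decouples the two ends, which is exactly the role the channel plays between source and sink.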
As a first step, we can build a demo that reads a local log and prints it to the console. Copy the configuration above into a new file and change the source type to exec, since we want to tail new entries appended to Nginx's access.log. The final configuration is:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The launch command has the following form:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
In our case, we run:
./bin/flume-ng agent --conf /usr/server/flume-1.6.0/conf --conf-file /usr/server/flume-1.6.0/conf/example.conf --name a1 -Dflume.root.logger=INFO,console
On a successful start it prints:
nitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2018-08-13 17:48:33,391 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
Now send an HTTP request to Nginx to test; the console prints:
2018-08-13 17:57:44,909 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 2E 31 31 39 2E 31 33 32 2E 31 36 37 20 2D 20 1.119.132.167 - }
2018-08-13 17:57:49,214 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 2E 31 31 39 2E 31 33 32 2E 31 36 37 20 2D 20 1.119.132.167 - }
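The logger sink prints only the first bytes of each event body as hex, plus a printable preview. Decoding those hex bytes (here in Python) confirms the body is simply the start of the raw access-log line, beginning with the client IP:

```python
# First 16 body bytes, copied from the Event line above
hex_body = "31 2E 31 31 39 2E 31 33 32 2E 31 36 37 20 2D 20"

# Strip the spaces and decode the hex digits as ASCII text
decoded = bytes.fromhex(hex_body.replace(" ", "")).decode("ascii")
print(decoded)  # prints "1.119.132.167 - "
```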
Next, configure the second Flume instance; the setup is almost identical.
You only need to know the component types on each side of the chain:
exec source + memory channel + avro sink
avro source + memory channel + logger sink
The two configuration files are shown below. They run on two separate servers, so reusing the agent name a1 causes no conflict; if you start both instances on one machine, rename one of the agents.
exec source + memory channel + avro sink: exec-memory-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 39.106.193.183
a1.sinks.k1.port = 6666

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Special note: a1.sinks.k1.hostname is the IP of the second machine, not the local IP; this is also why the agent on the second server must be started first, so its avro source is listening before the avro sink connects.
Note: open the relevant port in the firewall and in your cloud provider's security group.
avro source + memory channel + logger sink: avro-memory-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start order: start avro-memory-logger first, then exec-memory-avro.
./bin/flume-ng agent --conf /usr/server/flume-1.6.0/conf --conf-file /usr/server/flume-1.6.0/conf/avro-memory-logger.conf --name a1 -Dflume.root.logger=INFO,console
./bin/flume-ng agent --conf /usr/server/flume-1.6.0/conf --conf-file /usr/server/flume-1.6.0/conf/exec-memory-avro.conf --name a1 -Dflume.root.logger=INFO,console
After both are up, send HTTP requests to the Nginx server; the second server (running avro-memory-logger) prints the received log events.
That concludes this article.