tail-to-avro
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure spooldir source1
#agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /var/log/apache/flumeSpool1
#agent1.sources.source1.fileHeader = true

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -n +0 -F /tmp/log.log
agent1.sources.source1.channels = channel1

# Describe/configure nc source1
#agent1.sources.source1.type = netcat
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

# Configure a host interceptor for the source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe logger sink1
#agent1.sinks.sink1.type = logger

# Describe avro sink1
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname = 172.16.10.175
agent1.sinks.sink1.port = 4545

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
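Any process that appends to /tmp/log.log will generate events through the exec source above. A minimal way to produce test traffic once both agents are running (a sketch; the file path comes from the config above):

echo "test event $(date)" >> /tmp/log.log
# or a steady one-line-per-second stream:
while true; do echo "ping $(date +%s)" >> /tmp/log.log; sleep 1; done

Keep in mind that the exec source offers no delivery guarantee: if agent1 is down, lines written in the meantime are simply lost.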
avro-to-rollfile
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure avro source
agent1.sources.source1.type = avro
agent1.sources.source1.bind = 172.16.10.175
agent1.sources.source1.port = 4545

# Describe logger sink1
#agent1.sinks.sink1.type = logger

# Describe file sink1
agent1.sinks.sink1.type = file_roll
agent1.sinks.sink1.sink.directory = /var/log/flume

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
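On the receiving host, file_roll writes into /var/log/flume and by default starts a new output file every 30 seconds (tunable via sink.rollInterval). The directory must exist beforehand; the sink does not create it. A quick check, using the path from the config above:

mkdir -p /var/log/flume    # file_roll will not create this directory itself
ls -lt /var/log/flume      # rolled output files, newest first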
Startup (run on each host with its own properties file):
./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.0-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.0-bin/conf/flume-single.properties -n agent1 -Dflume.root.logger=INFO,console
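With the avro-to-rollfile agent started first on 172.16.10.175 and the tail-to-avro agent started after it, an appended line should traverse the whole pipeline. A smoke-test sketch, using only the hosts and paths already configured above:

# on the sending host
echo "end-to-end $(date)" >> /tmp/log.log

# on 172.16.10.175: the line should appear in the newest rolled file
tail -n 5 "$(ls -t /var/log/flume/* | head -1)"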
My own hands-on setup was as follows:
Source configuration (receiving agent):
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'producer'

# agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#producer.sources.s.type = seq
producer.sources.s.channels = c
#producer.sources.s.type = exec
#producer.sources.s.command = tail -n +0 -F /usr/local/nginx/nginxlog/access.log
producer.sources.s.deletePolicy = never
producer.sources.s.type = avro
producer.sources.s.bind = localhost
producer.sources.s.port = 4545

# Each sink's type must be defined (where the data is sent)
#producer.sinks.r.type = avro
#producer.sinks.r.hostname = 10.1.1.100
#producer.sinks.r.port = 20000
producer.sinks.r.type = org.xx.clickstream.sink.kafka.KafkaSink
producer.sinks.r.zk.connect = 127.0.0.1:2181
producer.sinks.r.metadata.broker.list = 127.0.0.1:9092
producer.sinks.r.partitioner.class = org.xx.clickstream.partition.TypePartitioner
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 1
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.producer.type = sync
producer.sinks.r.custom.encoding = UTF-8

# Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000000
producer.channels.c.transactionCapacity = 1000000
#producer.channels.c.type = file
#producer.channels.c.checkpointDir = /usr/local/flumeng/checkpointdir/tcpdir/example_agent
#producer.channels.c.dataDirs = /usr/local/flumeng/datadirs/tddirs/example_agent
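To confirm events are reaching Kafka, consume the sink's topic with the console consumer. Note that org.xx.clickstream.sink.kafka.KafkaSink is a custom sink rather than a stock Flume one, so the actual topic name depends on its TypePartitioner; "clickstream" below is only a placeholder, and the --zookeeper flag assumes the 0.8-era Kafka implied by kafka.serializer.StringEncoder:

# topic name is hypothetical; the custom sink/partitioner determine the real one
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic clickstream --from-beginning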
Sink configuration (sending agent):
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'producer'

# agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#producer.sources.s.type = seq
producer.sources.s.channels = c
producer.sources.s.type = exec
producer.sources.s.command = tail -n +0 -F /usr/local/nginx/nginxlog/access.log
producer.sources.s.deletePolicy = never
#producer.sources.s.type = avro
#producer.sources.s.bind = localhost
#producer.sources.s.port = 10000

# Each sink's type must be defined (where the data is sent)
producer.sinks.r.type = avro
producer.sinks.r.hostname = localhost
producer.sinks.r.port = 4545
#producer.sinks.r.type = org.xx.clickstream.sink.kafka.KafkaSink
#producer.sinks.r.zk.connect = 127.0.0.1:2181
#producer.sinks.r.metadata.broker.list = 127.0.0.1:9092
#producer.sinks.r.partitioner.class = org.xx.clickstream.partition.TypePartitioner
#producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
#producer.sinks.r.request.required.acks = 1
#producer.sinks.r.max.message.size = 1000000
#producer.sinks.r.producer.type = sync
#producer.sinks.r.custom.encoding = UTF-8

# Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000000
producer.channels.c.transactionCapacity = 1000000
#producer.channels.c.type = file
#producer.channels.c.checkpointDir = /usr/local/flumeng/checkpointdir/tcpdir/example_agent
#producer.channels.c.dataDirs = /usr/local/flumeng/datadirs/tddirs/example_agent
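Because this sink dials out to localhost:4545, the receiving agent must already be listening there, or the sink will log connection errors and keep retrying. A quick pre-flight check before starting the sender:

netstat -tln | grep 4545    # should show a LISTEN entry once the avro source is up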
Startup order: start the receiving agent (avro source) first, then the sending agent (avro sink):
# Start the receiving agent (avro source) first, so it is ready to accept events
./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-avrosource.properties -n producer -Dflume.root.logger=INFO,console

# Then start the sending agent (avro sink) to begin shipping events
./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-avrosink.properties -n producer -Dflume.root.logger=INFO,console
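Independent of the tail agent, the flume-ng script also has an avro-client mode that injects events straight into an avro source, which is useful for isolating problems to a single hop. A sketch reusing the host/port from the receiving config (/tmp/test.log is just a hypothetical input file):

# send each line of a file as one event to the avro source
./flume-ng avro-client -H localhost -p 4545 -F /tmp/test.log
# or read events from stdin
echo "hello flume" | ./flume-ng avro-client -H localhost -p 4545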