SinkProcessor:============================
    FailOver:
    Load balancing:
        //负载均衡处理器
        //round_robin 轮询 1-2-3-1-2-3-...
        //random      随机 1-3-2-3-1-...
        1、round_robin 轮询 1-2-3-1-2-3-...
        2、random 随机:

自定义Sink && Interceptor=============================================
    1、pom
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
    2、编写sink:
        public class MySink extends AbstractSink {

            /**
             * Takes at most one event from the channel inside a transaction, adds a "TimeStamp"
             * header (current time in ms, keeping any headers the event already had) and prints
             * the header value and the UTF-8 decoded body to stdout.
             *
             * @return {@code Status.READY} if an event was taken, {@code Status.BACKOFF} if the
             *         channel returned no event
             * @throws EventDeliveryException if anything fails while handling the event; the
             *         transaction is rolled back before the exception is thrown
             */
            @Override
            public Status process() throws EventDeliveryException {
                Status result = Status.READY;
                Channel channel = getChannel();
                Transaction transaction = channel.getTransaction();
                Event event = null;
                try {
                    transaction.begin();
                    event = channel.take();
                    if (event != null) {
                        // Add the header on top of the existing ones instead of replacing the
                        // whole header map, so headers set upstream are preserved.
                        Map<String, String> headers = new HashMap<String, String>();
                        if (event.getHeaders() != null) {
                            headers.putAll(event.getHeaders());
                        }
                        String timeStamp = String.valueOf(System.currentTimeMillis());
                        headers.put("TimeStamp", timeStamp);
                        event.setHeaders(headers);

                        byte[] body = event.getBody();
                        // Decode explicitly instead of relying on the platform default charset.
                        System.out.println("head: " + timeStamp + "\tbody: "
                                + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                    } else {
                        // No event available: report BACKOFF.
                        result = Status.BACKOFF;
                    }
                    transaction.commit();
                } catch (Exception ex) {
                    // Undo the take before reporting the failure.
                    transaction.rollback();
                    throw new EventDeliveryException("Failed to log event: " + event, ex);
                } finally {
                    transaction.close();
                }
                return result;
            }
        }
    3、打包并放在/soft/flume/lib下
    4、使用自定义sink
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
        # 配置source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888
        # 配置sink
        a1.sinks.k1.type = com.oldboy.flume.MySink
        # 配置channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
        # 绑定channel-source, channel-sink
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

自定义拦截器:Interceptor
    event大小拦截器================================
        自定义限速拦截器:
            1、设置参数
            2、设置带参构造
            3、创建Builder内部类,通过Builder构造对象
                1)通过configure得到参数的值或默认值
                2)通过build方法,构建Interceptor对象
            4、在Constants内部类设置常量值

        public class SpeedInterceptor implements Interceptor {

            /** Throughput limit in bytes per second; a value <= 0 disables throttling. */
            private final int speed;

            /** Time (ms) at which the previous event was let through; -1 before the first event. */
            private long lastTime = -1;

            /** Body size in bytes of the previous event. */
            private long lastBodySize = 0;

            public SpeedInterceptor(int speed) {
                this.speed = speed;
            }

            @Override
            public void initialize() {
            }

            /**
             * Throttles events to roughly {@code speed} bytes per second. The event itself is
             * not modified.
             *
             * <p>The rate is checked one event behind. When an event arrives, the previous
             * event's body size is compared with the time elapsed since it passed. If the
             * previous event went through faster than {@code speed} allows, the calling thread
             * sleeps until {@code lastBodySize * 1000 / speed} ms have elapsed since it passed.
             * The first event is never delayed.
             *
             * @param event the incoming event
             * @return the same event, unchanged
             */
            @Override
            public Event intercept(Event event) {
                long bodySize = event.getBody().length;
                if (lastTime != -1 && speed > 0) {
                    long elapsed = System.currentTimeMillis() - lastTime;
                    // Multiply before dividing so small bodies do not truncate to 0 ms and
                    // produce a negative sleep time.
                    long sleepMs = lastBodySize * 1000 / speed - elapsed;
                    if (sleepMs > 0) {
                        try {
                            Thread.sleep(sleepMs);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag instead of swallowing it.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
                // Record every event, not only throttled ones, so the next check always
                // measures the most recent event.
                lastBodySize = bodySize;
                lastTime = System.currentTimeMillis();
                return event;
            }

            /** Applies {@link #intercept(Event)} to each event in order and returns the same list. */
            @Override
            public List<Event> intercept(List<Event> events) {
                for (Event event : events) {
                    intercept(event);
                }
                return events;
            }

            @Override
            public void close() {
            }

            /** Builder named in the agent config as "com.oldboy.flume.SpeedInterceptor$Builder". */
            public static class Builder implements Interceptor.Builder {
                private int speed;

                @Override
                public void configure(Context context) {
                    // Equivalent to context.getInteger("speed", 1024)
                    speed = context.getInteger(Constants.SPEED, Constants.SPEED_DEFAULT);
                }

                @Override
                public Interceptor build() {
                    return new SpeedInterceptor(this.speed);
                }
            }

            /** Configuration keys and their defaults. */
            public static class Constants {
                /** Key of the bytes-per-second limit. */
                public static final String SPEED = "speed";
                /** Limit used when "speed" is not configured. */
                public static final int SPEED_DEFAULT = 1024;
            }
        }

    自定义拦截器使用方法:
    ==============================
        1、编写代码,打包并放入/soft/flume/lib下
        2、编写配置文件i_speed.conf
            # 将agent组件起名
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
            # 配置source
            a1.sources.r1.type = seq
            # 给拦截器起名
            a1.sources.r1.interceptors = i1
            # 指定拦截器类型
            a1.sources.r1.interceptors.i1.type = com.oldboy.flume.SpeedInterceptor$Builder
            a1.sources.r1.interceptors.i1.speed = 1
            # NOTE: speed2 is not read by SpeedInterceptor (only "speed" is)
            a1.sources.r1.interceptors.i1.speed2 = 10
            # 配置sink
            a1.sinks.k1.type = logger
            # 配置channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
            # 绑定channel-source, channel-sink
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
        3、flume-ng agent -n a1 -f i_speed.conf

注意:SinkProcessor和ChannelSelector
    ChannelSelector:挑选通道 ----> sink
    SinkProcessor:  挑选sink
    在配置SinkProcessor的时候,注意ChannelSelector要设为默认(不配置)

使用ZK进行flume配置管理:============================================
    1、在zk客户端创建节点(/flume/a1)
        //注意:节点a1是agent名称
        zkCli.sh -server s102:2181
    2、在/flume/a1节点添加数据,使用zooInspector即可
        //中文字符会出现乱码,可以使用idea插件(zookeeper)
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888
        a1.sinks.k1.type = logger
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
    3、尝试启动flume(-p 指定基础路径,agent 从 /flume/<agent名称> 即 /flume/a1 节点读取配置)
        flume-ng agent -n a1 -z s102:2181 -p /flume
flume中自定义Sink和Interceptor
原文地址:https://www.cnblogs.com/zyde/p/8946626.html