分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 前端开发

Flume自定义Source

发布时间:2023-09-06 02:19 | 责任编辑:胡小海 | 关键词:暂无标签
模拟实现了 Flume 1.7 中 TAILDIR Source 的功能:通过手动记录并控制文件的读取位置(偏移量)来读取文件,防止 Flume 挂掉重启后出现重复消费的情况。
以下代码实现仅供参考,生产环境中直接使用 TAILDIR 读取文件内容即可;若要递归读取某个目录下的子目录,可使用 GitHub 上已实现的这个项目:https://github.com/qwurey/flume-source-taildir-recursive

package com.fwmagic.flume.source;import org.apache.commons.io.FileUtils;import org.apache.commons.lang.StringUtils;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.EventDrivenSource;import org.apache.flume.channel.ChannelProcessor;import org.apache.flume.conf.Configurable;import org.apache.flume.event.EventBuilder;import org.apache.flume.source.AbstractSource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.File;import java.io.IOException;import java.io.RandomAccessFile;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;/** * @Description:自定义Source 1、读取指定目录下的文件,如nginx的access.log * 2、读取文件前先判断offset文件是否存在,不存在则创建它 * 3、每次读取完都写一个offset文件记录读取到文件的什么位置,防止重启flume时发生重复消费的情况 * 4、如何自定义?参考ExecSource * <p> * (1):获取自定义配置文件属性 * (2):创建线程池,用channelProcessor发送数据给channel * (3):线程池提交(启动任务) * 任务内容: * (1):读取偏移量文件,没有则创建,有则获取偏移量,将读取的指针重置到指定偏移量 * (2):读取指定的日志文件,将读取的一行内容打包成Event,用Channel发送Event * (3):获取读取内容后的偏移量,重置偏移量 * (4):stop方法调用,关闭线程池,调用super.stop方法。 * @Date:Create in 2018/8/19 */public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable { ???/*监听的文件*/ ???private String filePath; ???/*记录读取偏移量的文件*/ ???private String posiFile; ???/*若读取文件暂无内容,则等待数秒*/ ???private Long interval; ???/*读写文件的字符集*/ ???private String charset; ???/*读取文件内容的线程*/ ???private FileRunner fileRunner; ???/*线程池*/ ???private ExecutorService executor; ???private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class); ???/** ????* 初始化配置文件内容 ????* ????* @param context ????*/ ???@Override ???public void configure(Context context) { ???????filePath = context.getString("filePath"); ???????posiFile = context.getString("posiFile"); ???????interval = context.getLong("interval", 2000L); ???????charset = context.getString("charset", "UTF-8"); ???} ???@Override ???public synchronized void start() { ???????//启动一个线程,用于监听对应的日志文件 ???????//创建一个线程池 ???????executor = 
Executors.newSingleThreadExecutor(); ???????//用channelProcessor发送数据给channel ???????ChannelProcessor channelProcessor = super.getChannelProcessor(); ???????fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor); ???????executor.submit(fileRunner); ???????super.start(); ???} ???@Override ???public synchronized void stop() { ???????fileRunner.setFlag(Boolean.FALSE); ???????while (!executor.isTerminated()) { ???????????logger.debug("waiting for exec executor service to stop"); ???????????try { ???????????????executor.awaitTermination(500, TimeUnit.MILLISECONDS); ???????????} catch (InterruptedException e) { ???????????????e.printStackTrace(); ???????????????logger.debug("Interrupted while waiting for executor service to stop,Just exiting."); ???????????????Thread.currentThread().interrupt(); ???????????} ???????} ???????super.stop(); ???} ???public static class FileRunner implements Runnable { ???????private Long interval; ???????private String charset; ???????private Long offset = 0L; ???????private File pFile; ???????private RandomAccessFile raf; ???????private ChannelProcessor channelProcessor; ???????private Boolean flag = Boolean.TRUE; ???????public void setFlag(Boolean flag) { ???????????this.flag = flag; ???????} ???????public FileRunner(String filePath, String posiFile, Long interval, String charset, ChannelProcessor channelProcessor) { ???????????this.interval = interval; ???????????this.charset = charset; ???????????this.channelProcessor = channelProcessor; ???????????//1、判断是否有偏移量文件,有则读取偏移量,没有则创建 ???????????pFile = new File(posiFile); ???????????if (!pFile.exists()) { ???????????????try { ???????????????????pFile.createNewFile(); ???????????????} catch (IOException e) { ???????????????????e.printStackTrace(); ???????????????????logger.error("create position file error!", e); ???????????????} ???????????} ???????????//2、判断偏移量中的文件内容是否大于0 ???????????try { ???????????????String offsetStr = FileUtils.readFileToString(pFile, 
this.charset);// ?????????3、如果偏移量文件中有记录,则将内容转换为Long ???????????????if (StringUtils.isNotBlank(offsetStr)) { ???????????????????offset = Long.parseLong(offsetStr); ???????????????}// ??????????4、如果有偏移量,则直接跳到文件的偏移量位置 ???????????????raf = new RandomAccessFile(filePath, "r");// ?????????????跳到指定的位置 ???????????????raf.seek(offset); ???????????} catch (IOException e) { ???????????????e.printStackTrace(); ???????????????logger.error("read position file error!", e); ???????????} ???????} ???????@Override ???????public void run() { ???????????//监听文件 ???????????while (flag) {// ???????????读取文件中的内容 ???????????????String line = null; ???????????????try { ???????????????????line = raf.readLine(); ???????????????????if (StringUtils.isNotBlank(line)) {// ?????????????????????把数据打包成Event,发送到Channel ???????????????????????line = new String(line.getBytes("ISO-8859-1"), "UTF-8"); ???????????????????????Event event = EventBuilder.withBody(line.getBytes()); ???????????????????????channelProcessor.processEvent(event); ???????????????????????//更新偏移量文件,把偏移量写入文件 ???????????????????????offset = raf.getFilePointer(); ???????????????????????FileUtils.writeStringToFile(pFile, offset.toString()); ???????????????????} else { ???????????????????????try { ???????????????????????????Thread.sleep(interval); ???????????????????????} catch (InterruptedException e) { ???????????????????????????e.printStackTrace(); ???????????????????????????logger.error("thread sleep error", e); ???????????????????????} ???????????????????} ???????????????} catch (IOException e) { ???????????????????e.printStackTrace(); ???????????????} ???????????} ???????} ???}}

Flume自定义Source

原文地址:http://blog.51cto.com/simplelife/2307943

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved