分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

flume自定义反序列化器deserializer

发布时间:2023-09-06 02:05责任编辑:沈小雨关键词:暂无标签

需求背景:

  在利用flume进行日志收集的时候,错误日志的异常堆栈会被打印成多行,需要将属于同一条日志的多行信息合并起来,包装成一个event进行传输。

解决思路: 

  解决上述需求可以通过自定义拦截器或自定义反序列化器来实现。网上关于自定义拦截器的资料比较多,但考虑到拦截器的定位和使用场景,拦截器不应用于对多个event进行拆分或组合;而且若flume有并发处理,也不能保证读取到的event是有序的。查阅资料发现,通过自定义flume的反序列化器来实现更加合理和安全。

实现步骤:

  1:新建一个类,实现 EventDeserializer 接口,并在类中提供一个实现 EventDeserializer.Builder 接口的静态内部类 Builder,flume 通过它来创建反序列化器实例

  2:实现 readEvent() 和 readEvents() 方法(readEvents() 可以循环调用 readEvent()),在其中完成多行合并的逻辑

  3:修改flume的配置文件,将 source(如 Spooling Directory Source)的 deserializer 属性设置为自定义类中 Builder 的全限定类名(内部类用 $ 连接)

源码:

  1:自定义反序列化器 ---> MyLineDeserializer

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. ?See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. ?The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. ?You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.xxx.flume.serializer;import com.google.common.collect.Lists;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.annotations.InterfaceAudience;import org.apache.flume.annotations.InterfaceStability;import org.apache.flume.event.EventBuilder;import org.apache.flume.serialization.EventDeserializer;import org.apache.flume.serialization.ResettableInputStream;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.nio.charset.Charset;import java.util.List;/** * A deserializer that parses text lines from a file. 
*/@InterfaceAudience.Private@InterfaceStability.Evolvingpublic class MyLineDeserializer implements EventDeserializer { ???private static final Logger logger = LoggerFactory.getLogger ???????????(MyLineDeserializer.class); ???private final ResettableInputStream in; ???private final Charset outputCharset; ???private final int maxLineLength; ???private volatile boolean isOpen; ???public static final String OUT_CHARSET_KEY = "outputCharset"; ???public static final String CHARSET_DFLT = "UTF-8"; ???public static final String MAXLINE_KEY = "maxLineLength"; ???public static final int MAXLINE_DFLT = 2048; ???private StringBuffer eventStringBuffer = new StringBuffer(); ???MyLineDeserializer(Context context, ResettableInputStream in) { ???????this.in = in; ???????this.outputCharset = Charset.forName( ???????????????context.getString(OUT_CHARSET_KEY, CHARSET_DFLT)); ???????this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT); ???????this.isOpen = true; ???} ???/** ????* Reads a line from a file and returns an event ????* ????* @return Event containing parsed line ????* @throws IOException ????*/ ???@Override ???public Event readEvent() throws IOException { ???????ensureOpen(); ???????String line = readLine(); ???????Event event = null; ???????while (line != null) { ???????????// ?start with 20 is one timestamp , event end ???????????if (line.trim().startsWith("20")) { ???????????????event = EventBuilder.withBody(eventStringBuffer.toString(), outputCharset); ???????????????eventStringBuffer.delete(0, eventStringBuffer.length()); ???????????} ???????????// ?add current line push to buffer ???????????if (line.trim().length() > 0) { ???????????????if (eventStringBuffer.length() > 0) { ???????????????????eventStringBuffer.append(System.lineSeparator()).append(line); ???????????????} else { ???????????????????eventStringBuffer.append(line); ???????????????} ???????????} ???????????if (line.trim().startsWith("20")) { ???????????????break; ???????????} ???????????line = 
readLine(); ???????} ???????if (line == null && eventStringBuffer.toString().length() > 0 ){ ???????????event = ?EventBuilder.withBody(eventStringBuffer.toString(), outputCharset); ???????????eventStringBuffer.delete(0, eventStringBuffer.length()); ???????????return event; ???????} ???????return event; ???} ???/** ????* Batch line read ????* ????* @param numEvents Maximum number of events to return. ????* @return List of events containing read lines ????* @throws IOException ????*/ ???@Override ???public List<Event> readEvents(int numEvents) throws IOException { ???????ensureOpen(); ???????List<Event> events = Lists.newLinkedList(); ???????for (int i = 0; i < numEvents; i++) { ???????????Event event = readEvent(); ???????????if (event != null) { ???????????????events.add(event); ???????????} else { ???????????????break; ???????????} ???????} ???????return events; ???} ???@Override ???public void mark() throws IOException { ???????ensureOpen(); ???????in.mark(); ???} ???@Override ???public void reset() throws IOException { ???????ensureOpen(); ???????in.reset(); ???} ???@Override ???public void close() throws IOException { ???????if (isOpen) { ???????????reset(); ???????????in.close(); ???????????isOpen = false; ???????} ???} ???private void ensureOpen() { ???????if (!isOpen) { ???????????throw new IllegalStateException("Serializer has been closed"); ???????} ???} ???// TODO: consider not returning a final character that is a high surrogate ???// when truncating ???private String readLine() throws IOException { ???????StringBuilder sb = new StringBuilder(); ???????int c; ???????int readChars = 0; ???????while ((c = in.readChar()) != -1) { ???????????readChars++; ???????????// FIXME: support \r\n ???????????if (c == ‘\n‘) { ???????????????break; ???????????} ???????????sb.append((char) c); ???????????if (readChars >= maxLineLength) { ???????????????logger.warn("Line length exceeds max ({}), truncating line!", ???????????????????????maxLineLength); 
???????????????break; ???????????} ???????} ???????if (readChars > 0) { ???????????return sb.toString(); ???????} else { ???????????return null; ???????} ???} ???public static class Builder implements EventDeserializer.Builder { ???????@Override ???????public MyLineDeserializer build(Context context, ResettableInputStream in) { ???????????return new MyLineDeserializer(context, in); ???????} ???}}

  2: flume 配置文件

a1.sources.r1.deserializer = com.xxx.flume.serializer.MyLineDeserializer$Builder

flume自定义反序列化器deserializer

原文地址:https://www.cnblogs.com/yuwenhui/p/9367625.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved