Flume sink core class structure
1 Core interface: Sink
org.apache.flume.Sink
    /**
     * <p>Requests the sink to attempt to consume data from attached channel</p>
     * <p><strong>Note</strong>: This method should be consuming from the channel
     * within the bounds of a Transaction. On successful delivery, the transaction
     * should be committed, and on failure it should be rolled back.
     * @return READY if 1 or more Events were successfully delivered, BACKOFF if
     * no data could be retrieved from the channel feeding this sink
     * @throws EventDeliveryException In case of any kind of failure to
     * deliver data to the next hop destination.
     */
    public Status process() throws EventDeliveryException;

    public static enum Status {
      READY, BACKOFF
    }
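process() is the core method. It returns a Status, which has only two values: READY and BACKOFF. The caller decides what to do next based on this value, as shown later.
This is also the interface you implement when extending Flume with a custom sink, for example KuduSink.
2 Sink wrapper: SinkProcessor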
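To make the process() contract concrete (take inside a transaction, commit on success, roll back on failure, BACKOFF when nothing could be taken), here is a minimal self-contained sketch. It does not use the real Flume classes: Status, EventDeliveryException, ToyChannel and PrintingSink below are hypothetical stand-ins so the example compiles on its own. In real Flume code a custom sink typically extends org.apache.flume.sink.AbstractSink and obtains its transaction via getChannel().getTransaction().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-ins for Flume types; NOT the real org.apache.flume API.
enum Status { READY, BACKOFF }

class EventDeliveryException extends Exception {
  EventDeliveryException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Toy channel with transaction-like semantics: events taken inside a
// transaction are put back at the head of the queue on rollback().
class ToyChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private final Deque<String> taken = new ArrayDeque<>();

  void put(String event) { queue.addLast(event); }
  int size() { return queue.size(); }

  void begin() { taken.clear(); }

  String take() {
    String event = queue.pollFirst();   // null when the channel is empty
    if (event != null) taken.push(event);
    return event;
  }

  void commit() { taken.clear(); }

  void rollback() {
    // The most recently taken event is on top of the stack, so re-adding each
    // popped event at the head of the queue restores the original order.
    while (!taken.isEmpty()) queue.addFirst(taken.pop());
  }
}

// Hypothetical sink following the process() contract from the Sink Javadoc.
class PrintingSink {
  private final ToyChannel channel;
  final List<String> delivered = new ArrayList<>();
  boolean failNext = false; // simulates one failure of the next hop

  PrintingSink(ToyChannel channel) { this.channel = channel; }

  Status process() throws EventDeliveryException {
    channel.begin();
    try {
      String event = channel.take();
      if (event == null) {
        channel.commit();          // nothing to deliver: end the transaction
        return Status.BACKOFF;     // tells the runner to sleep before polling again
      }
      if (failNext) {
        failNext = false;
        throw new IllegalStateException("next hop unavailable");
      }
      delivered.add(event);        // "deliver" the event
      channel.commit();
      return Status.READY;
    } catch (RuntimeException e) {
      channel.rollback();          // the event stays in the channel for a retry
      throw new EventDeliveryException("Failed to deliver event", e);
    }
  }
}

class CustomSinkDemo {
  public static void main(String[] args) {
    ToyChannel channel = new ToyChannel();
    channel.put("e1");
    PrintingSink sink = new PrintingSink(channel);
    sink.failNext = true;
    try {
      sink.process();
    } catch (EventDeliveryException e) {
      System.out.println("rolled back, channel size = " + channel.size()); // 1
    }
    try {
      System.out.println(sink.process()); // READY: e1 delivered on retry
      System.out.println(sink.process()); // BACKOFF: channel is now empty
    } catch (EventDeliveryException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

The failed call leaves e1 in the channel thanks to the rollback, so the next call delivers it and returns READY; once the channel is empty the sink returns BACKOFF.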
org.apache.flume.SinkProcessor
    /**
     * <p>
     * Interface for a device that allows abstraction of the behavior of multiple
     * sinks, always assigned to a SinkRunner
     * </p>
     * <p>
     * A sink processors {@link SinkProcessor#process()} method will only be
     * accessed by a single runner thread. However configuration methods
     * such as {@link Configurable#configure} may be concurrently accessed.
     *
     * @see org.apache.flume.Sink
     * @see org.apache.flume.SinkRunner
     * @see org.apache.flume.sink.SinkGroup
     */
    public interface SinkProcessor extends LifecycleAware, Configurable {
      /**
       * <p>Handle a request to poll the owned sinks.</p>
       *
       * <p>The processor is expected to call {@linkplain Sink#process()} on
       *  whatever sink(s) appropriate, handling failures as appropriate and
       *  throwing {@link EventDeliveryException} when there is a failure to
       *  deliver any events according to the delivery policy defined by the
       *  sink processor implementation. See specific implementations of this
       *  interface for delivery behavior and policies.</p>
       *
       * @return Returns {@code READY} if events were successfully consumed,
       * or {@code BACKOFF} if no events were available in the channel to consume.
       * @throws EventDeliveryException if the behavior guaranteed by the processor
       * couldn't be carried out.
       */
      Status process() throws EventDeliveryException;
This interface wraps the processing of either a single sink or a sink group. Commonly used implementations:
1) Single sink
org.apache.flume.sink.DefaultSinkProcessor
    @Override
    public Status process() throws EventDeliveryException {
      return sink.process();
    }
DefaultSinkProcessor.process() simply calls process() on the single sink it wraps and returns its result.
2) Sink group
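The delegation can be illustrated with a small self-contained sketch. Sink, SinkProcessor and Status below are simplified stand-ins for the Flume interfaces (no lifecycle or configuration methods, plain Exception instead of EventDeliveryException), SingleSinkProcessor is a hypothetical name, and the one-sink check in its constructor is an assumption of this sketch.

```java
import java.util.List;

// Simplified stand-ins for the Flume types; NOT the real interfaces.
enum Status { READY, BACKOFF }

interface Sink {
  Status process() throws Exception;
}

interface SinkProcessor {
  Status process() throws Exception;
}

// Hypothetical analogue of DefaultSinkProcessor: it owns exactly one sink
// and adds no delivery policy, so the sink's result (or exception) is
// passed straight through to the caller (SinkRunner in Flume).
class SingleSinkProcessor implements SinkProcessor {
  private final Sink sink;

  SingleSinkProcessor(List<Sink> sinks) {
    if (sinks.size() != 1) {
      throw new IllegalArgumentException("expects exactly one sink, got " + sinks.size());
    }
    this.sink = sinks.get(0);
  }

  @Override
  public Status process() throws Exception {
    return sink.process();
  }
}

class DelegationDemo {
  public static void main(String[] args) throws Exception {
    Sink alwaysIdle = () -> Status.BACKOFF;  // e.g. a sink whose channel is empty
    SinkProcessor processor = new SingleSinkProcessor(List.of(alwaysIdle));
    System.out.println(processor.process()); // BACKOFF, straight from the sink
  }
}
```

Group processors such as LoadBalancingSinkProcessor and FailoverSinkProcessor implement the same process() method but choose among several sinks, so the runner does not need to know which kind of processor it is driving.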
org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.sink.FailoverSinkProcessor
3 The caller of a sink: SinkRunner
org.apache.flume.SinkRunner
    /**
     * <p>
     * A driver for {@linkplain Sink sinks} that polls them, attempting to
     * {@linkplain Sink#process() process} events if any are available in the
     * {@link Channel}.
     * </p>
     *
     * <p>
     * Note that, unlike {@linkplain Source sources}, all sinks are polled.
     * </p>
     *
     * @see org.apache.flume.Sink
     * @see org.apache.flume.SourceRunner
     */
    public class SinkRunner implements LifecycleAware {
      ...
      private static final long backoffSleepIncrement = 1000;
      private static final long maxBackoffSleep = 5000;

org.apache.flume.SinkRunner.PollingRunner

      public static class PollingRunner implements Runnable {
        private SinkProcessor policy;
        private AtomicBoolean shouldStop;
        private CounterGroup counterGroup;

        @Override
        public void run() {
          logger.debug("Polling sink runner starting");

          while (!shouldStop.get()) {
            try {
              if (policy.process().equals(Sink.Status.BACKOFF)) {
                counterGroup.incrementAndGet("runner.backoffs");

                Thread.sleep(Math.min(
                    counterGroup.incrementAndGet("runner.backoffs.consecutive")
                    * backoffSleepIncrement, maxBackoffSleep));
              } else {
                counterGroup.set("runner.backoffs.consecutive", 0L);
              }
            } catch (InterruptedException e) {
              logger.debug("Interrupted while processing an event. Exiting.");
              counterGroup.incrementAndGet("runner.interruptions");
            } catch (Exception e) {
              logger.error("Unable to deliver event. Exception follows.", e);
              if (e instanceof EventDeliveryException) {
                counterGroup.incrementAndGet("runner.deliveryErrors");
              } else {
                counterGroup.incrementAndGet("runner.errors");
              }
              try {
                Thread.sleep(maxBackoffSleep);
              } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
              }
            }
          }
          logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
        }
      }
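Whether process() returns BACKOFF or throws an exception, PollingRunner sleeps before polling again: after the n-th consecutive BACKOFF it sleeps min(n × 1000, 5000) ms (the consecutive counter is reset only when a call returns READY), and after any exception it sleeps a fixed 5000 ms. As a result, a Flume sink that runs into a large amount of bad data (so process() keeps throwing), or a custom sink that frequently returns BACKOFF, becomes very slow.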
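The sleep arithmetic can be checked in isolation. The sketch below copies the two constants and the two sleep expressions from PollingRunner.run(); the class and method names are hypothetical, and it only computes durations instead of actually sleeping.

```java
// Reproduces the sleep arithmetic of SinkRunner.PollingRunner.run();
// class and method names are hypothetical, only the constants come from Flume.
class BackoffSchedule {
  static final long BACKOFF_SLEEP_INCREMENT = 1000; // ms, SinkRunner.backoffSleepIncrement
  static final long MAX_BACKOFF_SLEEP = 5000;       // ms, SinkRunner.maxBackoffSleep

  // Sleep after the n-th consecutive BACKOFF (n starts at 1, reset by READY).
  static long sleepAfterBackoff(long consecutiveBackoffs) {
    return Math.min(consecutiveBackoffs * BACKOFF_SLEEP_INCREMENT, MAX_BACKOFF_SLEEP);
  }

  // Sleep after any exception thrown by process(): always the maximum.
  static long sleepAfterException() {
    return MAX_BACKOFF_SLEEP;
  }

  // Total time spent sleeping over `count` consecutive BACKOFF results.
  static long totalSleepForBackoffs(int count) {
    long total = 0;
    for (int n = 1; n <= count; n++) {
      total += sleepAfterBackoff(n);
    }
    return total;
  }

  public static void main(String[] args) {
    for (int n = 1; n <= 7; n++) {
      System.out.println("backoff #" + n + " -> sleep " + sleepAfterBackoff(n) + " ms");
    }
    System.out.println("10 consecutive backoffs -> " + totalSleepForBackoffs(10) + " ms asleep");
    System.out.println("each exception -> " + sleepAfterException() + " ms");
  }
}
```

Running it prints sleeps of 1000, 2000, 3000, 4000, 5000, 5000 and 5000 ms for backoffs 1 to 7, and 40000 ms in total for 10 consecutive backoffs. An idle sink therefore settles at one poll every 5 s, while every exception costs the full 5 s; for example, 100 failing process() calls add 500 s of sleep.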
[Original] Big Data Basics: Flume (2) Sink Code Analysis
Original article: https://www.cnblogs.com/barneywill/p/10570545.html