分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 运营维护

kafka源码分析(二)Metadata的数据结构与读取、更新策略

发布时间:2023-09-06 01:55责任编辑:熊小新关键词:暂无标签

一、基本思路

 异步发送的基本思路就是:send的时候,KafkaProducer把消息放到本地的消息队列RecordAccumulator,然后一个后台线程Sender不断循环,把消息发给Kafka集群。

要实现这个,还得有一个前提条件:就是KafkaProducer/Sender都需要获取集群的配置信息Metadata。所谓Metadata,也就是在上一篇所讲的,Topic/Partion与broker的映射关系:每一个Topic的每一个Partion,得知道其对应的broker列表是什么,其中leader是谁,follower是谁。

二、2个数据流

所以在上图中,有2个数据流: 

Metadata流(A1,A2,A3):Sender从集群获取信息,然后更新Metadata; KafkaProducer先读取Metadata,然后把消息放入队列。

消息流(B1, B2, B3)

从上图可以看出,Metadata是多个producer线程读,一个sender线程更新,因此它必须是线程安全的

三、Metadata的线程安全性

从下面代码也可以看出,它的所有public方法都是synchronized:

 1 public final class Metadata { 2 ??。。。 3 ????public synchronized Cluster fetch() { 4 ????????return this.cluster; 5 ????} 6 ????public synchronized long timeToNextUpdate(long nowMs) { 7 ???????。。。 8 ????} 9 ????public synchronized int requestUpdate() {10 ??????。。。11 ????}12 ????。。。 ???13 }

四、Metadata的数据结构

 1 public final class Metadata { 2 ... 3 ????private final long refreshBackoffMs; ?//更新失败的情况下,下1次更新的补偿时间(这个变量在代码中意义不是太大) 4 ????private final long metadataExpireMs; //关键值:每隔多久,更新一次。缺省是600*1000,也就是10分种 5 ????private int version; ????????//每更新成功1次,version递增1。这个变量主要用于在while循环,wait的时候,作为循环判断条件 6 ????private long lastRefreshMs; ?//上一次更新时间(也包含更新失败的情况) 7 ????private long lastSuccessfulRefreshMs; //上一次成功更新的时间(如果每次都成功的话,则2者相等。否则,lastSuccessulRefreshMs < lastRefreshMs) 8 ????private Cluster cluster; ??//集群配置信息 9 ????private boolean needUpdate; ?//是否强制刷新10 、11 ??...12 }

kafka源码分析(二)Metadata的数据结构与读取、更新策略

原文地址:https://www.cnblogs.com/zcjcsl/p/8746561.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved