分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

asp.net core mcroservices 架构之 分布式日志(三):集成kafka

发布时间:2023-09-06 02:28责任编辑:林大明关键词:暂无标签

  一 kafka介绍                               

           kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了。硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快。kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性。最熟悉的消息队列框架有ActiveMQ 和 RabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列的一些瓶颈,比如并发,rabbitMQ在多个处理程序下,并不能保证执行顺序,还是必须自己去处理独占,而kafka使用consumer group的方式,实现了可以多个处理程序处理一个topic下的记录。如图:

每个分区的记录保证能被每个组接受,这样可以并发去处理一个topic的记录,而且扩展组,则可以随意根据应用需求去扩展你的应用程序,但是每个组的消费者不能超过分区的数量。

kafka Distribution 提供了容错的功能,每一个partition都有一个服务器叫leader,还有零个或者一个以上的服务器叫follower,当这些follower都在同步数据的时候,leader扛起所有的写和读,当leader挂掉,follower会随机选取一个服务器当leader,当然必须有几个follower同步时 in-sync的。还有kafka虽然的那个记录具有原子性,但是并不支持事务。

因为这一篇并不是专门讲解kafka,所以点到为止。

  二     扩展服务 开发                          

     以前讲过,netcore的一个很重要的特性就是支持依赖注入,在这里一切皆服务。那么如果需要kafka作为日志服务的终端,就首先需要kafka服务,下面咱们就开发一个kafka服务。

首先,服务就是需要构建,这是netcore开发服务的第一步,我们首先建立一个IKafkaBuilder.cs接口类,如下:

homusing Microsoft.Extensions.DependencyInjection;namespace Walt.Freamwork.Service{ ???public interface IKafkaBuilder ???{ ????????/// <summary> ???????/// Gets the <see cref="IServiceCollection"/> where Logging services are configured. ???????/// </summary> ???????IServiceCollection Services { get; } ???}}

再实现它,KafkaBuilder.cs

using Microsoft.Extensions.DependencyInjection;namespace Walt.Freamwork.Service{ ???public class KafkaBuilder : IKafkaBuilder ???{ ???????public IServiceCollection Services {get;} ???????public KafkaBuilder(IServiceCollection services) ???????{ ???????????Services=services; ???????} ???}}

再利用扩展方法为serviceCollection类加上扩展方法:

 using System;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.DependencyInjection.Extensions;using Walt.Framework.Service.Kafka;namespace Walt.Framework.Service{ ???????public static class ServiceCollectionExtensions ???{ ???????/// <summary> ???????/// Adds logging services to the specified <see cref="IServiceCollection" />. ???????/// </summary> ???????/// <param name="services">The <see cref="IServiceCollection" /> to add services to.</param> ???????/// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns> ???????public static IServiceCollection AddKafka(this IServiceCollection services) ???????{ ???????????return AddKafka(services, builder => { }); ???????} ????????public static IServiceCollection AddKafka(this IServiceCollection services ???????, Action<IKafkaBuilder> configure) ???????{ ???????????if (services == null) ???????????{ ???????????????throw new ArgumentNullException(nameof(services)); ???????????} ???????????services.AddOptions(); ????????????configure(new KafkaBuilder(services)); ???????????services.TryAddSingleton<IKafkaService,KafkaService>(); ?//kafka的服务类 ???????????return services; ???????} ???}}
KafkaService的实现:
using System;using System.Collections.Generic;using System.Threading.Tasks;using Confluent.Kafka;using Microsoft.Extensions.Options;namespace ?Walt.Framework.Service.Kafka{ ???public class KafkaService : IKafkaService ???{ ???????private KafkaOptions _kafkaOptions; ???????private Producer _producer; ???????public KafkaService(IOptionsMonitor<KafkaOptions> ?kafkaOptions) ???????{ ???????????_kafkaOptions=kafkaOptions.CurrentValue; ????????????kafkaOptions.OnChange((kafkaOpt,s)=>{ ???????????????_kafkaOptions=kafkaOpt; ????????????????????System.Diagnostics.Debug ???????????????????.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s); ???????????????????????????????}); ????????????_producer=new Producer(_kafkaOptions.Properties); ???????} ???????private byte[] ConvertToByte(string str) ???????{ ???????????return System.Text.Encoding.Default.GetBytes(str); ???????} ????????public ?async Task<Message> Producer(string topic,string key,string value) ???????{ ?????????????if(string.IsNullOrEmpty(topic) ???????????||string.IsNullOrEmpty(value)) ???????????{ ???????????????throw new ArgumentNullException("topic或者value不能为null."); ???????????} ????????????????var task= ?await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value)); ???????????return task; ???????} ????}}

那么咱们是不是忘记什么了,看上面的代码,是不是那个配置类KafkaOptions 还没有说明?

再在这个位置添加kafka的配置类KafkaConfigurationOptions:

using Microsoft.Extensions.Configuration;using Microsoft.Extensions.Options;using Walt.Freamwork.Service;namespace Walt.Freamwork.Configuration{ ???public class KafkaConfigurationOptions : IConfigureOptions<KafkaOptions> ???{ ???????private readonly IConfiguration _configuration; ???????public KafkaConfigurationOptions(IConfiguration configuration) ???????{ ??????????_configuration=configuration; ???????} ???????public void Configure(KafkaOptions options) ???????{ ???????????????//这里仅仅自定义一些你自己的代码,使用上面configuration配置中的配置节,处理程序没法自动绑定的
 ?????????????????一些事情。 ???????} ???}}

然后,将配置类添加进服务:

using Microsoft.Extensions.Configuration;using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.DependencyInjection.Extensions;using Microsoft.Extensions.Options;using Walt.Framework.Service;namespace Walt.Framework.Configuration{ ???public static class KafkaConfigurationExtensioncs ???{ ?????????public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder ?????????,IConfiguration configuration) ?????????{ ??????????????????????????????InitService( builder,configuration); ????????????????return builder; ?????????} ?????????public static void InitService(IKafkaBuilder builder,IConfiguration configuration) ?????????{ ???????????builder.Services.TryAddSingleton<IConfigureOptions<KafkaOptions>>( ?????????????????new KafkaConfigurationOptions(configuration)); ?//配置类和配置内容 ???????????builder.Services.TryAddSingleton ???????????(ServiceDescriptor.Singleton<IOptionsChangeTokenSource<KafkaOptions>>( ?????????????????new ConfigurationChangeTokenSource<KafkaOptions>(configuration)) );//这个是观察类,如果更改,会激发onchange方法 ???????????builder.Services ???????????.TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<KafkaOptions>> ???????????(new ConfigureFromConfigurationOptions<KafkaOptions>(configuration))); //这个是option类,没这个,配置无法将类绑定 ????????????????????????builder.Services.AddSingleton(new KafkaConfiguration(configuration)); ?????????} ???}} 

ok,推送nuget,业务部分调用。

  三     kafka服务调用                          

在project中引用然后restore:

引入命名空间:

调用:

using System;using System.Collections.Generic;using System.IO;using System.Linq;using System.Threading.Tasks;using Microsoft.AspNetCore;using Microsoft.AspNetCore.Hosting;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Logging; using Newtonsoft.Json;using Walt.Framework.Log;using Walt.Framework.Configuration;using Walt.Framework.Service;namespace Walt.TestMcroServoces.Webapi{ ???public class Program ???{ ????????public static void Main(string[] args) ???????{ ?????????????????????????var host = new WebHostBuilder() ???????????.ConfigureAppConfiguration((hostingContext, configContext) =>{ ????????????????var en=hostingContext.HostingEnvironment; ????????????????if(en.IsDevelopment()) ????????????????{ ????????????????????configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json"); ????????????????} ????????????????else ????????????????{ ????????????????????configContext.AddJsonFile("appsettings.json"); ????????????????} ??????????????????configContext.AddCommandLine(args) ????????????.AddEnvironmentVariables() ????????????.SetBasePath(Directory.GetCurrentDirectory()).Build(); ??????????????????????????}).ConfigureServices((context,configureServices)=>{ ??????????????????configureServices.AddKafka(KafkaBuilder=>{ ???????????????????KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService")); ??????????????????}); ???????????}) ??//kafka的调用。 ???????????.ConfigureLogging((hostingContext, logging) => { ????????????????logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging")) ???????????????.AddCustomizationLogger(); ???????????}).UseKestrel(KestrelServerOption=>{ ???????????????KestrelServerOption.ListenAnyIP(801); ???????????}) ???????????.UseStartup<Startup>().Build(); ????????????host.Run(); ????????????Console.ReadKey(); ???????} ???}}

然后提交git,让jenkins构建docker发布运行:

jenkin是是非常牛的一款构建工具,不仅仅根据插件可以扩展不同环境,还支持分布式构建.

这是我们用jenikins构建的的:

让它跑起来:

调用看看:

这个方法是输出Properties数组的:

  四 集成kafka                         

kafka的接口不多,看看都有那些:

https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html

Consumer和Producer是咱们发布消息和调用消息的两个主类,代码在上文已经实现的service。

客户端代码:

使用my-replicated-topic-morepart这儿topic,还是希望多分区,因为后面consumer使用分布式计算读取。

consumer先在客户端监听:

product端的调用代码:

执行这个接口后,再看consumer接收到的消息:

最后一步,将咱们kafka日志部分替换为真实的kafka环境,看结果:


分布式日志到这里结束,可能大家觉得后面还有日志索引和日志展现,因为这个读kafka需要分布式去处理,

我下面刚好要写分布式计算的文章,所以到时可以拿这个当例子,承前继后。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;

namespace ?Walt.Framework.Service.Kafka
{
???public class KafkaService : IKafkaService
???{

???????private KafkaOptions _kafkaOptions;
???????private Producer _producer;
???????public KafkaService(IOptionsMonitor<KafkaOptions> ?kafkaOptions)
???????{
???????????_kafkaOptions=kafkaOptions.CurrentValue;
???????????kafkaOptions.OnChange((kafkaOpt,s)=>{
???????????????_kafkaOptions=kafkaOpt;
???????????????????System.Diagnostics.Debug
???????????????????.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
 
???????????});
????????????_producer=new Producer(_kafkaOptions.Properties);
???????}

???????private byte[] ConvertToByte(string str)
???????{
???????????return System.Text.Encoding.Default.GetBytes(str);
???????}
 
???????public ?async Task<Message> Producer(string topic,string key,string value)
???????{ ?
???????????if(string.IsNullOrEmpty(topic)
???????????||string.IsNullOrEmpty(value))
???????????{
???????????????throw new ArgumentNullException("topic或者value不能为null.");
???????????}
 
??????????var task= ?await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value));
??????????return task;
???????}
 
???}
}

asp.net core mcroservices 架构之 分布式日志(三):集成kafka

原文地址:https://www.cnblogs.com/ck0074451665/p/10211725.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved