分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

kafkaStream解析json出错导致程序中断的解决方法

发布时间:2023-09-06 01:50责任编辑:胡小海关键词:jsjson

出错在 KStreamFlatMapValues 方法执行时,由于json异常数据无法解析,结果生成的值为null,报错信息如下:

2018-04-18 19:21:04,776 ERROR [app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1] com.gw.stream.KStream103.lambda$main$1(100) | 捕获到异常:hello world hello world kingException in thread "app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1" java.lang.NullPointerException ???????at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41) ???????at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) ???????at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) ???????at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) ???????at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174) ???????at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) ???????at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224) ???????at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) ???????at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411) ???????at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918) ???????at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798) ???????at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) ???????at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

问题解决方案:

  1. 对json解析的bean添加未知字段忽略
import java.util.List;import com.fasterxml.jackson.annotation.JsonIgnoreProperties;@JsonIgnoreProperties(ignoreUnknown = true)public class Bean103 { ???????private List<String> key1; ???private List<List<String>> key2; ???????????public void setKey1(List<String> key1) { ????????this.key1 = key1; ????} ????public List<String> getKey1() { ????????return key1; ????} ???public void setKey2(List<List<String>> key2) { ????????this.key2 = key2; ????} ????public List<List<String>> getKey2() { ????????return key2; ????}}
  1. 由于报空指针错误,所以解决空指针问题,即判断为null时创建一个空对象.
return list == null ? new ArrayList<String>():list;

完整的示例代码如下:

package com.gw.stream;import java.util.ArrayList;import java.util.List;import java.util.Properties;import java.util.stream.Collectors;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.Produced;import org.apache.log4j.Logger;import com.alibaba.fastjson.JSONObject;public class KStream103 { ???private static Logger log = Logger.getLogger(KStream103.class); ???public static void main(String[] args) { ???????????????if(args.length < 6){ ???????????log.error("错误:参数个数不正确[application_id bootstarp_server groupid source_topic target_topic auto_offset_reset]"); ???????????return ; ???????} ???????String application_id=args[0]; ???????String bootstarp_server = args[1]; ???????String groupid = args[2]; ???????String source_topic = args[3]; ???????String target_topic = args[4]; ???????String auto_offset_reset = args[5]; ???????????????????Properties props = new Properties(); ???????// consumer group ???????// 指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件 ???????props.put(StreamsConfig.APPLICATION_ID_CONFIG, application_id); ???????props.put(StreamsConfig.STATE_DIR_CONFIG, "./tmp/"); ???????props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,bootstarp_server); ???????// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760); ???????props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000); ???????props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); ???????props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); ???????props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset); ???????props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); ?//自动提交 ???????props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid); ???????//针对时间异常解决方法 ???????props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class); ???????????????final String splitChar = "\001"; ???????StreamsBuilder builder = new StreamsBuilder(); ???????KStream<String, String> textLines = builder.stream(source_topic); // 接收第一个topic ???????textLines.flatMapValues(value -> { ???????????????????????Bean103 bean103 = null; ???????????List<String> list = null; ???????????try { ???????????????????????????????//这里是value的业务处理逻辑...最终返回的是一个list ???????????????????????????????????????????} catch (Exception e) { ???????????????log.error("捕获到异常:" + value); ???????????????log.error("error message:" + e.getMessage()); ???????????????????????????} ???????????return list == null ? new ArrayList<String>():list; ???????}).filter((k,v)-> v !=null).map((k, v) -> new KeyValue<>(k, v)) ???????.to(target_topic, Produced.with(Serdes.String(), Serdes.String())); ???????????????KafkaStreams streams = new KafkaStreams(builder.build(), props); ???????streams.start(); ???}}

kafkaStream解析json出错导致程序中断的解决方法

原文地址:https://www.cnblogs.com/30go/p/8877204.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved