分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十四)Structured Streaming:Encoder

发布时间:2023-09-06 02:12责任编辑:胡小海关键词:暂无标签

一般情况下我们在使用Dataset<Row>进行groupByKey时,你会发现这个方法最后一个参数需要一个encoder,那么这些encoder如何定义呢?

一般数据类型

static Encoder<byte[]> ???BINARY() ??????????????????????????An encoder for arrays of bytes.static Encoder<Boolean> ???BOOLEAN() ????????????????????????An encoder for nullable boolean type.static Encoder<Byte> ???BYTE() ??????????????????????????????An encoder for nullable byte type.static Encoder<java.sql.Date> ???DATE() ?????????????????????An encoder for nullable date type.static Encoder<java.math.BigDecimal> ???DECIMAL() ???????????An encoder for nullable decimal type.static Encoder<Double> ???DOUBLE() ??????????????????????????An encoder for nullable double type.static Encoder<Float> ???FLOAT() ????????????????????????????An encoder for nullable float type.static Encoder<Integer> ???INT() ????????????????????????????An encoder for nullable int type.static Encoder<Long> ???LONG() ??????????????????????????????An encoder for nullable long type.static Encoder<Short> ???SHORT() ????????????????????????????An encoder for nullable short type.static Encoder<String> ???STRING() ??????????????????????????An encoder for nullable string type.static Encoder<java.sql.Timestamp> ???TIMESTAMP() ???????????An encoder for nullable timestamp type.

示例:

== Scala == Encoders are generally created automatically through implicits from a SparkSession, or can be explicitly created by calling static methods on Encoders. ??import spark.implicits._ ??val ds = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder) == Java == Encoders are specified by calling static methods on Encoders. ??List<String> data = Arrays.asList("abc", "abc", "xyz"); ??Dataset<String> ds = context.createDataset(data, Encoders.STRING()); 

Class类型:

Or constructed from Java Beans: ??Encoders.bean(MyClass.class); 

Tuple类型:

一般类型的Tuple

 ??Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING()); ??List<Tuple2<Integer, String>> data2 = Arrays.asList(new scala.Tuple2(1, "a"); ??Dataset<Tuple2<Integer, String>> ds2 = context.createDataset(data2, encoder2);

Tuple包含类的:

Encoder<Tuple2<String, MyClass>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.bean(MyClass.class));

关于Encoder请参考《http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoder.html》

关于Encoders请参考《http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Encoders.html》

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十四)Structured Streaming:Encoder

原文地址:https://www.cnblogs.com/yy3b2007com/p/9551644.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved