广告位联系
返回顶部
分享到

Flink支持的几种数据类型介绍

java 来源:转载 作者:秩名 发布时间:2021-06-09 18:46:12 人浏览
摘要

一、支持的数据类型 Flink 对可以在 DataSet 或 DataStream 中的元素类型进行了一些限制。这样做的原因是系统会分析类型以确定有效的执行策略。 1.Java Tuple 和 Scala Case类; 2.Java POJO; 3.基本类型; 4.通用类; 5.值; 6.Hadoop Writables; 7.特殊类

一、支持的数据类型

Flink 对可以在 DataSet 或 DataStream 中的元素类型进行了一些限制。这样做的原因是系统会分析类型以确定有效的执行策略。

1.Java Tuple 和 Scala Case类;

2.Java POJO;

3.基本类型;

4.通用类;

5.值;

6.Hadoop Writables;

7.特殊类型

二、Flink之Tuple类型

Tuple类型  Tuple 是flink 一个很特殊的类型 (元组类型),是一个抽象类,共26个Tuple子类继承Tuple 他们是 Tuple0一直到Tuple25

package org.apache.flink.api.java.tuple;
​
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.types.NullFieldException;
​
@Public
public abstract class Tuple implements Serializable {
    private static final long serialVersionUID = 1L;
    public static final int MAX_ARITY = 25;
    private static final Class<?>[] CLASSES = new Class[]{Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class};
​
    public Tuple() {
    }
​
    public abstract <T> T getField(int var1);
​
    public <T> T getFieldNotNull(int pos) {
        T field = this.getField(pos);
        if (field != null) {
            return field;
        } else {
            throw new NullFieldException(pos);
        }
    }
​
    public abstract <T> void setField(T var1, int var2);
​
    public abstract int getArity();
​
    public abstract <T extends Tuple> T copy();
​
    public static Class<? extends Tuple> getTupleClass(int arity) {
        if (arity >= 0 && arity <= 25) {
            return CLASSES[arity];
        } else {
            throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
        }
    }
​
    public static Tuple newInstance(int arity) {
        switch(arity) {
        case 0:
            return Tuple0.INSTANCE;
        case 1:
            return new Tuple1();
        case 2:
            return new Tuple2();
        case 3:
            return new Tuple3();
        case 4:
            return new Tuple4();
        case 5:
            return new Tuple5();
        case 6:
            return new Tuple6();
        case 7:
            return new Tuple7();
        case 8:
            return new Tuple8();
        case 9:
            return new Tuple9();
        case 10:
            return new Tuple10();
        case 11:
            return new Tuple11();
        case 12:
            return new Tuple12();
        case 13:
            return new Tuple13();
        case 14:
            return new Tuple14();
        case 15:
            return new Tuple15();
        case 16:
            return new Tuple16();
        case 17:
            return new Tuple17();
        case 18:
            return new Tuple18();
        case 19:
            return new Tuple19();
        case 20:
            return new Tuple20();
        case 21:
            return new Tuple21();
        case 22:
            return new Tuple22();
        case 23:
            return new Tuple23();
        case 24:
            return new Tuple24();
        case 25:
            return new Tuple25();
        default:
            throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
        }
    }
}

查看源码我们看到Tuple0一直到Tuple25

我们看flink为我们为我们构造好了0-25个字段的模板类

ackage org.apache.flink.api.java.tuple;
​
import java.io.ObjectStreamException;
import org.apache.flink.annotation.Public;
​
@Public
public class Tuple0 extends Tuple {
    private static final long serialVersionUID = 1L;
    public static final Tuple0 INSTANCE = new Tuple0();
​
    public Tuple0() {
    }
​
    public int getArity() {
        return 0;
    }
​
    public <T> T getField(int pos) {
        throw new IndexOutOfBoundsException(String.valueOf(pos));
    }
​
    public <T> void setField(T value, int pos) {
        throw new IndexOutOfBoundsException(String.valueOf(pos));
    }
​
    public Tuple0 copy() {
        return new Tuple0();
    }
​
    public String toString() {
        return "()";
    }
​
    public boolean equals(Object o) {
        return this == o || o instanceof Tuple0;
    }
​
    public int hashCode() {
        return 0;
    }
​
    private Object readResolve() throws ObjectStreamException {
        return INSTANCE;
    }
}

三、Tuple的使用

方式一:初始化元组

可使用静态方法 newInstance进行元组构造 指定元组空间大小;

ex: 1 则元组只有一个空间,则实际使用的Tuple1 字段只有f0

ex: 12 则元组只有两个空间,则实际使用的Tuple2 字段只有f0,f1

指定  Tuple元组空间大小 (可理解为字段个数)

Tuple tuple = Tuple.newInstance(1);

方式一:构造元组 

使用Tuple.newInstance(xx),指定元组空间大小的话,这样存取虽然能够实现,但会存在存储索引位置使用不正确的情况,可能由于失误操作编写出索引越界异常,而且使用不太方便,使用Tuplex.of(数据)方法构造Tuple元组

Tuple3<String, String, String> tuple3 = Tuple3.of("test0", "test1", "test2");
System.out.println(tuple3.f0); // test0
System.out.println(tuple3.f1); // test1
System.out.println(tuple3.f2); // test2

四、Flink之POJO类型

Java和Scala的类在满足下列条件时,将会被Flink视作特殊的POJO数据类型专门进行处理:

1.是公共类;

 

2.无参构造是公共的;

3.所有的属性都是可获得的(声明为公共的,或提供get,set方法);

4.字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象。

Flink会分析POJO类型的结构获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。

public class WordWithCount {
    public String word;
    public int count;
    public WordWithCount() {}
    public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
    DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));
    wordCounts.keyBy("word");

五、Flink之基本类型

Flink支持Java和Scala所有的基本数据类型,比如 Integer,String,和Double。

六、Flink之通用类型

Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。

所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。

七、Flink之值类型Values

通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架。

Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。

八、Flink之Hadoop的Writable类

它实现org.apache.hadoop.Writable接口的类型,该类型的序列化逻辑在write()和readFields()方法中实现。

九、Flink之特殊类型

Flink比较特殊的类型有以下两种:

1.Scala的 Either、Option和Try。

2.Java ApI有自己的Either实现。

Java Api 与 Scala 的 类似Either,它表示两种可能类型的值,Left或Right。Either对于错误处理或需要输出两种不同类型的记录的运算符很有用。

类型擦除和类型推理

Java编译器在编译之后会丢弃很多泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。

例如,在JVM中,DataStream<String>和DataStream<Long>的实例看起来是相同的。

List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass());

泛型:一种较为准确的说法就是为了参数化类型,或者说可以将类型当作参数传递给一个类或者是方法。Flink 的Java API会试图去重建(可以做类型推理)这些被丢弃的类型信息,并将它们明确地存储在数据集以及操作中。你可以通过DataStream.getType()方法来获取类型,这个方法将返回一个TypeInformation的实例,这个实例是Flink内部表示类型的方式。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://blog.csdn.net/test_test111/article/details/117674481
相关文章
  • SpringBoot自定义错误处理逻辑介绍

    SpringBoot自定义错误处理逻辑介绍
    1. 自定义错误页面 将自定义错误页面放在 templates 的 error 文件夹下,SpringBoot 精确匹配错误信息,使用 4xx.html 或者 5xx.html 页面可以打印错误
  • Java实现手写一个线程池的代码

    Java实现手写一个线程池的代码
    线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和
  • Java实现断点续传功能的代码

    Java实现断点续传功能的代码
    题目实现:网络资源的断点续传功能。 二、解题思路 获取要下载的资源网址 显示网络资源的大小 上次读取到的字节位置以及未读取的字节
  • 你可知HashMap为什么是线程不安全的
    HashMap 的线程不安全 HashMap 的线程不安全主要体现在下面两个方面 在 jdk 1.7 中,当并发执行扩容操作时会造成环形链和数据丢失的情况 在
  • ArrayList的动态扩容机制的介绍

    ArrayList的动态扩容机制的介绍
    对于 ArrayList 的动态扩容机制想必大家都听说过,之前的文章中也谈到过,不过由于时间久远,早已忘却。 所以利用这篇文章做做笔记,加
  • JVM基础之字节码的增强技术介绍

    JVM基础之字节码的增强技术介绍
    字节码增强技术 在上文中,着重介绍了字节码的结构,这为我们了解字节码增强技术的实现打下了基础。字节码增强技术就是一类对现有字
  • Java中的字节码增强技术

    Java中的字节码增强技术
    1.字节码增强技术 字节码增强技术就是一类对现有字节码进行修改或者动态生成全新字节码文件的技术。 参考地址 2.常见技术 技术分类 类
  • Redis BloomFilter布隆过滤器原理与实现

    Redis BloomFilter布隆过滤器原理与实现
    Bloom Filter 概念 布隆过滤器(英语:Bloom Filter)是1970年由一个叫布隆的小伙子提出的。它实际上是一个很长的二进制向量和一系列随机映射
  • Java C++算法题解leetcode801使序列递增的最小交换次

    Java C++算法题解leetcode801使序列递增的最小交换次
    题目要求 思路:状态机DP 实现一:状态机 Java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Solution { public int minSwap(int[] nums1, int[] nums2) { int n
  • Mybatis结果集映射与生命周期介绍

    Mybatis结果集映射与生命周期介绍
    一、ResultMap结果集映射 1、设计思想 对简单的语句做到零配置,对于复杂一点的语句,只需要描述语句之间的关系就行了 2、resultMap的应用场
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计