Skip to content

Latest commit

 

History

History
69 lines (54 loc) · 7.61 KB

statestore.md

File metadata and controls

69 lines (54 loc) · 7.61 KB

State存储

Flink支持有状态的计算,其支持的状态又分为两种:Operator State和Keyed Sate,前者没有当前key的概念,而后者总有一个current key 与之对应。此外,前者只能存放在堆中,而后者既可以存放在堆中,也可以存放在堆外。Flink的Keyed Sate是由Key Group来组织的,并分布在 Flink算子的哥哥并发实例上,每个算子上的Key Group个数即为最大并发数。

当使用MemoryStateBackend和FsStateBackend时,默认情况下会将状态数据保存到CopyOnWriteStateTable中,它是StateTable 接口的一个实现,其中可以保存多个KeyGroup的状态,每个KeyGroup对应一个CopyOnWriteStateMap。

CopyOnWriteStateMap的结构类似于HashMap,但是它相比于HashMap支持了两个特别有意思的功能:

  • 哈希结构为了保证数据读写的效率都会有宇哥扩容策略,CopyOnWriteStateMap采用的是渐进式rehash策略,它不会一次性将所有数 据都迁移到新的hash表,二是会逐渐的将数据迁移过去;

  • 支持checkpoint时的异步快照,可以在快照的同时对其中的数据执行修改操作,并能同时保证快照数据的准确性;

MemoryStateBackend和FsStateBackend的KeyedStateBackend都使用HeapKeyedStateBackend存储数据,HeapKeyedStateBackend 持有Map<String, StateTable<K, ?, ?>> registeredKVStates来存储StateName与具体State的映射关系。registeredKVStates的 key就是StateName,value是具体的State数据,value存储在StateTable中。

StateTable有两种实现:

  • CopyOnWriteStateTable属于Flink定制化的数据结构,在进行checkpoint时支持异步快照;
  • NestedMapStateTable直接使用嵌套的双层HashMap存储数据,在进行checkpoint时只能进行同步快照;

下面主要就CopyOnWriteStateTable类进行介绍。在StateTable中持有StateMap[] keyGroupedStateMaps真正的存储数据。StateTable 会为每个KeyGroup的数据都初始化一个StateMap来对KeyGroup做数据隔离。在对状态进行操作时,它会先根据Key找到对应的KeyGroup,从而拿 到相应的StateMap,这样才能对状态进行操作。而在CopyOnWriteStateTable中就使用CopyOnWriteStateMap存储数据,这是一个数组+链表构 成的Hash表,其中的数据类型都是StateMapEntry。Hash表的第一层是一个StateMapEntry类型的数组,也就是StateMapEntry[]。在StateMapEntry 类中有一个StateMapEntry next指针构成的链表。

先来介绍下CopyOnWriteStateMap类的渐进式rehash策略,它其中有一个hash表堆外提供服务,但是如果表中的元素太多需要扩容时,就需要将数 据迁移到一个容量更大的hash表中去。在Java的HashMap扩容时,会将旧Hash表中的所有数据一次性的都移动到大Hash表中,这样的策略存在一定的 问题:如果当前HashMap中已经存储了1G的数据,那么就需要将1G的数据一次迁移完成,这个过程可能会比较耗时。而CopyOnWriteStateMap在扩容 时,则不会一次将数据全部迁移,而是在每次操作它时慢慢的将数据迁移到大的Hash表中。

具体说来,就是在内存中有两个Hash表,一个是PrimaryTable作为主桶,一个是RehashTable作为扩容期间用的桶,初始阶段只有PrimaryTable, 当PrimaryTable中的元素个数大于设定的阈值时,就要开始扩容了。在putEntry()方法中判断size()是否大于threshold,若是则调用doubleCapacity() 方法申请新的Hash表赋值给RehashTable。渐进式rehash策略由于会逐渐的迁移数据,因此一定会涉及到选桶操作,它需要决定是使用PrimaryTable 还是使用RehashTable:它首先会根据HashCode按位与PrimaryTable的大小减去1的值,从而计算出应该将当前HashCode分配到PrimaryTable的 哪个桶中去,如果桶编号大于等于已迁移的桶编号rehashIndex(该桶编号用于标记当前rehash的迁移进度,它之前的数据已经从PrimaryTable迁移到 了RehashTable的桶中),则应该去PrimaryTable中去查找,否则应该去RehashTable中去查找。每次get()、put()、ContainsKey()、remove() 操作时,都将会调用computeHashForOperationAndDoIncrementalRehash()方法触发迁移操作,这个方法用于检测是否处于rehash过程中,如果是 就会调用incrementalRehash()方法迁移一波数据,同时它还会计算key和namespace对应的hashCode。

下面就重点分析一下incrementalRehash()方法的实现,该函数会先定义两个StateMapEntry类型的结构用于存放待迁移的元素和迁移后的元素,并记录 下本次迁移了多少个元素,并且它每次迁移会确保至少迁移了MIN_TRANSFERRED_PER_INCREMENTAL_REHASH(默认值是4)个元素到新桶,它会循环遍历原 Map的第rhIdx个桶,从前往后开始遍历,只要该桶中仍有元素,就需要继续遍历,这样才能保证每次迁移都会将桶中的元素迁移完全,而不是某个桶迁移到一半, 之所以会需要这样的保证,是因为渐进式reHash的过程中,元素依然需要被访问,而确定访问旧桶或是新桶的办法就是根据桶的编号,因此必须保证桶的迁移是 完全的,否则就无法确定某个元素应该是去旧桶中会新桶中去找了。如果在迁移过程中遇到版本比highestRequiredSnapshotVersion小的元素,则需要拷 贝一份,这样通过CopyOnWrite的方式保证元素迁移过程中依然能够被访问,将下一个需要迁移的节点保存,并采用头插法将当前元素迁移到新的table的链表 头部,链表指针后移以便进行下一个元素的迁移,同时将已迁移元素自增1。如果循环结束表示rhIdx之前的桶已经被迁移完成,如果rhIdx与原待迁移桶总数相 等则表示已经迁移完成,此时将新的Hash表做为主表并回收原表即可。

那么既然StateMap保存的是KeyGroup的状态数据,那么就同样需要对其进行快照,传统的办法就是将其中的全量数据深拷贝一份,然后对拷贝来的这份数据做 快照,而原数据依然可以堆外提供服务。但是这种办法的效率比较低,如果状态数据较大,那么将会耗费较长的时间,为了提高效率,Flink采用的办法是对其中 的数据做浅拷贝。什么是浅拷贝呢?就是只拷贝引用,而不拷贝数据。

如果StateMap不是正在进行扩容,则其Snapshot的流程比较简单,就是创建一个新的snapshotData,然后直接将primaryTable中的数据拷贝到snapshotData 中即可。如果StateMap正在进行扩容,Snapshot的流程就相对复杂一点,它需要先创建一个新的snapshotData,然后将primaryTable和rehashTable 中的数据都拷贝到snapshotData中。

那么如果StateMap当前正在进行扩容,则其Snapshot的流程就会相对来说复杂一些。需要先创建一个新的snapshotData,将primaryTable和rehashTable 的数据都拷贝到snapshotData中,snapshotData数组的长度并不等于primaryTable的长度+rehashTable的长度,而是分别计算primaryTable和rehashTable 中有几个桶中有数据,然后其桶数量为有数据的桶数量之和。

浅拷贝的具体流程如下:首先调用CopyOnWriteStateTable类的stateSnapshot()方法对整个StateTable进行快照,CopyOnWriteStateTable中为每个KeyGroup 维护了一个StateMap到KeyGroupedStateMaps中,其stateSnapshot()方法会创建CopyOnWriteStateTableSnapshot类的对象,而在CopyOnWriteStateTableSnapshot 类的构造函数中会调用CopyOnWriteStateTable的getStateMapSnapshotList()方法,该方法会调用所有的CopyOnWriteStateMap的stateSnapshot()方 法来生成CopyOnWriteStateMapSnapshot并保存到list中。