「书籍阅读」分布式计算系统(二)
这种框的部分一般是自己衍生出来的疑问与解答(当然有的问题可能暂时没有解答)
(ps:本文只是粗略记录,想了解具体细节还是建议要看原书)
流计算系统Storm
与批处理系统处理静态数据不同,这种大量、快速、时变并持续到达的数据称为流数据(stream data),针对流数据的计算称为流计算。实时性需求是流计算的显著特征,需要设计相应的流计算系统。
设计思想
Storm流计算系统需要处理的数据以流的形式存在,理论上是无界,并且计算要持续进行。Storm系统将流数据抽象为无界的元组序列,并使用拓扑抽象计算过程。
连续处理
批处理系统在获取全部要处理的数据时,就开始计算这批数据,计算结束后就自动结束进程。而流计算系统的输入数据是无界的,任务本身需要长期运行在计算节点上,简单来说,就是负责执行计算任务的现成或进程需要长期驻留在系统中。
连续处理是执行流计算的一种直观的方式,输入的流数据记录不断地进入系统,计算任务长期驻留在计算节点并且更新自身的状态。其中,状态是一种特殊的数据,用于保存从流计算开始到目前为止得到的计算结果,对于流计算系统而言非常重要。
数据模型
Storm将流数据看做一个无界的、连续的元组序列。该序列中的元组类似于关系数据库中的元组,一个元组即为系统处理的一条记录,每条记录包含若干字段。字段类似于关系数据库中的属性,可以是基本类型,字符类型、字节数组等。如下图:
计算模型
Storm使用拓扑抽象描述计算过程,拓扑是由Spout和Bolt组成的网络,在逻辑上是一个有向无环图。顶点为Spout或Bolt,用于描述数据处理逻辑,并且定义了输入和输出元组的模式。
通常Spout是流数据的源头,负责从数据源不断地读取数据,然后封装成元组形式发送给Bolt。Bolt描述针对流数据的转换过程,其内封装了消息处理逻辑,负责将接收的流数据转换为新的流数据。
Storm在物理设计时的主要思想是利用分布式架构降低数据处理的延迟,因而拓扑中的Bolt在物理上由若干个任务实现,如下图所示:
这种使用拓扑的抽象描述有什么好处呢?拓扑的抽象描述在直观的感觉上就是一个有向图,图中的节点只有两种结构Spout和Bolt,由于流计算的输入是不断进行的,在每一个节点的计算都是不断进行的,于是使用拓扑的方式,解释起来会更加轻松。因为,可以将一连串的输入看成一条线,然后在图中的节点进行传递,根据节点中相连的边,可以更直观地感受流数据的流动方向。
体系架构
架构图
Storm也是采用"主从"架构,主、从节点之间的协调和控制依赖于ZooKeeper,具体架构图如下:
- Nimbus:主节点运行的后台程序,充当管理员的角色。存储用户提交的Topology程序代码,并负责分发代码,分配任务和检测故障等。
- Supervisor:从节点运行的后台程序,负责监听所在机器的工作,根据Nimbus分配的任务决定启动或停止Worker进程,一个从节点上同时运行若干Worker进程。
- ZooKeeper:负责Nimbus和Supervisor之间的协调工作。Supervisor的地址以及作业的元信息均存储于ZooKeeper中。若Nimbus进程或Supervisor进程以外终止,则重启是需要读取ZooKeeper的数据进行恢复。
- Worker:负责任务运行的进程,进程内部运行一个或多个Executor线程,从而实际执行任务。
Storm与MapReduce、Spark对比
系统 | MapReduce | Storm | Spark |
---|---|---|---|
系统进程 | JobTracker | Nimbus | Master |
系统进程 | TaskTracker | Supervisor | Worker |
系统进程 | Child | Worker | CoarseGrainedExecutorBackend |
工作线程 | - | Executor | Task |
任务代码 | Task | Task | Task |
基础接口 | Map/Reduce | Spout/Blot | RDD API |
应用程序执行流程
- 用户编写程序,经过序列化、打包并提交给主节点Nimbus
- Nimbus创建一个组件与物理节点的对应关系文件,并将该文件原子地写入ZooKeeper中的某一Znode
- 所有的Supervisor监听ZooKeeper中的Znode以得到通知,从而获取所在节点需要执行的组件任务。
- Supervisor从Nimbus处拉取可执行的代码
- Supervisor启动若干Worker进程执行具体的任务
- Worker进程根据从ZooKeeper中获取的文件信息启动若干Executor线程,该线程负责执行组件所描述的任务。
Executor线程执行同一个组件的一个或多个任务,但通常只执行一个任务,如果执行多个任务,这些任务必然属于同一个组件,并且由Executor线程串行执行。Spout和Bolt组件的Task数量是不变的,因此对Topology中组件并行度的动态调整实际上是通过改变Executor的数量完成的。
工作原理
Storm系统在数据输入、转换以及输出的过程中,由Spout负责数据的输入,而Bolt负责数据的转化以及输出。不同组件之间是通过元组传输进行的,为了完成元组传输需要解决如下问题:
- 对流数据来说,上游组件的任务发送哪些元组给下游组件的任务
- 对于一条元组来说,上游组件的任务如何向下游组件的任务传递
Storm通过流数据分组策略(steam grouping)确定元组传输的方式,且发送的过程一次只传递一条元组。
Spark Streaming中有一个逆函数的机制,可以减少窗口计算时的重复计算。而Storm中好像是没有这样的一个机制的。其实可以像Flink一样添加一个Acker进程,用于在对应的节点记录计算状态。同时Storm只有Bolt和Spout两种接口,和MapReduce有着相似的弊病,就是用于描述数据计算的API过少,造成了用户需要手动编写大量的计算处理代码,使用比较麻烦。
流数据分组策略
流数据分组策略定义了两个存在订阅关系的组件之间进行元组传输的方式。常见流数据分组策略如下:
- 随机分组(shuffle grouping):随机分发元组,保证下游组件任务接收元组的数量大致相当
- 按字段分组(fields grouping):保证指定字段内容相同的元组分配到同一个任务中
- 部分按字段分组(partial key grouping):与按字段分组类似,但当数据倾斜时下游组件的任务之间会进行负载均衡。
- 广播分组(all grouping):每个任务均会收到所有的元组
- 全局分组(global grouping):所有的元组均发送到同一个任务中
- 不分组(non grouping):目前和随机分组一致
- 直接分组(direct grouping):允许上游组件直接指定其发出的元组由下游组件的哪个任务接收并处理
- 本地或随机分组(local or shuffle grouping):入股下游组件有一个或多个任务与上游组件的任务处于同一个Worker进程中,则元组将被随机分发到该进程的任务中。
为什么要使用分组策略?分组策略可以有效地指定流数据中的数据将流向哪一个计算节点,同时按照不同的分组策略使其满足一定的条件。在一些情况下,如果不同类型的数据需要进行不同的处理,则可以通过分组策略,将数据进行划分,交给不同的算子进行处理。
元组传递方式
Storm采用一次一元组(一次一记录)的消息传递机制,一档上游组件的任务处理完毕一条元组,则立即发送给下游组件的任务,并且一次发送一条元组。这种立即发送的消息传递机制有利于减少处理的延迟,从而满足实时性需求。
容错机制
- 主节点的Nimbus发生故障:系统无法接受新的作业,但运行当前作业的从节点Supervisor和Worker仍可以继续工作。
- 如果某个Worker发生故障,则所在节点的Supervisor可以试图重启Worker进程
- 但Nimbus无法正常工作,因此无法将该Worker调度到其他节点并重新启动
- 为了保证高可用性,Storm可以配置一个Nimbus列表,以其中一节点为主Nimbus,其余节点作为备Nimbus,元信息存储在所有主、备Nimbus或可靠的外部分布式存储系统中。一旦Nimbus发生故障,则系统将从备Nimbus中选择一个作为新的Nimbus
- 仅Supervisor出现故障:Worker进程会继续执行,利用外部监控程序判断该节点能够重启Supervisor,如果可以就重启,否则Nimbus将在其他节点启动新的Supervisor进程,所有受其监控的Worker进程均需重新调度。
- 仅Worker出现故障:Supervisor会试图重启Worker。若无法成功重启,Nimbus将命令其他节点的Supervisor启动Worker进程。
- Worker和Supervisor同时故障:Nimbus将命令其他节点的Supervisor启动Worker进程
容错语义
Worker中的Executor线程负责执行处理组件的任务,一旦Worker发生故障即表示进入节点的信息可能由Executor线程负责的Task处理,也可能未经处理即随着故障的发生而丢失。因此,容错保障还需要考虑语义的正确性,理想情况下,可靠的容错保障应该使得流计算系统正常运行所处理的消息与发生故障重启后处理的消息完全一致,通常有以下3个级别:
- 至多一次(at most once):消息可能会丢失
- 至少一次(at least once):消息不会丢失,但可能会重复
- 准确一次(exactly once):消息不丢失,不重复
仅重启Worker,则容错语义的级别为至多一次。原生Storm系统将Spout发出的每一条元组及其后续衍生得到的元组视为一棵元组树,如果该元组树的所有元组均由系统成功输出,则表示源元组得到成功处理。Storm使用ACK机制对元组树中的元组进行确认,一旦元组树中的某一元组因故障而无法得到确认,则系统从Spout将元组进行重放。
因为Storm执行过程中,会出现因只有一个元组没得到确认而全部重放的情况,所以这种方式达到至少一次的容错语义
元组树
Spout中的每条元组对应一棵元组树。在Storm中,Spout发出元组时用户可以为其指定标识,称为Spout-Tuple-id(STid),STid可以是任意对象,在后续Bolt处理中,该标识会传递给新的元组。如果Spout中的多个元组共用一个STid,则在逻辑上将这些元组及其衍生元组构成一棵元组树。
如下图所示,椭圆形表示Tuple1,并将STid1绑定在Tuple1上面,Tuple1会随着拓扑结构流向Bolt E中。值得注意的是,如果Tuple1在拓扑中经过其他Bolt节点产生了新的Tuple,也将这些Tuple绑上STid1。只有在拓扑结构的最终节点处理完对应STid1的数据后,Tuple1才算处理完成。当然也可以在Spout中给多个Tuple1绑上相同的STid,但只有所有的Tuple都在拓扑结构中处理完成,才算结束,如果其中一部分Tuple没有处理成功,则这一部分的Tuple都得重新再来。
ACK机制
在Storm中存在一类特殊的任务,叫做Acker,负责跟踪Spout发出的元组及其元组树。Acker的数量的默认值为1,用户编程是可以设置拓扑中的Acker数量。
元组树中的元组传输在物理上表现为组件的任务之间的消息传输,该消息包含一个64位标识,称为Mid(里面包含有STid)。如果正常传输,上游组件的任务发送消息的Mid和下游组件的任务接收消息的Mid相同,同时任务发送消息和任务接收消息时,都会向Acker进行报告。在Acker中会对报告的Mid和STid进行记录,其维护类似<STid,ack_val>
的映射表。当收到Spout发送的消息时将相应的STid的ack_val初始化为0。然后在后面计算过程中,不论是收到任务发送消息的Mid还是收到任务接收消息的Mid都会将其STid与映射表中STid对应的ack_val做异或操作。如果ack_val为0,则Acker会告知相应的Spout,表示对应的STid已经处理完毕。
对于设置了多个Acker的情况,Storm会使用一致性哈希将一个STid对应到Acker上,从而使多个Acker之间不会互相干扰。
有没有ack_val为0,但是元组树没有处理完的情况?有可能,比如Spout A 发送Mid1给Bolt B并向Acker汇报,然后Bolt B接收Spuout发来的Mid并向Acker汇报,这时候对应STid的ack_val正好为0,但元组树并未执行完成。然而,系统中消息接收和发送是并发进行的,因此发生这种情况的可能性很低。
消息重放
若Acker在设定时间范围内收到处于拓扑末端Bolt的报告并且STid对应的ack_val为0,则Storm将任务系统成功处理了STid的元组树。如果Worker进程发生故障,导致没能出现正确的情况,就需要Spout重新发送以STid为标识的元组,但是该消息重放机制可能会导致消息的重复计算,实现了至少一次的容错语义。
举个例子,如下图,Bolt C发生了故障,此时Sotrm认为消息传输出了故障,需要Spout重新发送以STid为标识的元组,因此Mid1、Mid2、Mid4为表示的消息中的元组因重放而被系统计算了两次,所以也达成了至少一次的容错语义。
流计算系统Spark Streaming
Spark Streaming 能够将连续的流数据进行离散化后交给Spark批处理系统。
设计思想
微批处理
**微批处理方式对批处理系统进行改造,将流计算作业转化为一小组微小的批处理作业。批处理系统能够较快地执行这些微小的批处理作业,从而满足流计算低延迟的需求。**在未批处理方式中,批处理引擎处理完毕一批数据后,负责x处理该批数据的任务即结束,而不是像Storm的连续处理方式一样长期驻留,这是微批处理和连续处理的重要区别。
数据模型
Storm采用连续处理方式,其数据模型将流数据看作一系列连续的元组。Spark Streaming采用微批处理方式,将连续的流数据进行切片(离散化),生成一系列小块数据。值得注意的是,流数据经过离散化后得到的每一个小批量数据都是一个独立的RDD,一个RDD通常包括若干分区。将流数据离散化出的一组组RDD序列抽象为离散化的流(discretized stream, DStream)。
计算模型
输入DStream由一组RDD序列构成,这些RDD都会得到相同的处理,与Spark批处理相比,Spark Streaming与Spark拥有几乎相同的API(map, reduceByKeyAndWindow等),其只是在处理流数据时进行了微批处理,使用DStream作为抽象数据,使用与Spark几乎相同的API,也有着基于Operator DAG和Lineage的逻辑计算结构和物理计算结构。与Spark不同的是,Spark Streaming在每一个时间处理的微批数据是不同的,而Spark则是读入所有数据后直接进行处理。
举个例子,加入输入数据以流形式发送文本语句,目标为统计过去2s内的单词即词频,每隔1s统计一次。从Operator DAG角度看待这个问题的逻辑计算模型和物理计算模型如下图:
从DStream Lineage角度来看到这个问题的计算模型和物理计算模型如下图。
在ReduceByKeyAndWindows
和DStream C
中,不同批次间的RDD同样存在一条连线,这说明某些算子在语义上是跨批次的。
体系架构
架构图
从物理架构上看,Spark Streaming和Spark相同,不同的是Spark Streaming对驱动器和执行器部件进行了拓充。
- 驱动器:StreamingContext对SparkContext进行了拓充,构造了StreamingContext,包含用于管理流计算的元数据
- 执行器:负责运行任务以执行相应的算子操作,其中作为接收者(Receiver)的某些任务负责从外部数据源持续获取流数据,这和Spark批处理引擎中读取输入数据的方式不同。
应用程序执行流程
- 启动驱动器。以Standalone为例,集群管理器由Master和Worker构成。如果使用Client部署方式,则客户端直接启动驱动器,并向Master注册。如果使用Cluster部署方式,则客户端将应用程序提交给Master,由Master选择一个工作节点启动驱动器进程(DriverWrapper)
- 由驱动器创建StreamingContext,向集群管理器进行资源申请,并由驱动器进行任务分配和监控。
- 集群管理器令工作节点启动执行器进程,该进程内部以多线程的方式运行
- 执行器进程向驱动器进行注册
- StreamingContext根据DStream的Opeator DAG,生成关于RDD转换的Operator DAG,从而将其交给执行器进程中的线程以执行任务。
工作原理
Spark Streaming将数据流分解为一系列短小的批数据,交给底层的Spark批处理引擎。在Spark Streaming中需要将DStream的转化操作转化为Spark中对RDD的转换操作,生成关于RDD操作的DAG。简单来说,就是Spark Steaming执行包括两个部分,一个部分是将输入的流数据分解成一批一批的RDD,第二部分是将用户对DStream转化的描述,转化为对RDD操作的Spark转化描述。再由生成的对RDD的转化执行在第一部分中产生的一批批RDD中。
为什么要对DStream的操描述转化成对RDD操作?Spark Streaming本质上还是使用Spark的方式进行计算,但是其采用微批处理的方式,实现了流计算。DStream的操作描述是为流计算设计的,而不是为Spark设计的,因此要使用Spark完成相应的计算,还是需要将DStream的操作描述,转换为RDD的操作,才能交给Spark进行计算。同时流数据在Spark Streaming中,会被分成一批批的RDD,DStream的操作描述也不能直接作用于RDD,只有Spark中的操作,才能直接作用于RDD。
数据输入
对于一个Spark Streaming应用程序来说,其输入数据可以来自一个或多个流。系统接收数据的方式有两种一种:
- 从外部数据源直接获取数据:从socket端口获取网络数据或接收外部传感器产生的数据。
- Spark Streaming确保新输入的数据在两个工作节点得到备份后才会向客户端发送确认信息,旨在支持系统容错。
- 从外部的存储系统周期性地得去数据:其他应用系统将其日志存入HDFS等分布式存储系统中。Spark Streaming从其中周期性地读取日志数据。
- 虽然HDFS中的数据一般用于批处理,但是依然能够以流的方式读取此类数据。
数据转换
Spark Streaming中的操作可以分为四类:类似RDD转换的操作,使用RDD转换的操作,窗口操作和状态操作。
比如Spark Streaming中的map操作,从执行结果上他与Spark的map操作相同,但是在实际执行过程中,DStreaming的每个批次都是一个RDD,Spark Streaming中的map操作需要转换成一个或多个Spark的RDD操作,这样才能将Spark Streaming的实际计算过程交给底层的Spark框架(DStream转化的Operator DAG转变为描述RDD转换的Operator DAG)。
窗口是流计算中常用的操作,由于流计算中的输入数据理论上是无界的,窗口可以为流动的数据指定一定的计算范围,并且每隔一定间隔指定一次。Spark Streaming支持以时间为单位的窗口操作。(因为微批处理是按照时间进行分批的,所以对应的时间窗口相对好操作)
根据窗口大小和间隔之间的关系,可以将窗口分为下面3类:
- 滑动窗口(sliding window):窗口大小大于间隔
- 固定窗口(fixed window):窗口大小等于间隔
- 跳跃窗口(tumbling window):窗口大小小于间隔
在Spark Streaming的reduceByKeyAndWindows提供了使用逆函数的API,举个例子,当进行两个时刻的窗口计算时,后一时刻的计算结果与前一时刻的计算结果存在比较大的重合部分,于是就可以定义逆函数利用前一时刻计算结果进行少量操作得到后一时刻的计算结果(而不是在后一时刻重新计算所有数据),通过这种方式实现了增量式计算,提高了计算效率。下图为区间计数的例子:
状态是流计算系统与批处理系统存在显著差异之处。如果某一操作保存的数据将在新数据到达后进行的计算中重新使用,则保存的数据即为状态。在直接提到的计算重合部分,就需要在对应的windows中进行局部统计并保存,这个“局部”统计的结果就是状态,在Spark Streaming中以RDD的形式存在。对于每一小批数据在计算过程中产生的RDD,可将其中参与后续批次计算的RDD视为状态。DStream中涉及到多个小批次数据的转换操作称为有状态的操作,反之则为无状态的操作。Spark Streaming也提供了一种特殊的,直接针对状态进行转换的操作updateStateByKey
,本质上是针对状态RDD进行用户定义的转换操作。
数据输出
Spark在遇到动作操作时就会触发DAG的生成,但是Spark Streaming中并没有行动操作的概念,而是遇到输出操作就生成DAG,这些DAG是已经转换成了Spark中的RDD操作的DAG,因此是可以交给Spark框架进行执行的。值得注意的是在Spark Streaming中可以由多个count(在Spark中是动作操作),但是在执行翻译好的DAG时却不会执行动作操作,这是因为在翻译过程中Spark Streaming将count转换操作转化成了其他的RDD操作。
容错机制
- 集群管理器故障:导致系统无法正常工作,需要重新启动或借助ZooKeeper实现高可用性
- 客户端故障:如果其没有和驱动器运行在一个进程中,则只要作业成功提交给系统则不会影响系统中作业的运行。
基于RDD Lineage容错
执行器发生故障,并且执行器不包含Receiver任务,则表示只有负责数据处理的任务受到了影响。因此执行器中运行的任务实际为底层Spark批处理引擎的任务,这种情况下可以使用Spark批处理引擎的容错机制进行恢复(即使用数据根据RDD的Lineage进行重新计算)。
基于日志的容错
基于日志的容错策略可以解决含有Receiver的执行器故障的问题。如果执行器和Receiver同时故障,则只能从外部存储中重新获取数据,但是这时候Receiver经过重启之后,又会接收新的数据,这时候就存在应该获取那些数据以及是否存在重复读取的数据等问题。
因此Spark Streaming的Receiver需要使用日记记录已经获取的数据,Receiver接收数据后除将其存入本地执行器的内存外,还会以日志的新式存入外部的文件系统。作业中可能运行多个Receiver,所有Reciever向驱动器汇报自己读取的数据信息,驱动器同样将这些信息写入日志,从而掌握整个作业读取到的数据情况:
当执行器发生故障重启后,Receiver从外部文件系统加载日志并重新读取输入数据,确保不重复读取日志中已经存在的数据。
基于检查点的容错
基于RDD Lineage的容错可以结合检查点使用避免重复计算。该类检查点称为数据检查点,本质也是RDD检查点。
除数据检查点外,为了支持驱动器的故障恢复,Spark Streaming还需设置元组数据检查点。元数据检查点包含以下内容:
- 配置信息:创建Spark Streaming应用程序的配置信息
- DStream操作信息:定义应用程序计算逻辑的DStream操作的信息
- 未处理的批次数据的信息:正在排队尚未处理的批次数据的信息
驱动器将检查点写入可靠的外部文件系统,当发生驱动器故障并重启后,驱动器从外部文件系统加载元数据检查点以及日志,根据这些信息继续进行计算。
在Spark Streaming的检查点中,包含数据检查点和元数据检查点。数据检查点旨在加快执行器发生故障后的回复过程,元数据检查点旨在保证驱动器能够从故障中回复到正常状态。
端到端的容错语义
Spark Streaming的容错机制能够保证准确一次的容错语义。但还要考虑提供数据源和接收处理结果的问题,如果提供数据源的系统无法支持数据的重放,那么即便Spark Streaming中的Receiver因故障重启,也无法重新获取丢失的数据。因此,对于整个流计算系统,还需提供数据源和接收处理结果的系统能够支持准确一次的容错语义。
批流融合基础
批流融合的背景
批处理适用于处理大批量数据、对实时性要求不高的场景。
流计算系统适用于处理快速产生的数据,对实时性要求较高的场景。
存在一些应用场景需要同时使用批处理和流数据,比如微博热门话题的统计,对于已有的微博需要使用批处理的方式进行统计,而最新产生的微博则需要通过流计算进行统计,此外,新产生的部分经过一段时间后,也需要使用批处理进行处理。因此,批流融合的应用场景也是比较常见的。
Lambda架构及其局限性
Lambda架构认为数据处理系统由数据和查询两部分组成,理想状态下的表达式为:
即针对所有数据的查询可以得到正确的结果。但通常数据量都很大,查询很难得到快速响应,实时查询也需要消耗大量的资源。
一种解决方案就是针对查询进行预计算,所得的结果称为批处理视图(batch view)。当需要执行查询操作时,则可以从批处理视图中读取结果。查询过程的表达式变为:
但这个思路还忽略了一个重要的问题:数据往往是快速,动态增加的,因而批处理视图的结果存在一定的滞后性。为了解决这个问题,Lambda架构将数据处理系统分为以下3个层次:
- 批处理层(batch layer):离线批处理数据,生成批处理视图
- 加速层(speed layer):实时处理新数据,增量补偿批处理视图
- 服务层(serving layer):响应用户的查询请求
实时获取所有数据并进行查询通常也是比较难实现的,因此Lambda架构将所有数据视为主数据集(master dataset)和新数据(new data)的组合。主数据主要用于批处理层,生成批处理视图,但是在接收到新数据时,批处理层又要重新计算,并且等到批处理计算完成之后,服务层的批处理视图才会更新,这就导致服务层查询的数据并不是最新的结果。于是引入了加速层,其只处理最新的数据,并生成实时视图(real-time view)以补偿批处理视图的不足,加速层在接收到新数据时会不断地更新视图,使用增量算法对批处理视图陈旧的结果进行补偿,从而降低查询延迟。所以Lambda的最终架构可以使用下面的表达式表示:
Lambda架构可以融合批处理和流计算两类系统,在一定程度上平衡了重新计算和延迟之间的矛盾,但是,在该架也存在明显的缺点:
- Lambda架构的开发人需要将所有算法那在批处理层和加速层实现两次。并且要求查询结果为两个系统结果的合并,大大增加了开发的复杂度。
- Lambda架构的运维人员需要同时维护批处理和流计算两套执行引擎,增加了运维的复杂度。
批处理与流计算的统一性
这里先要引入有界数据和无界数据的概念
有界数据:表示系统处理的数据有限
无界数据:表示系统可以处理无限的数据
批处理/流计算系统表示底层特定的执行引擎,在实际应用中,无界数据集可以使用批处理系统反复调度处理,经过良好设计的流计算系统也可以完美地处理有界数据集。
窗口操作
窗口操作可以将数据集切分为有限的数据片,以便针对该数据片进行处理。对于无界数据,诸如聚合等操作需要使用窗口定义边界,映射和过滤则不需要。对于有界数据来说,窗口是可选的,或者说默认有一个窗口包含了有界数据的所有数据。
从基于的要素来看,窗口可以分为基于时间的窗口和基于记录数的窗口。按照窗口大小和间隔之间的关系,可以将窗口分为滑动窗口,固定窗口和跳跃窗口。还有一种较为特殊的基于时间的会话窗口(session windows),一般按照超时时间定义,任何在超时时间内的记录均视为属于同一个会话。
时间域
对于一条记录来说,其涉及事件时间和处理时间
- 事件时间是指产生该记录的事件实际发生的时间(记录产生时其所在系统的当前时间)
- 处理时间是指在系统执行数据处理的过程中,一条记录被数据处理系统观察到的时间(该记录被数据处理系统处理时数据处理系统的当前时间)
一条记录的事件时间永远不变,但处理时间随着记录在系统中各个物理节点的流动而持续变化。基于时间的时间窗在计算过程中到了对应的计算时间仍然拿不到完整的数据,举个例子,比如统计[12:00:00,12:01:00]
的事件时间记录,当系统时间到达12:01:00
时,仍然可能有12:00:54
的记录并传递到系统中,因此此时系统处理的这个时间时间窗处理的数据并不完整。
从上面的描述中可知,记录的事件时间总是和记录时间存在一定偏差值。对于有界数据而言,因为总是能获取所有数据,所以偏差并没有太大的影响;而对于无界数据而言,由于无法获取所有的数据,如果无法判断系统当前处理时间与记录时间时间之间存在的差值,则将直接影响应用的系统。
为了衡量处理时间和事件时间之间的差值,需要引入水位线的概念。其是一个时间时间戳,指示的事件时间表示早于该时间时间的记录已经完全被系统观测,因此系统可根据水位线“认定”当前时间时间域所处的时间。下图为水位线的示例,立项状态下的水位线应该是一条斜率为1的直线。点状虚线为实际的水位线,这个图的看法一般是从处理时间开始看,因为处理时间是系统真正的时间,然后从图中红线部分可以看到12:03:00
时,对应到12:02:10
左右。这表明在系统时间为12:03:00
时,12:02:10
以前的数据都已经读取完成,这时候计算12:02:10
之间的事件窗口,能包含所有的数据。
Dataflow统一编程模型
Dataflow编程模型将所有输入数据均视为无界数据集,仅将有界数据集视为无界数据集的一个特例。将数据集视为记录的集合,变成模型处理的输入数据存在以下特点:
- 无界(unbounded):由于记录在数据处理系统中是不断动态到达且永无止境的,因此输入数据是无界的。
- 延迟(delay):由于网络传输等原因,记录从产生到进入数据处理系统通常会产生延迟
- 乱序(out-of-order):由于产生的延迟不同,输入数据的顺序和原始数据的顺序不一定相同
针对这三个特点有什么相应的应对方法吗?
- 对于无界的特点,其实可以直接当成流计算进行处理,或者使用窗口操作的方式使其变成一个有界的数据再进行计算。
- 对于延迟的特点,通常是指事件时间和处理时间不统一,导致了对事件时间窗口的计算不友好。这种情况其实可以依靠水位线的方式进行处理(后面讲到的)。
- 对于乱序的问题,这个会使得顺序不统一,如果计算不要求顺序的话,则可以使用后面提到的结果修正对迟到数据进行处理。如果计算要求顺序时,这时候有一个很容易想到的解决方案,就是给每个流数据添加一个标识表示ID,对于连续的数据这个ID是递增的,如果计算系统中出现了间断(指系统中的数据ID不连续),则等待缺失的数据到达再进行计算。但这种方式无疑会增加系统的延迟。
Dataflow编程模型又时也称为What-Where-When-How(WWWH)模型,通过操作描述,窗口定义,触发器和结果修正4个方面可花针对无界、乱序输入数据的处理过程
- 操作描述:需要对输入数据执行什么(What)操作
- 窗口定义:在何处(Where)进行数据切分
- 触发器:系统在处理时间域应该何时(When)触发基于事件时间定义的窗口
- 结果修正:当输入数据无界时,由于输入数据的乱序,系统互发窗口计算后仍然可能有迟到的数据抵达,那么如何(How)修正已经触发的窗口的计算结果?
操作描述
Dataflow变成模型使用PCollection<KV<键类型,值类型>>
表示数据集,是一系列键值对的集合,提供预定义的一系列操作,其中最核心的是ParDo
、GroupByKey
操作。
ParDo
:该操作对每个键值对执行相同的处理,获得0个或多个输出键值对GroupByKey
:该操作用于将剪枝对按键进行重新分组
以数据求和为例子,对于有界数据,只需要等待所有的数据都读取完成后,再进行计算即可,可以使用下面的代码执行这个过程
1 | PCollection<KV<String,Integer>> output = input.apply(Sum.integersPerKey()); |
其中integersPerKey
是用户自定义的操作。最后这些数据的计算结果为51
窗口定义
针对无界数据集的操作通常需要先定义窗口,再确定的窗口中执行操作。还是以有界数据整数求和为例子,此时需要按照时长为2min的基于事件时间的固定窗口执行操作,可以用下面的代码表示
1 | PCollection<KV<String,Integer>> output = input |
按照下图的方式进行划分,最终每个窗口的计算结果为14、22、3、12
对于有界数据而言,确定数据全部到达的时间很容易,但是对于无界数据而言,通过窗口定义,可以仅针对窗口中的数据执行啊哦做而不必获取所有数据。但还是有一个问题:在输入为无界数据的情况下,应何时输出窗口中的计算结果?
触发器
触发器机制描述了何时将窗口的结果输出,这个可以定义在水位线到达窗口指定的事件时间戳时输出结果
1 | PCollection<KV<String,Integer>> output = input |
如下图,这就使得当水位线越过窗口指定的事件时间后出发结果的输出。
但水位线本质是对事件时间的一种猜测,其与真实事件时间相比可能过快或过慢。
如果水位线设置过慢,则根据水位线时间出发窗口计算很可能导致整个处理结果存在较高的延迟。因此,除水位线时间到达窗口指定的时间戳外,还可以设置触发器“提前”出发结果的输出。比如添加一个触发器,在处理时间域上每隔1分钟输出一次结果。
1 | PCollection<KV<String,Integer>> output = input |
于是求和结果会变为:
结果修正
如果水位线设置过快,则在水位线之后仍然有记录到达,为此,触发器提供了3种不同的方式以进行修正同一窗口的不同计算结果
- 抛弃(discarding)窗口内容:触发器一旦触发,窗口内容即被抛弃,之后窗口计算的结果和之前的结果不存在任何相关性
- 累积(accumulating)窗口内容:触发器触发后,窗口内容进行持久化,而新得到的结果称为对已输出结果的一个修正版本。
- 累积和撤回(accumulating & retracting)窗口内容:触发器触发后,不仅将窗口内永久保留,还需记录已经输出的结果。当窗口再次触发时,首先撤回已经输出的结果,然后输出新得到的结果。
还是使用之前的例子,这次每遇到一个迟到的记录时将修正已经输出的结果
1 | PCollection<KV<String,Integer>> output = input |
最终得到结果:
关系化Dataflow编程模型
Dataflow将输入数据看做一系列的键值对记录构成的数据,如果将这些键值对看成关系表,此时就不再需要提供对键值对进行转化操作,而是针对关系表的SQL。这种提供了SQL描述的Dataflow编程模型称为关系化的Dataflow编程模型。由于Dataflow认为输入的数据是无界的,因此这种输入的关系表示动态变化的,并且输出的关系表同样是动态变化的。
Dataflow编程模型在触发器定义的时刻针对关系表进行SQL操作得到新的关系表,再将新的关系表相应地转换为无界数据集。关系化的Dataflow编程模型将无界数据集中的记录看做元组,无界数据集记录向关系表的转换可以视为像关系表中不断添加新的元组。
一体化执行引擎
统一编程可以从两个层面理解:
- 多个不同执行引擎之间批处理与流计算编程模型进行统一
- 在同一个执行引擎中是按批处理与流计算的统一编程
当前通用的核心执行引擎为批处理引擎或流处理引擎,因此一体化执行引擎需要选择其中一种作为核心。
一体化执行引擎的实现或选择批处理引擎作为核心执行引擎并基于批出来来处理无界数据集,或选择流计算引擎为核心执行引擎并基于流计算来处理有界数据集。
以批处理为核心:这个在之前就已经提到过了,就是微批处理,将无界数据按照时间进行划分,然后形成一批批连续的,小型的有界数据,输入到批处理引擎中进行处理。但是会产生一个问题,就是新的短时应用必须在率先启动的短时应用执行完毕后才能启动。这种操作本质上是串行执行的,这种串行执行短时应用的方式带来的延迟通常在秒级。同时,这种思路不易于实现基于记录计数的窗口,会话窗口等操作,并且造成了较高的延迟。
以流计算为核心:这种方式下,就需要使用连续处理进行计算,一次性启动一个长时运行的应用,而非像微批处理模式中那样不断启动一系列短时应用。在该模式中,由于长时间运行的应用一旦读取到数据则立即进行处理,因此产生的延迟低于微批处理的延迟,这种系统产生的延迟通常在毫秒级。
批流融合系统Flink
设计思想
Flink系统的设计思想是以流计算为中心,将有界数据视为无界数据的特例。类似于Storm,Flink系统将需要处理的数据抽象为DataStream形式,并使用DAG描述计算过程。
数据模型
Flink将输入数据看作一个不间断的、无界的记录序列,一系列记录构成DataStream,并且DataStream中的记录是不可变的,一旦创建即无法在物理上改变。Flink使用DataSet类表示有界数据,其中的记录也是不可变的。Flink在记录集合的级别对数据进行抽象。
计算模型
Flink系统提供了丰富的操作算子对DataStream进行转化,其一系列转换操作构成一张向无环图(描述计算过程的DAG),操作算子可以分为
- 数据源(DataSource):描述DataStream数据的来源
- 转换(Transformation):描述DataStream在系统中的转换逻辑
- 数据池(DataSink):描述DataStream数据的走向(这个应该是在迭代算子那一部分用到了)
其逻辑计算模型也是由算子构成的DAG,在物理设计层面,利用分布式架构加快数据处理,每个操作算子实际上由若干实例任务(Task)实现。
迭代模型
MapReduce和Spark的迭代计算的过程由用户编写的外部驱动程序控制,而Flink系统将迭代作为内部算子嵌入到DAG中实现迭代计算。DAG是有向无环图,但是迭代计算必然存在环路,看似存在矛盾,但实际上,迭代计算作为一个算子嵌套在DAG中,对于DAG来说整个迭代过程就是一个算子,该迭代算子的内部实现存在反馈环路。如下图
批式迭代和流式迭代计算在语义上有所差异,批式迭代的输入数据是有界的,每轮迭代计算的全部结果作为下一轮迭代计算的输入。在流式迭代中,输入数据是无界的,通常每轮迭代计算的部分结果作为输出向后传递,而另一部分结果作为下一轮迭代计算的输入,并且迭代过程是无限的。之前提到了DataStream既可以表示有界数据也可以表示无界数据,但是由于二者在迭代计算上的语义差异,针对两者的迭代计算需要使用不同的算子。
体系架构
架构图
Flink的抽象架构图与Spark类似,根据是否使用Yarn资源管理系统进行系统部署,也可以分为Standalone和Yarn两种模式。与Spark不同的是,Flink目前没有使用驱动器进行作业管理,而是由JobManager负责,因此不存在Client和Cluster之分。Flink同样采用"主从"架构。
- Client:客户端,将用户编写的DataStream程序翻译为逻辑执行图并进行优化,并将优化后的逻辑执行图提交到JobManager。
- JobManager:作业管理器,根据逻辑执行图生成物理执行图,负责协调系统的作业执行,包括作业调度,协调检查点和故障恢复。(Standalone模式下的进程名为:StandaloneSessionClusterEntrypoint)
- TaskManager:任务管理器,用于执行JobManager分配的任务,并且负责读取数据、缓存数据以及其他TaskManger进行数据传输。(Standalong模式加的进程名为:TaskManagerRunner)
系统 | MapReduce | Storm | Spark | Flink |
---|---|---|---|---|
系统进程 | JobTracker | Nimbus | Master | StandaloneSessionClusterEntrypoint |
TaskTracker | Supervisor | Worker | TaskManagerRunner | |
Child | Worker | CoarseGrainedExecutorBackend | ||
工作线程 | — | Executor | Task | Task |
任务代码 | Task | Task | ||
基础接口 | Map/Reduce | Spout/Bolt | RDD API | DataStream(DataSet) API |
Flink的Yarn的部署方式和其他框架也差不多,就是使用Yarn的相关进程替换JobManager和TaskManager,具体对比如下:
模式 | Standalone | Yarn |
---|---|---|
资源管理 | StandaloneSessionClusterEntrypoint、TaskManagerRunner | ResourceManager |
NodeManager | ||
作业管理 | YarnJobClusterEntrypoint | |
任务执行 | TaskManagerRunner | YarnTaskExecutorRunner |
应用程序执行流程
Standalone模式的Flink执行流程如下,客户端在提交Flink应用程序时,可以选择Attached(客户端与JobManger保持连接,可以获取关于应用程序执行的信息),也可以选择Detached(客户端与JobManger断开连接,无法获取关于应用程序执行的信息)
为什么要区分Attached和Detached呢?我认为Attache模式下可以获取应用程序的执行信息,因此可以用在开发前期用于测试环境中,判断程序是否能够正常执行。经过完善的测试后,就可以确保程序在大部分的情况下都能处于稳定的运行状态中,因此在部署时就可以采用Detached的方式。
- 客户端将用户编写的程序进行解析,并将解析后的作业描述交给StandaloneSessionClustingEntrypoint。
- StandaloneSessionClustingEntrypoint根据作业描述进行任务分解,确定各个TaskManager负责执行的任务
- TaskManager执行各自的任务
Yarn模式的Flink执行流程如下,同样可以选择Attached和Detached的方式进行提交:
-
用户在客户端提交Flink作业,客户端启动ChiFrontend进程,CliFrontend将用户编写的程序进行解析,并将运行Flink系统的jar包以及配置文件上传至HDFS。
-
CliFrontend进程向Resource发起请求,申请启动YarnJobClusterEntrypoint(ApplicationMaster),ResourceManager确定启动YarnJobClusterEntrypoint的节点。
-
需启动YarnJobClusterEntrypoint进程的节点中的NodeManager将HDFS中的jar包与配置文件下载至该节点
-
NodeManager启动YarnJobClusterEntrypoint进程
-
CliFrontend进程将解析后的作业描述交给YarnJobClusterEntrypoint
-
YarnJobClusterEntrypoint向ResourceManager注册,此时客户端可以通过ResourceManager查看Flink应用程序的资源使用情况。YarnJobClusterEntrypoint根据作业描述进行任务分解,并向ResourceManager申请启动这些任务的资源
-
ResouceManger以Container形式想提出申请的YarnJobClusterEntrypoint分配资源。一旦YarnSessionClusterEntrypoint申请到资源,即在多个任务之间进行资源分配
-
YarnJobClusterEntrypoint确定资源分配方案后,与对应的NodeManager通信
-
如果该NodeManager所在节点尚未下载,则将HDFS中的jar包与配置文件下载至本地,并在相应的Container中启动相应的YarnTaskExecutorRunner进程以执行任务。
-
各个任务向YarnJobClusterEntrypoint汇报进度和状态,一边令YarnJobClusterEntrypoint随时掌握各个任务的运行状态
-
随着部分任务执行结束,YarnJobClusterEntrypoint逐步释放占用的资源,最终向ResourceManager注销并自行关闭
Yarn的执行方式看着虽然比较复杂,但是其实还是遵循了资源管理和作业管理分离的思想,这样的方式可以使得不同的系统在同一个集群中互不干扰彼此任务执行的条件下,充分利用集群各种资源。
工作原理
在Flink应用程序的执行过程中,Client根据DataStream程序生成逻辑执行图并进行优化,之后将优化后的逻辑执行图提交给JobManager。JobManager获得逻辑执行图后生成物理执行图,从而分配给TaskManager执行。TaskManager启动Task线程执行JobManager分配的任务。由于Flink系统内置了迭代算子,在TaskManager中的任务可以分为实现迭代算子的迭代任务和其他非迭代任务。
逻辑执行图的生成与优化
给定用户编写的DataStream程序,Flink的Client将其解析生成如上图所示的逻辑执行图(DAG)。Client对各个算子的依赖关系进一步分析,借助Spark中RDD依赖关系的宽依赖和窄依赖的概念,如果算子之间的数据依赖为窄依赖关系,则算子之间呈现一对一的数据传递关系。因为算子处于不同TaskManager的任务实现会带来TaskManager之间的数据传输开销,Flink使用Chaining机制进行优化,将部分算子合并成一个“大”算子。Chaining优化并不改变算子的语义,但是可以避免数据在不同TaskManager之间的非必要传输。
物理执行图的生成与任务配置
JobManager在收到Client提交的逻辑执行图后,根据算子的并行度,将逻辑执行图转换为物理执行图。物理执行图中的一个节点对应一个任务,分配给TaskManager执行。
JobManager在生成物理制图后,将各个算子的任务分配给TaskManager。原则是:根据任务槽(TaskSolt)的容量,尽可能将存储与数据传输关系的算子实例放在同一个任务槽中,以保持数据传输的本地性。
非迭代任务间的数据传输
Shuffle是一种阻塞式的数据传输方式,位于上游的任务必须等到所有记录均计算结束后才可向下游任务传递数据。(MapReduce,Spark)
消息传递机制是一种非阻塞式的数据传输方式,位于上游的任务处理一条记录后则立即向下游任务传递数据。(Storm)
如果实现非迭代算子的任务位于不同的TaskManager,则**Flink将采用流水线机制(非阻塞式)进行数据传输。流水线机制一次传输一个缓冲区(buffer),该缓冲区中通常存储不止一条记录。**假设每个TaskManager仅包含1个任务槽并且任务槽的容量为2,TaskManager设置固定大小的缓冲区,一旦缓冲区满或者达到阈值,则向负责接收数据的TaskManager发送数据。
迭代任务间的数据传输
在Flink系统中,利用了迭代前端(Iteration Source)和迭代末端(Iteration Sink)两类特殊的任务实现数据反馈,两类任务成对处于同一个TaskManager,迭代末端任务的输出可以再次作为迭代前端任务的输入。
在流式迭代计算中,通常每轮迭代的部分计算结果作为输出,传递给后续的算子,而另一部分作为下一轮迭代计算的输入。
在批式迭代计算中,通常需要将上一轮迭代结算的全部结果作为输入(要取得所有的计算结果,通常会存在一定的阻塞情况),直到满足迭代的结束条件。当然满足结束条件的时候,迭代前端会发出特殊出的控制事件,表示迭代计算结束。
Dataflow编程模型的实现
Dataflow统一编程模型4个方面:操作描述,窗口定义,触发器和结果修正。Flink中实现算子的任务必须维护自身的水位线,位于下游的任务根据上游任务发出的水位线不断更新自身的水位线,并向后传递(如果一个算子有多个输入,则需要等待所有输入的水位线都传递到该算子之后,算子再使用这些输入中时间最早的水位线作为当前算子的水位线)。系统一般提供预定义的水位线计算方式,同时用户也可以自定义DataSource算子,并实现其水位线计算方式。
窗口本质上也是一种算子,并且在特定时刻触发计算。从系统实现的角度看,窗口定义、触发器和结果修正是绑定在一起的。Flink中的窗口操作由分配器(window assigner)、窗口实例(window instance)、窗口函数(window function)构成,其中窗口实例均设有相应的触发器。
如下图:输入数据的形式为数字,事件时间
或水位线,事件时间
,在9:01
时,窗口分配器收到3,8:50
的数据,将其放入到[8:50,9:00]
的窗口实例中。接着9:02
时,窗口分配器收到4,8:58
这个数据,符合两个窗口实例的要求,因此放入到两个窗口实例中。在下来9:03
时,窗口分配器收到|,9:00
的水位线,然后发送到窗口实例1,并进行水位线的更新,此时水位线达到窗口的结束位置,窗口开始计算,于是得到结果7。
除了用于触发窗口计算的触发器外,用户还可以自定义其他触发器,用于提前输出结果。对于迟到的数据Flink也需要根据用户选择的结果修正方式修正窗口计算的结果。Flink在一定超时范围内使用累积模式对迟到的记录进行结果修正,一旦超时则忽略迟到的记录。基于事件时间的窗口需要利用水位线触发,而基于处理时间的窗口仅需使用系统时间触发。
关系化Dataflow编程模型的实现
Flink系统可将输入的一系列记录转换为关系表,也可以将关系表转换为一系列记录,关系表是动态变化的,因此Flink中称其为动态表。Flink提供了Table和SQL两个支持关系型运算的编程接口,用以支持关系化的Dataflow编程模型。
Flink利用Apache Calcite进行SQL解析,并根据编目(catalog)进行校验,生成Calcite执行逻辑,然后Calcite基于常用的优化规则(算子下推,剪枝等关系查询优化规则),以及Flink提供的针对DataStream的优化规则对逻辑执行计划进行优化,从而生成面向DataStream的物理执行计划,依据物理执行计划,Flink通过代码生成方式产生可执行的DataStream程序。
容错机制
- 如果仅Clint发生故障,则只要作业成功提交给系统即不会影响系统中作业的运行
- JobManager故障,则需要重新启动或借助ZooKeeper实现高可用性
- TaskManager故障,则可以重启TaskManager或者将本由这些TaskManager负责执行的任务交给新的TaskManager。
状态管理
和之前提到的概念相同,窗口中需要保留的内容即为状态,其可以保留已处理记录的结果,并对后续记录的处理造成影响。如果状态使用进程进行管理,当进程发生故障时,状态也会一并丢失,因此状态应该由系统进行管理。Flink系统提供了特殊的数据结构(状态,State),用于保存操作算子的计算结果。提供了以下3种存储算子的状态MemoryStateBackend、FsStateBackend和RockDBStateBackend,前两者将算子的状态存储在TaskManager的内存中,而后者将算子的状态存储在TaskManager内置的RockDB数据库中。
状态存储范式 | 正常运行时 | 写入检查点 |
---|---|---|
MemoryStateBackend | 本地内存 | JobManager内存 |
FsStateBackend | 本地内存 | HDFS |
RockDBStateBackend | 本地RocksDB | HDFS |
非迭代计算过程的容错
在某一时刻,系统处理的记录可以分为3种类型:
- 已经处理完毕的记录
- 正在处理的记录
- 尚未处理的记录
虽然绝对同步的时钟不存在,但是同一时刻保存所有算子状态到检查点的目的是区分第一种情况和后两种情况。Flink借鉴分布式系统中用于保存系统状态的Chandy-Lamport算法的思想,实现异步屏障快照(ABS)算法,所保存的快照即为检查点。异步屏障快照算法通过在输入数据中注入屏障并异步地保存快照,达到和在同一时刻保存所有算子状态到检查点相同的目的。
异步屏障快照算法会在输入数据中间断地插入屏障,并随着记录一起向下游任务流动,每个屏障指示对应的检查点ID。算子在收到屏障后,便将算子当前的状态写入外部可靠的存储系统中。需要注意的时候,在写入时要进行屏障对齐,屏障对齐的含义就是如果一个算子的数据源有多个的话,该算子如果只接收到1个算子的屏障是不会进行状态写入检查点的,而是等待该算子的所有数据源的相同屏障到达后,才将状态写入检查点。
当发生故障时,Flink选择最近完整的检查点n将系统中每个算子的状态重置为检查点中保存的状态,并从数据源中重新读取属于屏障n之后的记录。当然,该过程要求数据源具备一定的记忆功能。通过这种容错机制,Flink能够满足准确一次的容错语义。
迭代计算过程的容错
迭代反馈的数据和输入数据将继续进行新的计算,因而在该情况下仅靠屏障无法将属于检查点n和检查点n+1的记录区分。根据Chandy-Lamport算法,反馈环路中的所有记录需要以日志形式保存,当故障发生后,系统需要根据最忌你的完整的检查点n重置各个算子的状态,还需要重新读取属于屏障n之后的记录以及日志中的记录。