这种框的部分一般是自己衍生出来的疑问与解答(当然有的问题可能暂时没有解答)

(ps:本文只是粗略记录,想了解具体细节还是建议要看原书)

分布式系统:若干独立计算机的集合,这些计算机对于用户来说就像是一个单机系统。

分布式系统{分布式计算系统——利用分布式技术支持高性能计算和大规模数据处理分布式信息系统——利用分布式系统技术构建信息管理分布式系统\begin{cases} 分布式计算系统——利用分布式技术支持高性能计算和大规模数据处理\\ 分布式信息系统——利用分布式系统技术构建信息管理 \end{cases}

大数据的5个特征:

  1. 数量(volume):数据量大
  2. 种类(variety):数据类型多
  3. 速度(velocity):数据生产速度快
  4. 价值(value):数据中存在很多的潜在价值
  5. 质量(veracity):自动采集的数据通常质量比较低

数据库系统的主要功能模块:

  1. 数据存储
  2. 数据组织模型与访问
  3. 查询处理
  4. 事务管理

计算密集型应用:计算机在处理时,CPU的处理能力成为了首要限制性因素

数据密集型应用:计算机在处理时,I/O带宽成为了首要限制因素

数据处理系统:

  1. 批处理系统:处理静态数据,对吞吐量要求较高实时性较低
  2. 流计算系统:处理动态数据,对实时性要求较高吞吐量要求较低
  3. 批处理融合系统:同一场景不同模块的数据特征和计算需求不同的场景

Hadoop文件系统

现有问题:

  1. 如何存储上百GB/TB级别的大文件?
  2. 如何保证文件系统的容错?
  3. 如何进行大文件的并发读写控制?

HDFS在解决上述问题时使用的思想:

  1. 文件分块存储:HDFS将大文件切分成块(128MB或其他大小),这些文件块可以分布到集群中不同的节点上。

    为什么这么做?如果是1TB级别的大文件,不按照分块的方式存储,那么就意味着这个文件就直接占满了1个1TB的硬盘,如果比他小的硬盘甚至无法存储。

    这样做有什么好处?分块之后,可以将不同的块存储在不同大小的磁盘中,这些磁盘可以分布在其他机器上。虽然总容量还是1TB的大文件,但是可以存储在不同的机器上,减小了对硬盘容量的要求。

    这样做有什么缺点?先说如果全部塞到一个硬盘中有什么优点,全塞到一个硬盘时,对这个文件的读取和写入的IO过程都是在同一个磁盘上,读取速度相对较快并且可以指定任意读取部分。而采用分块的读取方式后,则在进行读取和写入的IO时是在不同的磁盘上进行的,会产生消息传递的一个延迟,同时每次都只能读取一个完整的块。同时一个文件如果是129MB,块的大小是128MB,那么多出来的这1MB,也要占用一个新的块。

    分块是怎么做的?其实就是针对文件的每128MB取一个切片,形成一个块进行存储,并给这个块赋予一个唯一的编号,方便在对文件进行读写操作时,知道从哪一块中获取数据。

  2. 分块冗余存储:将大文件分成块,每个小块同时进行冗余备份。

    为什么这么做?备份的目的就是为了防止某个块损坏时,文件无法正常读取,使用冗余备份,可以在某个块坏掉时,使用备份块进行替换。

    有什么好处?解决了在某个块损坏导致文件无法读取的问题。

    有什么缺点?需要占用更多的存储空间。

  3. 简化文件读写:一次写入后不再修改而是多次读取。

    为什么这么做?简化操作,HDFS中一旦文件写入完成之后就不能再被修改,只能执行添加操作。因为文件使用块的方式进行存储,要找到修改的位置在哪个块中的哪个块中是十分费时费力的。

    优点:操作简化,只有创建,添加,删除。

    缺点:要执行修改操作时需要重新写入新文件并删除旧文件,执行时间过长。

体系架构

HDFS架构

  1. NameNode:负责HDFS的管理工作,文件目录结构,位置等元数据,维护DataNode状态,并不实际存储文件。
  2. Secondary NameNode:NameNode的备份,当NameNode发生故障时利用Secondary NameNode进行恢复。
  3. DataNode:负责存储文件,根据NameNode的控制信息存储和管理对应的文件块,并定期向NameNode汇报自身的状态。

为什么使用这样的架构?

首先客户端是用于启动任务的地方,这个肯定是需要的。文件系统中需要一个结构来存储各个文件的存储的路径(通常以树的形式),因此NameNode就用于管理文件目录结构,和位置等元数据。然后各个文件的实际数据又需要存储,同时这些文件的总体积非常大,又考虑到分布式系统应该具备容错能力,要产生很多备份。于是根据主从结构的思想,文件目录做主,用于对文件系统整体控制,文件实体数据做从,实际存储文件数据。因此使用DataNode作为从节点。这样一个完整的主从结构就构成了。又由于主从节点中的主只有一个,宕机了就很恐怖,于是使用SecondaryNameNode,用于备份NameNode数据。

当然如果NameNode和SecondaryNameNode都宕机了就比较麻烦了,得全部重启了,不过也可以使用ZooKeepper协助实现高可用性。

NameNode主要维护FsImage和EditLog两个文件,FsImage中记录了HDFS的树形目录结构(树的非叶子节点是路径,叶子节点是对应的块存储位置,而不是存储文件),EditLog中记录目录及文件修改操作。FsImage和EditLog都是写入到磁盘中的,只是在运行HDFS的过程中,FsImage会加载到内存中,方便使用。

01.2FsImage结构

SecondaryNameNode通常在与NameNode不同的物理计算机上运行,定期备份FsImage和EditLog两个文件。具体工作流程如下:

01.3SecondaryNameNode工作流程

应用程序的启动流程:

  1. 客户端向NameNode发起文件操作请求,常见的发起请求的方式为编写Java程序或使用HDFS Shell等。

  2. 如果是读写文件操作,则NameNode告知客户端文件块存储的位置信息。如果是创建,删除,重命名目标或文件等操作,NameNode成功修改文件目录结构即结束该操作(删除不会马上在DataNode上删除,而是等到特定的时间才会真正删除)。

    为什么不会马上删除呢?因为HDFS中的文件是以块为单位进行存储的,可能有多个块,同时每个块还有对应的冗余备份,在执行命令后让NameNode直接删除DataNode的内容会进行多线程的删除操作,这样会对NameNode产生较大的负担,同时影响其他多线程任务的进行。因此只是删除记录,而不是马上在DataNode中删除。

    那在什么时候删除呢?在执行删除的命令后,会在EditLog中进行记录,并且会在NameNode中移除对应的文件信息。DataNode会定期向NameNode发送心跳信息(DataNode中存储节点的情况),这时候NameNode会反向索引这些块是否还存在文件树中,如果不存在则通过这次心跳反馈给DataNode,这时候DataNode的文件块才会执行真正的删除操作。

  3. 对于读写文件操作,客户端获知具体位置信息后再与DataNode进行读写交互。

工作原理

文件分块与备份

分块就是对一个大型文件按照块的大小进行划分成若干块,划分出来的块会作为HDFS的最小读取单位,在读取时,会完整地对块进行读取,不会因为只有块的一部分是目标而只读一部分。同时在分块时,如果默认分块的大小是128MB,有一个文件是129MB,那么多出来的这1MB,也要占用一个新的块。

一般来说,HDFS中的每个文件块均设有3个副本:

  1. 第一个副本:如果客户端和某一DataNode位于同一物理节点,那么HDFS将第1个副本放置在该DataNode;如果客户端不与任何DataNode位于同一物理节点,那么HDFS随机挑选一台磁盘容量与CPU性能俱佳的节点。

    为什么这样设置第一个副本?目的是实现快速写入,如果要写入的目标和写入请求在同一个节点上,那么写入的过程就不会产生太大的数据传输延迟。

    那为什么要选容量与CPU性能俱佳的节点呢?因为如果无法写在同一个DataNode中,那肯定都会产生传输延迟,既然如此就要将这个延迟降低。如果写入时磁盘空间不够,那写入磁盘的时间被浪费掉,同时还要找新的节点。CPU计算得越快,写入的速度也越快,延迟就会下降地更低。

  2. 第2个副本:NameNode将第2个副本防止于不同于第1个副本所在的机架的某一个节点中(磁盘容量与CPU性能也要俱佳)。

    为什么要这样做?有一种可能,就是现在要访问的文件块所在的DataNode,或者存放这个文件块的物理计算机宕机了,那么就需要找下一个副本了,如果第2个副本还在这个物理计算机上,那第2个副本也没有用了,所以,通常是放在与第1个副本不同的机架中。如果DataNode没宕机,那何必用第二个副本呢?直接用第1个副本不就行了么。此外,假设第一个副本放在机架1中,第二个副本放在机架2上,如果在机架2中的某个节点想访问这个数据块,那么直接从第二个副本中读取,就能减少机架间的数据传输延迟。

  3. 第3个副本:NameNode将第3个副本放置在第一个副本所在机架的不同节点。

    为什么要这么做?如果第一个副本宕机了,而位于与第一个副本相同机架的某个节点发起了读取请求,无法直接访问第一个副本了,那就可以快速地直接访问第3个副本。同时如果这时候机架间通讯也坏掉了,这时候第三个副本的作用也更大了(因为机架间通讯坏掉的话,拿到第二个副本就不可能了)

  4. 如果还有其他副本,就选一台磁盘容量与CPU性能俱佳的节点直接进行存储就行。

文件写入

  1. 首先是将文件进行分块,然后客户端与NameNode进行通信,请求写入一个文件块。
  2. NameNode根据启发式的策略决定文件块放置的DataNode,并将DataNode的位置告知客户端。
  3. 客户端与DataNode1建立连接,DataNode2与DataNode2建立连接,DataNode2与DataNode3建立连接,客户端将文件块以流水线的方式写入这些DataNode(客户端不是一次性发送完整个文件,而是以更小的数据包为单位进行传输,数据包写入本地缓存后,立刻把这个数据包转发给下一个节点)。
  4. DataNode3写完毕后向DataNode2发送确认信息,随后DataNode2向DataNode1发送确认消息,最后,DataNode1向客户端发送确认消息,表示该文件块成功写入。但是并不意味着文件的写入已经完成了,如果文件还有其他的文件块,则会继续按照上面的步骤按顺序进行发送。

image-20221009181703451

这样的写入方式有什么优点?能够写入到多个廉价的机器上,具备比较高的容错性。同时按照流水线的写入方式可以确保每个文件块及其副本都已经准确写入到对应的DataNode上。通过消息回馈也能让服务端知道什么时候文件传输完成。

这样的写入有什么缺点?为了保证数据文件的一致性,牺牲了执行时间,按照这种流水线的写入方式,让整个写入过程都是串行进行的,写入比较缓慢。同时一个文件不同块的写入过程也是串行的,这就导致速度更慢了。因为是按块写入,如果小文件比较多的话,这个空间利用率有点低。

question:一个文件块及其副本传输是串行的我能理解,但为啥一个文件的不同文件块不能并发写入呢?如果是为了避免两个块刚好写到一半,磁盘就满了这个情况,那不能提前在服务端预估一下要占用的空间,从而进行一个初步判断是否能写入就解决了么?

文件读取

  1. 客户端与NameNode通信,请求读取一个文件
  2. NameNode根据文件的路径等信息判断读取请求是否合法,如果合法则向客户端返回文件中所有数据块的存放地址。
  3. 对于第1个数据块,客户端从距离最近的存放该数据块的DataNode读取数据。
  4. 当第1个数据块读取完毕后,客户端从距离最近的存放第2个数据块的DataNode读取数据
  5. 以此类推客户端读取下一个数据块,直到读取完所有的数据块。

image-20221009184546161

question:按照上面的读取描述,每一个块的读取过程都是串行的,还是老问题,为什么不能并发读取每一个块,然后异步整合成每一个文件呢?总不能说这样设计太难了,所以没有这么干吧。

文件读写一致性

HDFS中约定:同一时刻仅能发生一个写文件的请求,文件写入后即不可更改,并且文件写入后的读取操作可以并发执行。

HDFS采用一次写入、多次读取的简化一致性模型

  1. 一个文件经过创建、写入和关闭后即不得改变文件中的已有内容
  2. 若已经写入HDFS文件,则仅允许在文件末尾追加数据
  3. 当对一个文件执行写入操作或者追加操作时,NameNode将拒绝其他针对该文件的读、写请求
  4. 当对一个文件执行读取操作时,NameNode允许其他针对该文件的读请求。

为什么这么做?这样简化了文件的操作,追加好操作,直接从最后一个块的末尾进行写入或者写入一个新的块就好了。但是修改还得读取整个文件,需要拉取所有的文件块,修改里面内容后,还得移动每一个文件块中的存储细节,明显很复杂,也很费时。同时文件在写入时,其他用户也可以写入,则不好判断哪一个是正确的写入。同样的,如果文件在写入时,其他用户可以读取,那么他读取到内容可能不是最新写入了。这两种情况都导致了文件读写不一致。但如果都是读请求,都只需要拉取所有的文件块,而不需要修改其内容,因此读请求可以一起执行。

那这样的方式有什么缺点吗?缺点肯定是有的,最大的缺点就是不能修改,要是想修改就必须先写入新的文件,然后将旧文件删除,这样无疑增大了修改时间。

容错机制

NameNode故障

NameNode故障之后通常由SecondaryNameNode进行恢复,在之间讲述了SecondaryNameNode的工作方式,所以NameNode在故障之后,SecondaryNameNode中存储了NameNode中的信息,所以就可以直接重启NameNode并通过SecondaryNameNode中的备份信息对NameNode的结果进行恢复。

需要注意的是,如果在SecondaryNameNode备份过程中,有用户对文件进行了操作,EditLog.new就不再为空,在备份完成后,NameNode故障了,则EditLog.new中的内容是不会存在SecondaryNameNode中。因此这种容错机制是"冷备份"而非"热备份"。在Haddoop 2.x的版本中,增加了高可用性的支持,其中配置了活动-备用NameNode,其可以接管故障的NameNode任务。

DataNode故障

DataNode会定期向NameNode发送心跳以表示节点处于活跃状态。如果NameNode在一定时间范围内没有收到DataNode发送的心跳,那么NameNode就认为DataNode宕机了,标记DataNode为不可用状态。当客户端访问文件涉及到不可用的DataNode时,就不会使用这个宕机的DataNode中的数据,而是使用其他DataNode中的文件块。

同时,DataNode宕机后会导致一部分的文件块冗余备份没有达到系统的要求,因此会使用之前提到的启发式策略对文件块进行冗余备份。

question:因为NameNode和DataNode可能不在同一台物理设备上,因此心跳的传输可能会通过网络传输等方式进行,如果心跳在发往NameNode的途中出错了,导致心跳信号无法到达NameNode。那么,本来DataNode没有故障,但NameNode却认为其故障了,这时候应该怎么办呢?(回答,心跳基于RPC实现的,默认是可靠传输,除非真的DataNode挂了或者断网的,不然一般心跳信号都是可以传输过去的)

我感觉,这个应该好解决,就是NameNode先标记为不可用,然后等到DataNode进行下一次心跳的时候,如果接收到了,就在NameNode中,将对应的DataNode的不可用状态标记为可用。

但是这时候还会产生一个问题,就是不可用的时候会触发冗余备份,那么在等到好的DataNode第二次心跳带来前,冗余备份就完成了或者在进行中,这时候本来需要荣誉备份的变成不需要了,这时候会不会浪费掉存储空间或一部分计算资源呢?

还有没有一种可能,NameNode标记为不可用之后,就给DataNode发送重启信号,重启之后他存储的东西就全部丢掉。

批处理系统MapReduce

设计思想

MPI是在MapReduce之前更早产生的分布式计算系统,其为程序员提供了一个并行环境库(基于一种消息传递接口),程序员可以通过调用MPI提供的程序接口达到并行处理的目的。其存在如下缺点:

  1. 从用户编程来看,程序员需要考虑进程之间的并行问题,进程之间的通信需要用户在程序中进行显式表达。
  2. 采用多进程的方式进行,当某一进程崩溃,如果之前没有添加故障恢复的功能,否则MPI并不提供容错功能。

MPI不提供容错功能,将会导致计算资源的浪费,分布式计算系统应该具备容错能力,这就是MapReduce与MPI的重要区别。

数据模型

数据在MapReduce流动的过程,通常是以键值对的形式,按照不同的键值,实现对一些列数值的处理。MapReduce是分布式计算系统的一个框架,其数据来源通常也是分布式存储系统(比如HDFS)。基本数据流动过程如下图所示:

image-20221010095836173

计算模型

MapReduce将复杂的、运行大规模分布式集群中的并行计算过程高度抽象为Map和Reduce两个过程,采用分治的策略对数据进行并行处理。和上图一样,Map过程将输入键值对进行一次转化,产生若干新的键值对,Map转化后的键值对的内容是不同的。MapReduce框架将Map转化得到的键值对按照键值进行分组,即shuffle,从而狙击拥有不同键的键值对。之后,Reduce过程相对相同键的键值对进行计算,并根据需要将计算结果进行一次键值对转化后输出。

MapReduce无须关系分布式系统中节点间的通信方式,可以像编写集中式程序那样编写MapReduce代码。此外,系统可以并行启动一系列进程以执行Map和Reduce操作,一旦出现故障,MapReduce框架可以自动进行容错处理,无须用户参与。

体系结构

架构图

MapReduce采用"主从"架构,如下图所示:

02.1MapReduce架构图

  1. JobTracker:主节点运行的后台进程,负责整个系统的资源管理和作业管理。监控TaskTracker管理系统拥有的计算资源,作业管理是指JubTracker负责将作业(Job)拆分成任务(Task),并进行任务调度以及跟踪任务的运行进度。
  2. TaskTracker:从节点运行的后台进程,负责管理本节点的资源、执行JobTracker的命令并汇报情况。TaskTracker使用slot等量划分本节点中的计算资源,接收JobTracker发送的命令并执行,通过心跳将本节点的资源使用情况和任务运行进度汇报给JobTracker
  3. Task:从节点在应用程勋执行过程中启动的进程,负责任务执行。JobTracker根据TaskTracker汇报的信息进行调度,命令存在空闲slot的TaskTracker启动Task进程执行Map或Reduce任务,即MapTask或ReduceTask。在hadoop的MapReduce中,这些进程的名称为Child
  4. 客户端:提交应用程序启动的进程,负责将用户编写的MapReduce程序提交给JobTracker。在hadoop的MapReduce中,这些进程的名称为RunJar

为了避免通过网络读写输入输出数据的高额代价,MapReduce和HDFS通常部署于同一组物理节点,而TaskTracker和DataNode部署在同一个物理节点。JobTracker和NameNode部署在同一个物理节点,TaskTracker和DataNode部署在同一个物理节点。因此,MapReduce可以选择输入数据的DataNode所在的节点启动MapTask,从而节省了远程读取数据的网络开销。同理ReduceTask也可以用相同的方式,节省远程写入数据的网络开销。

image-20221010105203115

应用程序执行流程

image-20221010114721970

  1. 客户端编写MapReduce作业的配置信息、jar包等信息上传到共享文件系统,通常为HDFS。
  2. 客户端提交作业给JobTracker,并告知JobTracker作业信息的位置。
  3. JobTracker读取作业的信息,生成一系列Map和Reduce任务,并调度给拥有空闲slot的TaskTracker
  4. TaskTracker根据JobTracker的指令启动Child(即MapTask)进程执行Map任务,Map任务从共享文件系统读取输入数据
  5. JobTracker从TaskTracker处获得Map任务进度信息。
  6. Map任务完成后,JobTracker将Reduce任务分配给TaskTracker
  7. TaskTracker根据JobTracker的指令启动Child(即ReduceTask)进程执行Reduce任务,Reduce任务从Map任务所在节点的本地磁盘中拉取Map的输出结果。
  8. JobTracker从TaskTracker处获得Reduce任务进度信息
  9. 当Reduce任务运行结束并将结果写入共享文件系统时,整个作业执行完毕。

工作原理

MapReduce的工作原理就是从分布式文件系统中读取输入MapReduce任务的数据,经过Map解析成目标处理键值对,然后使用shuffle传输给Reduce任务,键相同的数据发送给相同的Reduce任务,然后Reduce通过这些键值对进行计算,并将结果以新的键值对形式写入分布式文件系统中。(与MPI的本质区别在于shuffle的过程(数据交换)不用显示定义)

image-20221010142456446

同时Map和Map,Reduce和Reduce任务之间是不会进行通信的,只有map和reduce任务之间才有通信。

数据输入

数据输入是指从存储系统中待读取的文件到Map任务可处理的键值对之间的映射。其与输入文件的格式有关,因此需要根据输入文件的格式定义到键值对映射。比如文本文件就可以将每一行在文件中的偏移量和行的内容作为一个键值对。同时HDFS系统是以块为单位进行存储的,所以直接就会想到,把每一块都作为一个Map任务的文件输入,从而完成快速读入的过程。但是这样存在一个问题,有可能有些键值对键的部分位于一个块的最末尾,值的部分位于另一个块的最开始,这就导致了文件读取不完整。

为了解决这个文件读取不完整的问题,MapReduce在实际使用过程中是按照HDFS的方式读取文件,然后再对文件中的内容进行分片(split),每一个分片中都包含着完整记录,这样就能在逻辑上将输入文件完整地划分给不同的Map任务。

Map阶段

Map过程就是将其转化为一个或多个键值对,即将[K1,V1]键值对转化为List([K2,V2])。具体过程如下:

image-20221010145732149

在Map方法之中,就是用户自定义的代码处理原始文件,然后将其映射为[K2,V2]键值对,接着使用partition方式(不如HashPartitioner)确定[K2,V2]的分区,最终得到[[K2,V2],partitionID],并将其写入到缓冲区中。当缓冲区内的数据达到一定的阈值,则先对这部分的缓冲区的数据进行固定(此时缓冲区还是剩余的部分可以继续支持Map写入,只是后面说到的操作,不包括这个部分,这部分用于下一次达到缓冲区阈值),然后按照partitionID进行排序,再将同一个分区内的键值按照键值进行排序,然后将排序好的结果溢写到磁盘形成文件。当这些溢写的文件达到一定数量,就对这些文件进一步归并(merge)形成一个新的文件,使得属于同一个分区的所有键值对连续存储。最终Map任务输出一个以大文件形式存储于Map任务所在节点的本地磁盘中。

优点:因为是并发运行多个Map任务,所以由多个进程执行,所以处理起来是相对快速的

缺点:美中不足是要将其溢写成文件,可能涉及到不同节点的IO,会影响一点效率,但是其解决了对缓冲空间要求较大的一个问题。毕竟磁盘空间的获取肯定比缓冲空间好获取的。

shuffle阶段

shuffle过程将键相同的键值对发送给同一个Reduce任务,将List([K2,V2])转化成[K2,List(V2)]。本质上就是K2是map中的key,相同的key的value值组成一个list。

Map任务完成后会通知所在节点的TaskTracker,并向其告知Map输出文件所在位置,之后TaskTracker进一步通知JobTracker。当Map任务完成率达到一定阈值时,系统将启动Reduce任务,启动后的Reduce任务会定期从JobTracker获取各个节点已经完成的Map任务的输出位置。Reduce任务拉取的数据一定是Map任务完成之后的数据(因此已经保存在磁盘中的文件,这一点是非常重要的)

image-20221010160822005

优点:将key中的不同value组合在一起了,在进行Reduce操作的时候,就能分组进行处理,达到一个有序的效果。

缺点:还是磁盘IO的问题,花费时间优点大,因为要用Map的文件中进行读取,然后还要对内容按照key进行组合。shuffle是阻塞式的数据传输方式,需要等待Map任务写入磁盘后才能开始。

为什么要用shuffle?在map中得到的是很多个元组键值对,其缺点是相同的key有很多的值,如果直接运用到Reduce中,一个是key太多,每个key对应的结果可能又不一样,同时也达不到我们的需求。shuffle就可以将key相同的value值进行一个整合,key对应不同值的列表,这样在shuffle过程中就很方便地对这个value list进行处理,处理结果也是一个key对应一个结果。

Reduce阶段

Reduce过程就是将其转化为一个或多个键值对。即将[K2,List(V2)]键值对转化为[K3,V3]

image-20221010160808648

question:我感觉这一部分不太对劲,描述的时候如下

map:[K1,V1]->List([K2,V2])

shuffle:List([K2,V2])->[K2,List(V2)]

reduce:[K2,List(V2)]->[K3,V3]

但是从流程图来看,map阶段确实是形成了List([K2,V2])的形式,但是在shuffle阶段我就觉得比较奇怪了,shuffle过程应该是对Map结果再进行一个整合,形成[K2,List(V2)]的形式,但是在流程图中shuffle只是将分区拆分成了两份(图中例子对应两个Reduce任务),然后在Reduce的过程中,的输入形式并不直接是[K2,List(V2)]的形式,而是之前分区的结果,然后还要对分区后的内容进行排序溢写。我感觉这个部分和Map流程图中最后那部分非常像,但是Reduce任务中获得输入后没有直接执行reduce中的方法,而是再次执行和map相似的任务,这一点我不是很能理解。

解答:

我在问题中说的三个任务运行描述,指的是逻辑上的,但是图中的内容指的是物理运行情况。map操作过程和问题中描述的一样。在shuffle中,他的输入是多个Map结果溢写到磁盘上的文件,然后对文件中的内容进行读取,针对每一个分区进行归并(就是归并排序中的那个归并过程,因为分区的时候有过排序),确保相同key的数据在同一个Reduce任务中执行。然后在Reduce中,得到的是shuffle的结果(数据可能有多个不同的key,但是每一个key必然包含这个key对应的全部数据),因为是多个片段,所以仍然需要进行归并,由于内存空间不一定够大所以还是要写到磁盘上,当归并结束时,就可以从最后生成的文件中读取数据执行用户定影的Reduce任务了。

数据输出

这部分就比较简单了,每个Reduce任务的输出结果将以一个文件的形式保存到指定的目录中,文件名称在之前输入部分的图中可以看到。也就是说MapReduce计算的结果并不是一个完整的文件,而是一组文件。

容错机制

JobTracker故障

影响:正在运行的所有作业的内部状态将会丢失

恢复方式:重新启动JobTracker,所有动作全部重新执行

改进方案:使用多个JobTracker,将其中一个作为主要的JobTracker其他作为备用,然和每一个JobTracker都被ZooKeeper中的ZNode的监听,如果主JobTracker发生故障,则ZooKeeper中对应的ZNode就会删除,这时候备用的JobTracker监听不到主ZNode的消息,就会自动担任JobTracker中的各项任务。

TaskTracker故障

检测方式:JobTracker无法接收到TaskTracker发送的心跳,则JobTracker会认为该TaskTracker失效。

恢复方式:该TaskTracker所在节点运行过的任务均会被标记失败,JobTracker将这些任务调度到别的TaskTracker所在的节点重新执行。

Task故障

原因:代码错误,磁盘数据损坏,节点暂时性故障。

恢复方式:TaskTracker检测到Task故障后,会在下一次心跳中报告给JobTracker,JobTracker在收到报告消息后重新调度执行该任务,该任务可能在集群中的任意一个节点重试,如果经过最大尝试次数之后仍然失败,则标记这个任务为失败。

Map任务直接从HDFS中拉取数据重新执行即可,但是Reduce任务会碰到两种情况,如果能正常读取Map产生的文件,则只执行Reduce任务即可,如果无法读取,就需要连通Map任务一起重新执行。

批处理系统Spark

设计思想

MapReduce的局限性

  1. 编程框架的表达能力有限,用户编程复杂。只提供了map和reduce两个编程算子
  2. 单个作业中需要进行Shuffle的数据以阻塞方式传输,磁盘I/O开销大,延迟高
  3. 多个作业之间的衔接设计I/O开销,应用程序的延迟高

数据模型

Spark将数据抽象为弹性分布式数据(resilient distributed dataset, RDD),其有下面3个特征:

  1. RDD是一个数据集(dataset):与MapReduce不同,Spark的操作对象为抽象的数据而非数据集。
  2. RDD是分布式的:每个RDD可以划分成多个分区,每个分局即为一个数据集片段,一个RDD的不同分区可以存储于集群中的不同节点
  3. RDD具有弹性:具备可恢复的容错特性。

RDD具备这些特征有什么优点? 对于第一个特征,这也就意味着RDD中的数据不必像MapReduce一样构造成键值对的样子,而是以输入数据实际的样子作为输入。对于第二个特征,RDD内部的数据可以有很多的分区,并且这些不同的分区可以存储在不同的节点中,这也就意味着,RDD是一个在逻辑上统一的整体,而物理上是由不同节点中的数据组成的,这会使得在应用执行时,不同节点的计算可以在不同的节点中使用Task线程完成计算。对于第3个特征,则保证了RDD计算的准确性,即使计算过程中进程故障,也能在一定时间内恢复成正常的计算过程。

计算模型

Spark系统提供丰富的操作以对RDD进行变换。这些操作算子可以分为创建、转换和行动操作算子。

  1. 创建操作算子从本地内存或外部数据源创建RDD,提供数据输入的功能
  2. 转换操作算子描述RDD的转换逻辑,提供对RDD进行变换的功能
  3. 行动操作算子标志转换结束,出发DAG生成。

Spark逻辑计算模型通常有两种方式,一种是基于算子的Operator DAG,他会根据具体使用的算子形成有向无环图。

image-20221010201134357

另一种则是基于数据的RDD Lineage,他会构成一个有向无环图,其以变换的数据为节点,边是两个数据变化时使用的算子。

image-20221010201150060

值得注意的是,RDD在变换过程中是只读的,不可变的。RDD转换或行动操作会不断生成新的RDD,而非改变原有的RDD。

在物理计算模型中,Spark利用分布式架构加速数据处理,DAG中的操作算子实际上是由若干实例任务(task)实现的。从算子的角度来看,Spark的物理计算模型可以表示为:

其实和逻辑计算的结构相似,只是在物理计算过程中,算子的计算并不是只使用了一个,数据会分成不同的块,使用相同数量的算子进行操作,在逻辑计算结构中是将这个物理模型统合成一个个整体了。

image-20221010201400316

事实上每个任务通常负责处理RDD的一个分区,从RDD的角度描述物理计算模型如下图所示:(从RDD的角度来看,RDD内部也是由不同的分区组成,在Spark工作过程中,处理的是不同分区这也与RDD描述的逻辑结构相似,只是逻辑结构中将数据统合成一个整体了)

image-20221010201532320

体系架构

架构图

03.1Spark架构图

  1. 集群管理器(cluster manager):管理整个系统的资源并监控工作节点(worker)。Spark可以分为Standalone和Yarn两种不同的部署模式。
    1. Standalone的集群管理器包含Master和Worker
    2. Yarn模式将Spark与资源管理系统Yarn共同部署,集群管理器包括ResourceManager和NodeManager
  2. 执行器(executor):执行任务。其本身是一个运行在工作节点上的进程,通过启动若干线程执行任务。Standalone部署方式下的进程名称为:CoarseGrainedExecutorBackend
  3. 驱动器(driver):启动应用程序的主方法并管理作业运行。其在逻辑上独立与主节点、从节点以及客户端,但根据应用程序的不同运行方式以不同的形式存在。

在Spark中,集群管理器负责集群资源管理,驱动器负责作业管理,实现了资源管理和作业管理两大功能的分离,解决了MapReduce中资源管理和作业管理紧耦合的局限性。

当然这个局限性在MapReduce二代中解决了,主要方案就是将资源管理的任务全部交给Yarn负责,而MapReduce的JobTracker和TaskTracker只负责作业管理。

Standalone模式又可以进一步分为Standalone Client模式和Standalone Cluster模式。

Standalone Client模式下驱动器位于客户端,并处于SparkSubmit进程中,客户端可以查看应用程序的执行过程,一般用于测试。

image-20221010214225243

Standalone Cluster模式下,Master将选择在某一Worker所在节点启动名为DriverWrapper的进程作为驱动器,客户端无法查看应用程序执行过程中的信息,一般用于生产环境。

image-20221010214400913

为什么Client用于测试居多,Cluster用于生产环境呢?举个实际的例子,假设在实际生产环境中,用户要向Spark提交1000个应用程序,如果选择Client模式进行部署,那1000个程序都会挤在客户端的节点中,导致客户端卡死,也不利于应用程序的执行。而Cluster模式,则会在不同的从节点中启动这1000个程序,同时不影响客户端的运行。同样的,在Yarn平台中运行Spark也有Client和Cluster两种模式,不论是在Yarn还是Standalone的条件下,Client和Cluster的优缺点都是基本一致的(前者多用于测试,后者多用于生产环境)。

应用程序的执行流程

03.2Spark应用执行流程

  1. 启动驱动器。Client部署方式将直接在客户端启动驱动器,并向Master注册。Cluster方式部署则客户端将应用程序提交给Master,由Master选择一个工作节点启动驱动器进程(DriverWrapper)。
  2. 构建基本运行环境,即由驱动器创建SparkContext,向集群管理器进行资源申请,并由驱动器进行任务分配和监控。
  3. 集群通知工作节点启动执行器进程,该进程内部以多线程方式运行任务。
  4. 执行器进程向驱动器注册。
  5. SparkContext构建DAG并进行任务划分,从而将其交给执行器进程中的线程以执行任务。

工作原理

Spark应用程序执行过程中,驱动器创建的SparkContext维护了应用程序的基本运行信息。SparkContext将RDD的依赖关系构造成DAG,使用DAG调度器(DAG scheduler)划分DAG Stage,将Stage作为一个TaskSet提交给任务调度器,由其将任务分发给各个工作节点上的执行器执行,整个过程按照Stage依次执行。驱动器具体工作如下图:

03.3驱动器内部工作原理

Stage划分

RDD之间存在依赖关系,通常分为窄依赖和宽依赖,窄依赖是指父RDD的每一个分区最多被一个子RDD的分区所用。宽依赖是指子RDD的分区依赖于父RDD的多个分区或所有分区。如下图所示:

image-20221010224249134

在Stage划分过程中,DAG调度器针对DAG进行反向解析,遇到宽依赖则生成新的Stage,遇到窄依赖则将该Operator加入到当前Stage中,从而使窄依赖尽可能地划分在同一个Stage中。因此Stage内部生成的RDD之间为窄依赖关系,而Stage输出RDD和下一个Stage输入RDD之间为宽依赖关系。只有Stage之间的数据传输需要进行Shuffle。

image-20221010230346256

为什么要按照这种方式进行Stage划分?从上面的描述中可以看出来,窄依赖的数据原都是单向变化的,即数据源是多对一的映射方式,比如map,filter,union操作并不需要数据交互,join也只需要将分散的数据源聚集起来即可。这样窄依赖的计算过程就不会有数据传输的需求。而宽依赖则是有需要传输的需求。因此,将窄依赖统合起来称为一个Stage可以将其抽象成一个完整的计算整体,尽可能地先做好较快的计算后再进行数据交互(比如shuffle),减小不必要的数据传输耗费的时间。

Stage分为ShuffleMapStage和ResultStage两种类型:

  1. ShuffleMapStage输出是另一个Stage,其特点是
    1. 不是最终的Stage,其后面还有其他的Stage
    2. 输出必须经过Shuffle过程,并作为后续Stage的输入
    3. 一个DAG中可能包含该类型Stage,也可能不包含该类型的Stage
  2. ResultStage的特点是:
    1. 是最终的Stage,且其输出即为最终的结果
    2. 一个DAG中必包含该类型Stage

Stage内部的数据传输

Spark系统采用流水线的方式进行Stage内部的数据传输,流水线方式不要求物化前序算子的所有计算结果。简单来说就是在Stage内部中如果输入的数据源之间不存在依赖关系的计算,则可以并发进行,无须等待其他计算结束如果有多个数据源的输入,并且这些数据源有不同的处理方式,就会需要等待。通常情况下,Spark根据每个Stage输出RDD中的分区个数决定启动的任务数量(即TaskSet的大小)以执行该Stage中的操作。

Stage之间的数据传输

Stage之间的数据传输需要进行Shuffle,Shuffle过程分为Shuffle Write和Shuffle Read两个阶段

  1. Shuffle Write阶段,ShuffleMapTask需要将输出RDD的记录按照分区函数划分到相应的bucket中并物化到本地磁盘形成ShuffleblockFile,之后才可以在Shuffle Read阶段被拉取
  2. Shuffle Read阶段,ShuffleMapTask或Result根据partition函数读取相应的ShuffleblcokFile,将其存入缓冲区并继续进行后续的计算。缓冲区并不一定要求对该Task负责的所有记录进行排序后才进行后续处理(与MapReduce先排序再执行Reduce任务不同)

image-20221011111654129

为什么要这么做?Spark本质是一个内存计算系统,也就是说RDD的计算其实要加载到内存中,才进行计算的,如果不进行物化,直接在某一个节点上收集所有的输出结果并将其整合在一起,若整合后的结果不是很大,则影响不大,但是如果整合后的结果非常大,超过了节点的内存那么这样就算整合失败了。就会重新计算,但是会使用同样的方式进行整合,结果仍然是是失败的。物化计算结果到磁盘中,就是解决这个内存不足的问题,毕竟磁盘容量总是要比内存容量好拓展的。

随之而来的问题也有,既然涉及到物化计算结果写入磁盘,写入磁盘的时间肯定是比内存旧的,但是相对而言,物化结果到磁盘能够更节省成本。

应用与作业

从逻辑执行上看:一个Application(应用)由一个或多个DAG构成,一个DAG由一个或多个Stage构成,一个Stage由若干个RDD Operator构成。

从物理执行上看:一个Application等于一个或多个Job,一个Job由一个或多个TaskSet构成,一个TaskSet为多个不存在Shuffle关系的Task集合。

类别 逻辑执行角度 物理执行角度
Application DAG Job
Application Stage TaskSet
Application RDD Operator Task

容错机制

  1. Client故障

    1. Standalone Client模式下,驱动器存在于该进程将导致系统无法正常工作,用户需要重新提交应用程序
    2. Standalone Cluster模式中,只要作业成功提交给系统,Client就不会影响系统中的作业运行,但如果Driver进程故障将导致系统无法正常工作,需要重新提交应用程序。
  2. Master故障

    1. 故障后则无法正常工作,需要重新启动
    2. 或者可以借助ZooKeeper实现高可用性

    ZooKeeper的实现方式就是使用多个Master,并形成对应的Znode,选出主Master,然后备选1监听主Master的Znode,备选2监听备选1的Znode,以此类推,当被监听的Master故障,则使用监听的Master顶替。

  3. Worker或Executor故障

    1. 影响系统中局部计算的结果
    2. 系统可以重启发生故障的进程或将其丢弃将原本由这些进程负责执行的任务交给新的Worker或Executor执行。
    3. 可能会导致部分RDD或RDD中的某些分区丢失

RDD持久化

Worker进程可以管理一定大小的内存,用于存储RDD和计算中需要的其他辅助信息。一旦RDD占用空间达到相应存储空间的阈值,Spark会使用置换算法将部分RDD的内存空间腾出。如果不进行声明,这些RDD将会被直接丢弃。对于需要保留的RDD,则用户在编程时需标明该类RDD需要持久化(通过调用RDD的persist(Storagelevel)方法或cache()方法实现)

故障恢复

SparkContext中维护了RDD的血缘关系(RDD转换操作的DAG)。当某一RDD的部分分区丢失时,其可通过血缘关系获取足够的信息以重新运算和回复丢失的数据分区。窄依赖的RDD中只需要使用对应的数据源重新计算即可恢复,而宽依赖的RDD则需要等待依赖的所有父RDD计算完成后才能进行恢复。

检查点

RDD持久化在一定程度上提供了多个备份,但是这些备份的机器可能发生故障,在这种情况下需要依赖血缘关系进行恢复,但是如果正好依赖血缘关系进行RDD恢复所需的计算时间非常长,这时候就需要通过设置检查点来进行恢复了。

检查点机制将RDD写入可靠的外部分布式文件系统中(本身就自带一定的容错能力),用户需要指定某些计算时间较长的RDD设置检查点,系统在作业结束后启动一个独立的作业进行写检查点操作。通过这样的方式,直接从检查点中读取要恢复的RDD数据,以此加快恢复过程。

优点:和上面说的一样,就是会加快系统的恢复过程

缺点:这个检查点是写在HDFS上的,因此会产生一部分的写入计算开销,同时写入也会占用一定的磁盘空间。同时需要指定哪些RDD是要进行保存的,如果RDD很多,这可能会带来麻烦(但感觉影响又不大,因为在Spark编码的时候,RDD的转化也是一步一步手动定义的,即使使用for循环的方式进行定义,则也只需要记录对应位置然后加入检查点就可以了。除非是很复杂的DAG结构)

资源管理系统Yarn

Yarn是一个通用的资源管理系统,为上层应用提供统一的资源管理和制度。

设计思想

一代MapReduce中,一个显著的缺陷就是资源管理和作业紧密耦合。Yarn的出现使得资源管理模块从第一代MapReduce中独立,称为一个通用的资源管理平台。

作业与资源管理

第一代MapReduce存在以下问题

  1. 资源管理与作业紧密耦合:JobTracker既要负责作业调度和任务调度,又要负责集群中的资源。
  2. 作业的控制管理高度集中:JobTracker负责所有作业的控制,JobTracker需要维护所有作业的元信息,内存开销较大。

基于这些问题,Yarn通过将资源管理和作业管理分离形成通用的资源管理系统,并使作业之间相互独立地控制执行。

为什么要将资源管理与作业分离?这个假设我们又要使用Spark系统又要使用MapReduce系统处理不同的数据,那么如果不将作业管理和资源管理分离的话,两个系统都要占有一定的资源,也就是说,Spark系统占用的资源使用结束后就无法让给MapReduce使用,反过来也是同理。因为两者资源分配的进程不同一,这也就导致了他们都无法统一使用集群中的资源。

平台与框架

如果将资源管理系统视为一个提供资源的平台(platform),则可以在该平台上运行各种分布式系统。与平台相应,将运行在资源管理平台上的分布式计算系统称为框架(framework)。平台提供资源管理的功能,框架为运行在平台上的系统。

Yarn进行资源管理的粒度是应用,称为Yarn平台的应用。框架可以将其应用或作业映射为Yarn平台的应用。

体系架构

架构图

04.1Yarn架构图

Yarn也是采用主从架构,包括主节点运行的ResourceManager、从节点运行的NodeManager和提交应用程序的客户端(Client)。ResourceManager和NodeManager是Yarn的常驻进程,而从节点中的ApplicationMaster是用于某一应用管理的进程,Container表示可以用于执行该应用中具体任务的资源。

  1. ResourceManager(RM):负责整个系统的资源管理和分配的资源管理器,主要由调度器和应用管理器两个组件构成。
    1. 调度器负责分配Container并进行资源调度
    2. 应用管理器负责管理整个系统中运行的所有应用
  2. NodeManager(NM):负责每个节点资源和任务管理的节点管理器。
    1. NM定时向RM汇报本节点的资源使用情况和Container运行状态
    2. NM接受并处理来自AM的Container启动/停止等各种请求
  3. ApplicationMaster(AM):用户基于Yarn平台提交一个框架应用,Yarn均启动一个AM以管理改应用
    1. AM与RM调度器通过协商获取资源,将获取的资源进一步分配给作业内部的业务
    2. AM与NM通过通信启动/停止任务,监控所有任务运行状态,并在任务发生故障时重新申请资源以重启任务
  4. Container:资源的抽象表示,包含CPU、内存等资源,是一个动态资源划分单位
    1. 当AM向RM申请资源时,RM向AM返回以Container表示的资源

应用程序执行流程

image-20221011153951556

  1. 用户编写客户端应用程序,向Yarn提交应用程序
  2. ResourceManager负责接收和处理来自客户端的请求,尝试为该应用程序分配第一个Container,若分配成功则在该Container中启动应用程序的ApplicationMaster
  3. AppicationMaster向ResourceManager注册,以便客户端通过ResourceManager查看应用程序的资源使用情况。ApplicationMaster将应用解析为作业并进一步分解为若干任务,并向ResourceManager申请启动这些任务的资源。
  4. ResoueManager向提出请求的ApplicationMaster分配以Container形式表示的资源。一旦ApplicationMaster申请到资源,即可在多个任务间进行资源分配。
  5. ApplicationMaster确定资源分配方案后,与对应的NodeManager通信,在相应的Container中启动相应的工作进程以执行任务。
  6. 各个工作进程向ApplicationMaster汇报自身的状态和进度,以便令ApplicationMaster随时掌握各个工作进程的任务运行状态。
  7. 随着任务执行结束,ApplicationMaster逐步释放占用的资源,最终向ResourceManager注销并自行关闭。

工作原理

单平台多框架

对于Yarn来说,其只负责向框架提供Container,并不关心在Container中运行何种服务。通过Yarn资源管理平台,多个框架可以部署在同一个物理集群中,并动态地共享资源。

下图展示了在Yarn平台中提交了2个MapReduce应用和1个Spark应用,由此看出,每当提交一个应用时,Yarn均会启动一个对应的ApplicationMaster。

04.2Yarn单平台部署

平台资源分配

ResourceManager的调度器维护了一个或多个应用队列,每个队列拥有一定数量的资源,位于同一队列的应用共享该队列所有拥有的资源。资源调度实际上是决定如何将资源分配给队列,以及分配给队列中应用的过程。Yarn平台负责分配一定的资源,框架决定如何使用这些资源执行任务。

Yarn中的调度器是一个可插拔的组件,可以通过配置文件选择不同的调度器:

  1. FIFO调度器:FIFO仅维护一个队列,该列拥有集群中的所有资源,调度器的资源分配方式为先提交的应用先得到资源。这种调度器的实现方式比较简单,但是会导致一个应用独占所有资源,而其他应用需要不断等待
  2. Capacity调度器:维护层级式队列,集群中的资源划分给这些队列,队列内部的资源分配方式为FIFO,用户提交应用程序时可以指定将应用放入某一队列中。这种方式可以避免某个长时间运行的应用独占集群资源而其他应用得不到运行的情况。但也可能会出现一个队列的资源一直被使用,而另一个则一直没有使用的情况
  3. Fair调度器:维护层级式队列,集群中的资源换分给这些队列,但是队列之间可以共享资源,因而这些队列在逻辑上可以看做一个共享队列。当只有一个应用运行时,该应用可以独占整个集群资源。但当其他应用提交到集群时,将空出部分资源给新的应用,最终所有的应用会根据所需使用内存的大小得到分配的资源。

image-20221011202304292

容错机制

  1. ResourceManager故障:进行故障恢复时需要用某一持久化存储系统中恢复信息。恢复完毕后,ResourceManager会清除当前集群中所有Container,包括运行ApplicationMaster的Container。RM重新启动ApplicationMaster,所有应用重新执行

    可以使用多个ResourceManager通过ZooKeeper进行协调,当主要的RM故障时,ZooKeeper对应的ZNode就监听不到了,这时候就使用其他ResourceManager进行替换。但是又要考虑一个问题,就是所有的RM应该和主RM保持相同的信息,这样才能在不重启的情况下完成替换。

  2. NodeManager故障:RM和NM之间保持心跳通信,因此RM可以通过超时机制判断NM是否发生故障。如果NM发生故障,则RM会认为NM所在节点中的所有容器运行的任务均执行失败,并将执行失败的信息告知ApplicationMaster。因此,AM将向RM重新申请运行资源去运行这些任务。故障NM恢复后,则向RM重新注册,重置本地信息

    一直想知道这个心跳通信的机制是什么情况?为什么没有考虑过这个心跳机制因为某种原因导致信号传递不到RM但实际上NM并没有问题,咋办?

    心跳通信的机制是基于RPC实现的,RPC是分布式的基石,其直接传输信息的二进制流,速度是比较快速的,在网络上可以认为是可靠传输。(实际上传输不到这个就涉及到计网相关的知识了,在网络传输协议中本身就包含一些确保数据正确传输的机制,除非断网,不然应该都是没有太大问题的)。

典型示例

Yarn平台运行MapReduce框架

第一代MapReduce与MapReduce+Yarn对比

系统 第一代MapReduce MapReduce+Yarn
资源管理 JobTracker、TaskTracker ResourceManager
资源管理 JobTracker、TaskTracker NodeManager
应用管理 JobTracker、TaskTracker MRAppMaster
任务执行 Child YarnChild

从对比表中可以看出,原本的JobTracker与TaskTracker对资源管理和应用管理过于耦合了,在Yarn的指导下,资源管理分别由ResourceManager和NodeManager负责,MRAppMaster完成应用管理,实现了资源管理与应用管理分离。Yarn平台运行MapReduce的架构图如下:

04.3MRYARN

Yarn平台运行Spark架构

Spark的Standalone模式可以分为Client和Cluster两种部署方式,因此在Yarn中运行的Spark也可以分为Cluster和Client两种模式。四种模式的对比如下表

模式 Standalone Client Standalone Cluster Yarn Client Yarn Cluster
资源管理 Master Master ResourceManager ResourceManager
资源管理 Worker Worker NodeManager NodeManager
应用管理 SparkSubmit DriverWrapper SparkSubmit ApplicationMaster
任务执行 CoarseGrainedExecutorBackend 同第一个 同第一个 同第一个

Yarn Cluster模式下,除了使用ResourceManager和NodeManager分别替换Master和Worker之外,还创建了ApplicationMaster用于作业管理。

image-20221012101200463

Yarn Client模式下,驱动器仍然是在客户端中,同时使用ResourceManager和NodeManager分别替换了Master和Worker。只是这种情况不会再启动ApplicationMaster进行作业管理,而是在客户端提交一个Spark应用之后ResourceManager随机选取一个NodeManager所在节点启动Executor Laucher,这个进程只负责向ResourceManager申请资源启动CoarseGrainedExecutorBackend,不负责运行Driver。

image-20221012102028026

为什么Yarn Client模式下又不使用ApplicationMaster进行作业管理呢?我的理解是这样的,因为Client模式下需要在客户端监控作业的执行情况,因此作业的进度和状态应该直接向客户端反馈,而ApplicationMaster的任务也是监督作业的进度和状态,这就使得两个功能其实是相同的,这就没有必要在占用资源去启动ApplicationMaster,只是需要一个更轻量的进程去帮助作业启动即可,作业启动后的进度和状态直接由客户端监督。

Yarn平台运行MapReduce和Spark框架

从上面的两个示例来看,不论是MapReduce还是Spark,在Yarn平台上运行都由ResourceManager和NodeManager接管资源管理,因此两者的结合的重合部分就是资源管理部分,同时在作业管理上如果Spark使用的Yarn Cluster不是模式,则两者其实并没有太大区别。具体结合方式如下图所示:

image-20221012104049742

协调服务系统ZooKeeper

设计思想

目标:将实现复杂的分布式一致性服务封装为一个高效可靠的原语集,并以一系列简单易用的接口提供给用户。

ZooKeeper不用于存储大量数据,而是用于存储元数据和配置信息等,以便进行协调服务。

数据模型

ZooKeeper维护的是类似于文件系统的层次数据结构,数据模型为一棵树。树中的每个数据节点称为Znode,每个Znode中均可保存信息。Znode可以分为持久Znode和临时Znode。

  1. 持久Znode一旦被创建,除非主动进行Znode移除操作,否则将一直存在与ZooKeeper系统中。
  2. 临时Znode的生命周期和客户端会话绑定,一旦客户端会话失效,则该客户端创建的所有临时Znode都会被移除。

05.1Znode

操作原语

  1. create:在树中某个位置添加一个Znode
  2. delete:删除某个Znode
  3. exists:判断某个位置是否存在Znode
  4. get data:从某个Znode读取数据
  5. set data:向某个Znode写入数据
  6. get children:查找某个Znode的子节点
  7. sync:用于等待数据同步

体系架构

image-20221012113923387

ZooKeeper包含一组服务器,用于存储数据,每台服务器维护一份树形结构数据的备份。

  1. 领导者:可以为客户端直接提供读、写服务
  2. 追随者:只能直接提供读服务,客户端发给追随者的写操作需要转发给领导者。
  3. 观察者:与追随者类似,但是观察者不参与选举领导的过程

客户端与服务器之间建立的连接称为会话,客户端通过心跳与服务器保持有效连接,一旦连接断开即表示会话结束。客户端能够向ZooKeeper服务发送请求并接收响应,并且能够接收来自服务器的Watch事件通知。这个Watch事件是指客户端可以在某个Znode中设置一个Watcher,一旦该Znode发生变化(存储的数据改变,名称改变,子节点增加)则得到服务器的通知。

为什么要使用这种架构呢?首先一个主,多个备用在分布式系统是很常见的,一个领导者主要用于管理整个ZooKeeper系统,多个备用(观察者,追随者)一方面可以在领导者宕机后进行顶替作用,另一方面是可以缓解领导者的压力,如果所有客户端都只和领导者进行交互,那么其带宽要求通常会很高,使用其他备用服务器读取操作就可以从对应的服务器进行读取了(因为这些主服务器和备用服务器的数据都是同步的)。而写操作还是要从主服务器中写入,再与其他备用服务器同步,这样做的目的是保证只有一个服务器进行写入,避免多个服务器写入后,同步时会产生记录冲突的情况。

工作原理

ZooKeeper属于轻量级的数据存储系统,维护了数据的多个副本,由于写操作会改变数据的内容,因此必须保证所有节点执行系统的操作序列,从而实现副本之间的一致性。ZooKeeper需要由领导者节点保证各个节点执行相同的写操作序列,ZooKeeper工作过程的首要问题是如何在服务器之间确定领导者。

领导者选举

分布式一致性协议用于解决分布式系统如何就某个提议达成一致的问题,Paxos是实现该协议的经典算法之一:由某些节点发出提议,然后由其他节点进行投票表决,最终使所有节点达成一致。

如果将竞选主节点作为一个提议,那么分布式一致性算法可以用于进行选主。初始状态下,所有节点均发出ID提议将自己作为领导者,系统中的节点按照分布式一致性算法进行投票表决,最终仅有一个节点成领导者。

分布式一致性算法-Paxos、Raft、ZAB、Gossip - 知乎 (zhihu.com)

读写请求流程

写请求的流程如下:

image-20221012152351492

  1. 客户端与服务器建立链接并发起写数据请求
  2. 如果该服务器为追随者或观察者,则将接收的写请求转发给领导者,否则领导者可以直接处理
  3. 理论上,领导者需要将写请求转发给所有的服务器节点,直到所有服务器节点均成功执行写操作,该写请求才算完成。(追随者,观察者,领导者之间的数据同步,需要使用一致性协议)
  4. 服务器向客户端返回写成功的信息。

优点:保证领导者服务器负责主要更新操作,确保了写入的一致性

缺点:写请求需要转发给领导者,且在领导者写入后还需要与其他服务器节点进行数据同步,这无疑会增加写入时间。但是和写入不一致相比,增加的时间显然是可以接受的。

在读请求中,如果发送给观察者或者追随者,读取到的内容可能不是最新的(领导者更新写入还没完成),因此客户端可以调用sync()操作等待追随者与领导者进行数据同步,之后再获取最新的内容,具体流程如下:

image-20221012153024747

  1. 客户端与某一服务器建立连接并发起sync()请求
  2. 如果该服务器是追随者或观察者,则与领导者进行数据同步。
  3. 服务器向客户端返回同步成功的信息
  4. 客户端从服务器读取数据

优点:使用sync可以确保多个服务器中的数据始终处于同步状态,确保用户每次读取都能得到一致性的内容。

缺点:一般涉及到同步问题的,都会涉及到的缺点就是等待时间的开销,数据实现同步得到的一致性通常都是会消耗时间的。

容错机制

  1. 领导者故障:ZooKeeper重新进行领导者选举
  2. 追随者或观察者发生故障,则该节点无法对外提供服务,但其他节点仍然可以正常提供服务
  3. 追随者或观察者发生故障重启后,则能够从领导者节点或其他节点进行数据恢复

典型示例

集群管理

分布式计算系统通常使用主从结构,通常情况下只有一个主节点和若干从节点。主节点需要监控集群中从节点的变化情况。为了支持分布式计算系统的高可用性,需要配置多个主节点并进行选主以保证系统的正常运行,ZooKeeper能够很好地完成这个任务。

比如在MapReduce中,JobTracker要监控多个TaskTracker的变化情况,就可以在ZooKeeper中创建一个Znode(/cluster)并watch这个Znode节点。TaskTracker在启动之后会在/cluster节点下面创建临时Znode,当TaskTracker发生故障时,则会与ZooKeeper断开连接,这个TaskTracker对应的临时Znode就会被自动删除,这时候/cluster内的结果发生变化,watch他的JobTracker就会收到变化信息。由此实现了JobTracker对TaskTracker的状态监控。

image-20221012160514589

有什么优势呢?我们可以知道在之前JobTracker对TaskTracker的监控是通过心跳机制进行的,通常情况下这个时间定为3s左右,也就意味着每过3s就要使用一部分计算资源去确保TaskTracker的正常进行,如果TaskTracker的数量很大,那么计算资源就会产生很大的浪费。在引入ZooKeeper的方式之后,按照之前的描述,ZooKeeper只有在TaskTracker发生变化时,才会通知JobTracker,这时候就不需要3s的心跳机制了,节省了一部分的计算资源。

还有在MapReduce之中提到的,使用多个JobTracker实现MapReduce的高可用性。这也需要引入ZooKeeper,创建多个JobTracker,这时候为了维持这些JobTracker的一致性就需要进行选主操作。初始化每个JobTracker均创建名为/epection/p的临时Znode并添加属性。ZooKeeper自动为这些顺序的Znode添加自增的编号,因而选主的方式为选取/election下编号最小的Znode。其余节点作为备用节点,并且监听前一位JobTracker的Znode,一旦监听的Znode告知节点的前一位故障,则当前备用节点直接代替其功能。

image-20221012161453692

这也是很多分布式计算系统的主节点借助ZooKeeper实现高可用性的通用方案。具体来说,Spark的Master,Storm的Nimbus,Flink的JobManager,都可以借助上面的描述方式实现高可用性。

配置更新

分布式系统在运行过程中通常会动态修改配置,当修改配置后,相应的进程也要进行相应的变化。ZooKeeper的Watch机制为这种需求提供了便捷的解决方案。如下图:

image-20221012164927164

首先使用进程A在ZooKeeper中创建一个/config的临时Znode,并让进程B和进程C去watch /config,然后在进程A对config进行修改后,B和C会收到/config修改的通知,重新拉取/config进行相应的变化。

同步控制

分布式计算系统按照BSP模型执行一个迭代作业是,通常需要一组进程共同协助执行各自负责的任务,一方面,进程执行一轮迭代计算的前提通常是所有进程均已做好准备,另一方面,所有参与进程所负责的任务均执行结束后,本轮迭代计算才能视为已完成。因此分布式计算系统需要在每轮迭代计算开始和结束时同步所有参与的进程,称为“双屏障”机制。

假设一项任务由A、B、C三个进程进行,在进入屏障阶段,A先在ZooKeeper中创建一个Znode(/bar),并watch这个Znode。当B进入屏障后,在/bar下面创建P1 Znode。这时候A会收到/bar的通知,但是A在查询后,发现/bar下面只有一个Znode,所以还需要继续等待。当C进入品章后在/bar下面创建P2 Znode。值得一提的是,B和C除了创建Znode外还监听/ready是否存在,一旦其存在则迭代计算的具体内容即可开始。创建好P2之后,/bar给A发送信息,这时候所需的进程都准备完成,A创建/ready,然后开始迭代计算的具体内容。

image-20221012170521771

在离开屏障结束后,B率先计算完成,其主动删除P1的Znode,表示自己准备要离开屏障,这时候A收到来自/bar的通知,但是/bar中还有节点,并未完成本次计算,继续等待。C计算完成后,主动删除P2的Znode,这时候A收到来自/bar的通知,且/bar中没有子节点了,表示计算结束。A将主动删除/ready节点,让B和C离开屏障。

image-20221012171125159