小云云

hdfs 基本操作命令

官网文档:http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/FileSystemShell.html

hadoop fs –ls / 列出当前目录有哪些子目录,有哪些文件。

hadoop fs –mkidr /test 在Hadoop文件系统当中,创建一个test目录

hadoop fs –get /filename 从Hadoop文件系统当中,获取一个文件到本地的文件系统。

hadoop fs –put srcfile /desfile 从本地的文件系统上传一个文件到Hadoop文件系统中。

【翻译】Apache Hadoop MapReduce

原文:http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

综述

Hadoop MapReduce是一个便于开发并行处理海量数据(TB级)的应用的软件框架,该框架在由普通pc机组成的大规模集群(上千台节点)上实现了可靠性及容错。

一个MapReduce任务(job)通常会将输入数据集分片,这一工作是由map任务完全并行的完成的。框架整理map的运行结果,作为reduce任务的输入。通常数据的输入输出都是在文件系统上完成的。MapReduce框架负责调度、监控及重做失败任务的工作。

通常来讲计算节点和存储节点是一样的,也就是说,MapReduce框架及HDFS运行在同一个节点集合。这种配置使得框架可以在数据已就绪的节点集群内高效的调度任务,这样在集群内获得了 非常大的带宽。

MapReduce框架包含一个资源管理器(ResourceManager ),每个节点上的NodeManager及每个应用上的MRAppMaster。

应用至少要指定输入输出位置,并通过适当的接口实现及抽象类来提供map及reduce的功能。

Hadoop的job-client提交前述任务(这个任务可以是jar,也可以是其他可执行的文件),并配置到资源管理器。资源管理器将软件及配置分发给从机,调度并监控任务,向job-client提供状态及诊断信息。

尽管Hadoop框架是用Java实现的,MapReduce应用不限定使用Java编写。

Hadoop  Streaming使得用户可以创造及运行任意的可执行程序作为mapper或者reducer。

Hadoop Pipes是兼容SWIG的C++ API,用于实现MapReduce应用。

输入输出

MapReduce框架运行在键值对(<key, value>)上,也就是说,MapReduce框架将任务的输入视为一个键值对的集合,产生新的键值对集合作为任务输出。key及value的类需要能够被框架序列化,因此必须实现Hadoop的writable接口(org.apache.hadoop.io )。此外,key类需要实现WritableComparable接口(org.apache.hadoop.io)来促进框架的排序。

一个MapReduce任务的输入输出类型示例:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

接下来玩例子:

MapReduce工作方式的小栗子:词频统计(wordcount)

 Java 源代码

复制上述代码到文件:

注意!文件夹及后面的jar需要让hdfs用户有7的权限!!!否则后面执行出错。

使用

假设环境变量设置如下(主要添加了后两条,不加会有classNotFound的错误)

编译前述WordCount.java文件并生成jar。

cdh hadoop默认lib目录:

/var/lib/

假设输入输出目录如下

/user/class_example/4_1/wordcount/input

/user/class_example/4_1/wordcount/output

 

在本地生成输入文件

导入到hdfs

(在jar的目录)跑一把前面生成的MapReduce程序的jar

参观一下输出:

 

 

 

 

 

 

 

 

 

 

 

【翻译】Apache Hadoop 下一代MapReduce ——YARN

官网链接:http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

在学习YARN之前先了解一下MapReduce:http://bananalighter.com/apache-hadoop-mapreduce/

MapReduce经历了hadoop-0.23版本的大规模修改,目前是MapReduce2.0(MRv2)或者叫做YARN。

MRv2的核心思想是将JobTracker的资源管理及任务的调度监控分解为多个不同的后台程序。这个思路是建立一个全局的资源管理器(ResourceManager),为每一个应用建立一个应用控制器(ApplicationMaster)。应用要么是一个传统的MapReduce任务,要么是一系列任务的有向无环图(DAG)。

资源管理器(RM)及每个节点上的节点管理器(NodeManager)来自于数据计算框架。资源管理器(RM)是系统中所有应用及其资源的最高级别的仲裁。

每个应用的应用管理员(ApplicationMaster)是一个框架的特定库,用于从资源管理器(RM)协调资源,并和节点管理器(NM)一起执行和监控任务。

YARN arch

 

资源管理器(RM)包含两个主要组成部分:调度器及应用管理器(ApplicationManager)。

调度器负责为不同的应用分配容量资源、排队等等。调度器单纯进行调度,不进行监控及状态跟踪。

翻译了一半发现董西城的blog上已经早都有了。。。

链过去好了= =:http://dongxicheng.org/mapreduce-nextgen/nextgen-mapreduce-introduction/

删除cloudera manager 及 cdh

边翻译边干:http://www.cloudera.com/content/cloudera/en/documentation/cloudera-manager/v5-0-0/Cloudera-Manager-Installation-Guide/cm5ig_uninstall_cm.html

配置情况

一台cm server(使用mysql)

四台cdh主机

默认的用户目录

以下目录是默认安装时的各个工具的目录:

/var/lib/flume-ng /var/lib/hadoop* /var/lib/hue /var/lib/navigator /var/lib/oozie /var/lib/solr /var/lib/sqoop* /var/lib/zookeeper /dfs /mapred /yarn

假如在安装时自己改过,那就在重新安装的时候注意删除自定义的目录。

停止各种cdh及cm的服务

可以在界面上操作

删除cm server上的服务及安装

检查cm server状态

确认server及agent关闭后删除:

清除mysql数据库

 

 删除集群上的cm agent及各种cdh软件

关闭cm agent

删除agent

clean yum cache

 删除cdh中的组件

删除cloudera manager的数据

重启后试试 物理机重启那叫一个慢。。。。。。。。

重启以后删掉了。

删除cloudera manager的lock文件

竟然没有,可能是因为通过命令正常关闭的,本身没有产生lock。

再删除其他hadoop的组件

*我没有删除/dfs,因为最初物理分区直接给分了个/dfs出来。

done

使用CDH Manager(及本地源)自动化安装CDH 5

一、准备工作

共性准备配置

1.ntp服务器

2.关闭iptables及selinux

3.配置hosts文件(增加Manager机记录、增加所有slaver机记录)

slaver机准备

1.slaver机配置yum文件(添加cm、cdh的源,添加rhel光盘的源)

(添加光盘的源不再赘述)

manager机准备

1.安装并建立本地repo的http服务

2.http://bananalighter.blog.51cto.com/6386339/1546624

二、使用CM 安装CDH

登陆http://ip:7180

添加主机按提示操作(选择本地repo)。

参考:http://debugo.com/cm5-install/

IaaS, PaaS和SaaS 形象的图 马个克

原文:http://www.u1city.net/Article-538.html

iaas-paas-saas

IaaS: Infrastructure-as-a-Service(基础设施即服务)

第一层叫做IaaS,有时候也叫做Hardware-as-a-Service,几年前如果你想在办公室或者公司的网站上运行一些企业应用,你需要去买服务器,或者别的高昂的硬件来控制本地应用,让你的业务运行起来。

但是现在有IaaS,你可以将硬件外包到别的地方去。IaaS公司会提供场外服务器,存储和网络硬件,你可以租用。节省了维护成本和办公场地,公司可以在任何时候利用这些硬件来运行其应用。

一些大的IaaS公司包括Amazon, Microsoft, VMWare, Rackspace和Red Hat.不过这些公司又都有自己的专长,比如Amazon和微软给你提供的不只是IaaS,他们还会将其计算能力出租给你来host你的网站。

PaaS: Platform-as-a-Service(平台即服务)

第二层就是所谓的PaaS,某些时候也叫做中间件。你公司所有的开发都可以在这一层进行,节省了时间和资源。

PaaS公司在网上提供各种开发和分发应用的解决方案,比如虚拟服务器和操作系统。这节省了你在硬件上的费用,也让分散的工作室之间的合作变得更加容易。网页应用管理,应用设计,应用虚拟主机,存储,安全以及应用开发协作工具等。

一些大的PaaS提供者有Google App Engine,Microsoft Azure,Force.com,Heroku,Engine Yard。最近兴起的公司有AppFog, Mendix 和 Standing Cloud

SaaS: Software-as-a-Service(软件即服务)

第三层也就是所谓SaaS。这一层是和你的生活每天接触的一层,大多是通过网页浏览器来接入。任何一个远程服务器上的应用都可以通过网络来运行,就是SaaS了。

你消费的服务完全是从网页如Netflix, MOG, Google Apps, Box.net, Dropbox或者苹果的iCloud那里进入这些分类。尽管这些网页服务是用作商务和娱乐或者两者都有,但这也算是云技术的一部分。

一些用作商务的SaaS应用包括Citrix的GoToMeeting,Cisco的WebEx,Salesforce的CRM,ADP,Workday和SuccessFactors。

 

 

 

 

【翻译】CDH 的Cloudera Manager免费与收费版的对比表 = =

翻译:http://www.cloudera.com/content/cloudera/en/products-and-services/cloudera-enterprise/cloudera-manager/cloudera-manager-features.html

CDH 特性 免费版 付费版
Deployment, Configuration & Management 系统管理
Automated Deployment & Hadoop Readiness Checks 自动化部署及快速检查
Install the complete CDH stack in minutes and ensure optimal settings 安装完整的CDH及优化配置
Service Management 服务管理
Configure and manage all CDH services, including Impala and Search, from a central interface 提供统一的界面管理与配置全部的CDH服务,包括cloudera impala及cloudera search
Security Management 安全
Configure and manage security across the cluster – including Kerberos authentication and role-based (administrator and read-only) administration 跨群集的安全管理与配置(包括Kerberos认证及基于角色的管理)
Resource Management 资源管理
Allocate cluster resources by workload or by user/group/application to eliminate contention and ensure Quality-of-Service (QoS) 根据工作量分配资源,或根据/user/group/application文件消除争用,保证QoS
High Availability HA
Easily configure and manage High Availability for various services like HDFS, MapReduce, Oozie, YARN, HBase 为多种服务配置HA:HDFS,MapReduce,Oozie,YARN,Hbase
Client Configuration Management 管理客户端配置
Centrally configure all client access to the cluster 集中配置连接到群集的客户端
Node Templating 节点模板
Easily deploy and expand heterogeneous clusters by creating templates for node roles 通过为节点角色创造模板,来部署和扩展不同的群集
Comprehensive Workflows 全面的工作流
Perform end-to-end tasks such as start/stop/restart clusters, services and roles, add/delete hosts, decommission nodes etc. 执行端到端的任务,如群集、服务、角色级别的启停,增删主机,解除节点等。
Multi-Cluster Management 多群集管理
Manage multiple CDH clusters from a single instance of Cloudera Manager 一个Manager管理多个CDH群集
Monitor
Service, Host & Activity Monitoring 服务、主机、活动的监控
Get a consolidated, real-time view of the state of all services, hosts and activities running in the cluster 对服务、主机、活动的统一的实时的监控
Events & Alerts 事件和警报
Create, aggregate and receive alerts on relevant Hadoop events pertaining to system health, log messages, user actions and activities Set thresholds and create custom alerts for metrics collected by CM 创建、合计、接收Hadoop相关的系统健康、日志信息、用户动作和活动的警报。设置阈值并创建用户警报。
Diagnose
Global Time Control 全程控制
Correlate all views along a configurable timeline to simplify diagnosis 通过可配置的时间线串联所有视图,简化诊断。
Proactive Health Checks 健康预检
Monitor dozens of service performance metrics and get alerts you when you approach critical thresholds 监控服务性能,当达到阈值时向用户报警。
Heatmaps 热度图
Visualize health status and metrics across the cluster to quickly identify problem nodes and take action 图形化展示群集的健康状态,便于发现故障节点并修复。
Customizable Charts 可定制的图表
Report and visualize on key time-series metrics about services, roles and hosts 按照时间顺序提供服务、角色和主机的形象报告。
Intelligent Log Management 智能日志管理
Gather, view and search Hadoop logs collected from across the cluster 可以收集、观察和查询从群集中获得的Hadoop日志。
Integrate
Comprehensive API 广泛的API
Easily integrate Cloudera Manager with your existing enterprise-wide management and monitoring tools 可以简单的将CM与现有的企业范围的管理和监控工具集成起来。
3rd Party Application Management 对第三方应用的管理
Deploy, manage and monitor services for 3rd party applications running on the cluster (e.g. data integration tools, math/machine learning applications, non-CDH services etc.) 部署、管理和监控运行在群集上的第三方应用服务。
Advanced Management Features (Enabled by Subscription)
Operational Report & Quota Management 操作报告和配额管理
Visualize current and historical disk usage; set user and group-based quotas; and track MapReduce, Impala, YARN and HBase usage 1.当前及历史磁盘用量展示
2.基于用户和组的配额设置
3.跟踪MapReduce、Impala、YARN和Hbase的用量
Configuration History & Rollbacks 记录配置历史及回滚
Maintain a trail of all actions and a complete record of configuration changes, including the ability to roll back to previous states 保留所有活动及配置变化的痕迹档案,包含回滚到之前状态的能力。
Rolling Updates 滚动升级
Stage service updates and restarts to portions of the cluster sequentially to minimize downtime when upgrading or updating your cluster 分阶段升级和重启群集各部分,最小化宕机时间。
AD Kerberos Integration AD与Kerberos的集成
Integrate directly with Active Directory to get started easily with Kerberos 直接与AD集成,可以方便的与Kerberos一起工作
Kerberos Wizard Kerberos向导
Easily configure Kerberos and trigger automated workflows to secure clusters 方便配置Kerberos,可以自动触发工作流来保证群集安全。
Hadoop SSL Related Configs Hadoop SSL相关配置
Simplify configs and eliminates need for safety valves 简化配置并减少安全阀的需求
LDAP/SAML Integration LDAP/SAML的集成
Integrate user credentials with Active Directory and enable single sign-on (SSO) capabilities 集成了基于AD的用户验证,并提供了SSO能力。
SNMP Support 对SNMP的支持
Send Hadoop-specific events and alerts to global monitoring tools as SNMP traps 以SNMP 异常报告的方式向全局监控工具发送Hadoop特定的事件和告警。(参见文末 注1)
Scheduled Diagnostics cloudera技术诊断的支持
Take a snapshot of the cluster state and automatically send it to Cloudera support to assist with optimization and issue resolution 优化和解决问题时,收集群集状态快照并自动发送至cloudera支持。
Automated Backup & Disaster Recovery 自动化备份和灾难恢复
Centrally configure and manage snapshotting and replication workflows for HDFS, Hive and HBase 集中配置和管理快照,复制HDFS、Hive、HBase工作流。

 

注:

1.snmp traps:SNMP是指简单网络管理协议,trap是它规定的一种通信方式,用于被管理的设备主动向充当管理者的设备报告自己的异常信息。

学习spark中的关

学习spark中的关

原文(扫盲扫的很舒畅):http://www.searchdatabase.com.cn/showcontent_79727.htm

一、Driver&worker

二、RDD(即弹性分布式数据集 Resilient Distributed Dataset 

三、DAG(http://bananalighter.blog.51cto.com/6386339/1557614

四、扩展阅读(spark使用场景等):http://tech.ddvip.com/2013-10/1381214820203655.html

尽管Hadoop适合大多数批处理工作负载,而且在大数据时代成为企业的首选技术,但由于以下几个限制,它对一些工作负载并不是最优选择:

  • 缺少对迭代的支持

  • 需要将中间数据存在硬盘上以保持一致性,因此会有比较高的延迟

当然,整个Hadoop生态系统是在不断演进的,包括Map/Reduce已经证明是处理大规模海量数据的理想方式。而HDFS、HBase等在过去几年中也有了长足的进步。

在本文中,我们将深入了解一下过去一年中“红透半边天”的技术Spark,它与Hadoop架构类似,但是在许多方面都弥补了Hadoop的不足,比如在进行批处理时更加高效,并有更低的延迟。在大数据时代,Spark给我们带了新的选择,它的前途不可限量。

在Spark集群中,有两个重要的元素,即driver和worker。  driver 程序是应用逻辑执行的起点,而多个worker用来对数据进行并行处理。driver 程序是应用逻辑执行的起点,而多个worker用来对数据进行并行处理。尽管不是强制的,但数据通常是与worker搭配,并在集群内的同一套机器中进行分区。在执行阶段,driver程序会将code/closure传递给worker机器,同时相应分区的数据将进行处理。数据会经历转换的各个阶段,同时尽可能地保持在同一分区之内。执行结束之后,worker会将结果返回到driver程序。

 

在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset ,RDD),它是逻辑集中的实体,但在集群中的多台机器上进行了分区。通过对多台机器上不同RDD联合分区的控制,就能够减少机器之间数据混合(data shuffling)。Spark提供了一个“partition-by”运算符,能够通过集群中多台机器之间对原始RDD进行数据再分配来创建一个新的RDD。

RDD可以随意在RAM中进行缓存,因此它提供了更快速的数据访问。目前缓存的粒度在处在RDD级别,因此只能是全部RDD被缓存。在集群中有足够的内存时,Spark会根据LRU驱逐算法将RDD进行缓存。

RDD提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,而应用逻辑可以表达为一系列转换处理。

通常应用逻辑是以一系列TRANSFORMATION和ACTION来表达的。前者在RDD之间指定处理的相互依赖关系DAG,后者指定输出的形式。调度程序通过拓扑排序来决定DAG执行的顺序,追踪最源头的节点或者代表缓存RDD的节点。

Spark中的依赖性主要体现为两种形式,宽与窄(Narrow dependency,Wide dependency)。Narrow dependency是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide dependency是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。注意Narrow dependency 的RDD可以通过相同的键进行联合分区,整个操作都可以在一台机器上进行,不会造成网络之间的数据混合。另一方面,Wide dependency的RDD就会涉及到数据混合。调度程序会检查依赖性的类型,将Narrow dependency 的RDD划到一组处理当中,即stage。Wide dependency在一个执行中会跨越连续的stage,同时需要显式指定多个子RDD的分区。

典型的执行顺序如下:

  1. RDD直接从外部数据源创建(HDFS、本地文件等)

  2. RDD经历一系列的TRANSFORMATION( map、flatMap、filter、 groupBy、join),每一次都会产生不同的RDD,供给下一个TRANSFORMATION使用

  3. 最后一步就是ACTION(count、collect、save、take),将最后一个RDD进行转换,输出到外部数据源。

上述一系列处理称为一个lineage,即DAG拓扑排序的结果。在lineage中生成的每个RDD都是不可变的。事实上,除非被缓存,每个RDD在进入下一个TRANSFORMATION前都只使用一次。

在一个典型的分布式系统当中,故障恢复是由主动监控系统以及不同机器之间的数据复制来实现的。当一台机器出现故障,其他的机器上总是会有一套数据副本来进行故障恢复。

而在Spark当中则采取了不同的方法。首先作为一个大型的集群,Spark并不应该是一个大规模数据集群。Spark针对工作负载会做出两种假设:

  • 处理时间是有限的

  • 保持数据持久性是外部数据源的职责,主要是让处理过程中的数据保持稳定

Spark在执行期间发生数据丢失时会选择折中方案,它会重新执行之前的步骤来恢复丢失的数据。但这并不是说丢弃所有之前已经完成的工作,而重新开始再来一遍。我们只需要再执行一遍父RDD的相对应分区。

需要知道,丢失分区的再执行其实与DAG的延迟计算一样的,它开始于DAG的叶节点,追溯父RDD需要的依赖性,然后逐渐追踪到源节点。丢失节点的重新计算其实与它非常类似,但它把分区作为额外的信息,以便决定需要哪些父RDD。

然而,跨Wide dependency的再执行能够涉及到多个父RDD从而引发全部的再执行。为了规避这一点,Spark会保持Map阶段中间数据输出的持久,以免它们混合到不同机器上执行reduce阶段。在机器发生故障的情况下,再执行只需要回溯mapper持续输出的相应分区,来获取中间数据。Spark还提供了检查点的API,明确持久中间RDD,这样再执行就不必追溯到最开始的阶段。未来,通过在恢复的延迟以及根据统计结果的check-pointing总开销之间进行权衡,Spark会自动化地执行check-pointing。

Spark为构建低延迟,大规模并行处理的大数据分析应用提供了强大的处理框架。它支持围绕RDD抽象的API,同时包括一套TRANSFORMATION和ACTION操作,以及针对大量流行编程语言的支持,比如Scala、Java和Python。

sparkinternal作业调度【mark待学,先看看DAG的解释】

sparkinternal作业调度【mark待学,先看看DAG的解释】

出处:http://blog.csdn.net/colorant/article/details/24010035


Spark中作业调度的相关类最重要的就是DAGSchedulerDAGScheduler顾名思义就是基于DAG图的Scheduler


DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。

 

在作业调度系统中,调度的基础就在于判断多个作业任务的依赖关系,这些任务之间可能存在多重的依赖关系,也就是说有些任务必须先获得执行,然后另外的相关依赖任务才能执行,但是任务之间显然不应该出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG有向无环图来表示。

 

概括地描述DAGSchedulerTaskScheduler(关于TaskScheduler的相关细节,在我之前的关于Spark运行模式的文章中有)的功能划分就是:TaskScheduler负责实际每个具体任务的物理调度,DAGScheduler负责将作业拆分成不同阶段的具有依赖关系的多批任务,可以理解为DAGScheduler负责任务的逻辑调度。

 

 

基本概念

 

Task任务 :单个分区数据集上的最小处理流程单元

TaskSet任务集:一组关联的,但是互相之间没有Shuffle依赖关系的任务所组成的任务集

Stage调度阶段:一个任务集所对应的调度阶段

Job作业:一次RDD Action生成的一个或多个Stage所组成的一次计算作业

 

 

运行方式

 

DAGSchedulerSparkContext初始化过程中实例化,一个SparkContext对应一个DAGSchedulerDAGScheduler的事件循环逻辑基于Akka Actor的消息传递机制来构建,在DAGSchedulerStart函数中创建了一个eventProcessActor用来处理各种DAGSchedulerEvent,这些事件包括作业的提交,任务状态的变化,监控等等

[plain] view plaincopy

  • private[scheduler]case class JobSubmitted(  

  •     jobId: Int,  

  •     finalRDD: RDD[_],  

  •     func: (TaskContext, Iterator[_]) => _,  

  •     partitions: Array[Int],  

  •     allowLocal: Boolean,  

  •     callSite: String,  

  •     listener: JobListener,  

  •     properties: Properties = null)  

  •   extends DAGSchedulerEvent  

  •    

  • private[scheduler]case class JobCancelled(jobId: Int) extends DAGSchedulerEvent  

  • private[scheduler]case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent  

  • private[scheduler]case object AllJobsCancelled extends DAGSchedulerEvent  

  • private[scheduler]  

  • case classBeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent  

  •    

  • private[scheduler]  

  • case classGettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent  

  •    

  • private[scheduler]case class CompletionEvent(  

  •     task: Task[_],  

  •     reason: TaskEndReason,  

  •     result: Any,  

  •     accumUpdates: Map[Long, Any],  

  •     taskInfo: TaskInfo,  

  •     taskMetrics: TaskMetrics)  

  •   extends DAGSchedulerEvent  

  •    

  • private[scheduler]case class ExecutorAdded(execId: String, host: String) extendsDAGSchedulerEvent  

  • private[scheduler]case class ExecutorLost(execId: String) extends DAGSchedulerEvent  

  • private[scheduler]  caseclass TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent  

  • private[scheduler]case object ResubmitFailedStages extends DAGSchedulerEvent  

  • private[scheduler]case object StopDAGScheduler extends DAGSchedulerEvent  


  •  

    不论是Client还是TaskSchedulerDAGScheduler的交互方式基本上都是通过DAGScheduler暴露的函数接口间接的给eventProcessActor发送相关消息

     

    如前面所说,DAGScheduler最重要的任务之一就是计算作业和任务的依赖关系,制定调度逻辑

     

    DAGScheduler作业调度的两个主要入口是submitJob  runJob,两者的区别在于前者返回一个Jobwaiter对象,可以用在异步调用中,用来判断作业完成或者取消作业,runJob在内部调用submitJob,阻塞等待直到作业完成(或失败)

     

    具体往DAGScheduler提交作业的操作,基本都是封装在RDD的相关Action操作里面,不需要用户显式的提交作业

     

    用户代码都是基于RDD的一系列计算操作,实际运行时,这些计算操作是Lazy执行的,并不是所有的RDD操作都会触发SparkCluster上提交实际作业,基本上只有一些需要返回数据或者向外部输出的操作才会触发实际计算工作,其它的变换操作基本上只是生成对应的RDD记录依赖关系。

     

    DAGScheduler内部维护了各种 task / stage / job之间的映射关系表

     

    工作流程

     

    提交并运行一个Job的基本流程,包括以下步骤

     

    划分Stage

     

    当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。Stage的划分是以ShuffleDependency为依据的,也就是说当某个RDD的运算需要将数据进行Shuffle时,这个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的Stage,由此为依据划分Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。

     

    GroupByKey操作为例,该操作返回的结果实际上是一个ShuffleRDD,当DAGScheduler遍历到这个ShuffleRDD的时候,因为其Dependency是一个ShuffleDependency,于是这个ShuffleRDD的父RDD以及shuffleDependency等对象就被用来构建一个新的Stage,这个Stage的输出结果的分区方式,则由ShuffleDependency中的Partitioner对象来决定。

     

    可以看到,尽管划分和构建Stage的依据是ShuffleDependency,对应的RDD也就是这里的ShuffleRDD,但是这个Stage所处理的数据是从这个shuffleRDD的父RDD开始计算的,只是最终的输出结果的位置信息参考了ShuffleRDD返回的ShuffleDependency里所包含的内容。而shuffleRDD本身的运算操作(其实就是一个获取shuffle结果的过程),是在下一个Stage里进行的。

     

    生成Job,提交Stage

     

    上一个步骤得到一个或多个有依赖关系的Stage,其中直接触发JobRDD所关联的Stage作为FinalStage生成一个Job实例,这两者的关系进一步存储在resultStageToJob映射表中,用于在该Stage全部完成时做一些后续处理,如报告状态,清理Job相关数据等。

     

    具体提交一个Stage时,首先判断该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个父Stage的结果不可用,则迭代尝试提交父Stage 所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交

     

    什么时候waitingStages中的Stage会被重新提交呢,当一个属于中间过程Stage的任务(这种类型的任务所对应的类为ShuffleMapTask)完成以后,DAGScheduler会检查对应的Stage的所有任务是否都完成了,如果是都完成了,则DAGScheduler将重新扫描一次waitingStages中的所有Stage,检查他们是否还有任何依赖的Stage没有完成,如果没有就可以提交该Stage

     

    此外每当完成一次DAGScheduler的事件循环以后,也会触发一次从等待和失败列表中扫描并提交就绪Stage的调用过程

     

    任务集的提交

     

    每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet,这个TaskSet最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个TaskSet的生命周期,对于DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过TaskSetManager调度具体的Task到对应的Executor节点上进行运算

     

    任务作业完成状态的监控

     

    要保证相互依赖的job/stage能够得到顺利的调度执行,DAGScheduler就必然需要监控当前Job / Stage乃至Task的完成情况。这是通过对外(主要是对TaskScheduler)暴露一系列的回调函数来实现的,对于TaskScheduler来说,这些回调函数主要包括任务的开始结束失败,任务集的失败,DAGScheduler根据这些Task的生命周期信息进一步维护JobStage的状态信息。

     

    此外TaskScheduler还可以通过回调函数通知DAGScheduler具体的Executor的生命状态,如果某一个Executor崩溃了,或者由于任何原因与Driver失去联系了,则对应的StageshuffleMapTask的输出结果也将被标志为不可用,这也将导致对应Stage状态的变更,进而影响相关Job的状态,再进一步可能触发对应Stage的重新提交来重新计算获取相关的数据。

     

    任务结果的获取

     

    一个具体的任务在Executor中执行完毕以后,其结果需要以某种形式返回给DAGScheduler,根据任务类型的不同,任务的结果的返回方式也不同

     

    对于FinalStage所对应的任务(对应的类为ResultTask)返回给DAGScheduler的是运算结果本身,而对于ShuffleMapTask,返回给DAGScheduler的是一个MapStatus对象,MapStatus对象管理了ShuffleMapTask的运算输出结果在BlockManager里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个Stage的任务的获取输入数据的依据

     

    而根据任务结果的大小的不同,ResultTask返回的结果又分为两类,如果结果足够小,则直接放在DirectTaskResult对象内,如果超过特定尺寸(默认约10MB)则在Executor端会将DirectTaskResult先序列化,再把序列化的结果作为一个Block存放在BlockManager里,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskSchedulerTaskScheduler进而调用TaskResultGetterIndirectTaskResult中的BlockID取出并通过BlockManager最终取得对应的DirectTaskResult。当然从DAGScheduler的角度来说,这些过程对它来说是透明的,它所获得的都是任务的实际运算结果。

     

     

    TaskSetManager

     

    前面提到DAGScheduler负责将一组任务提交给TaskScheduler以后,这组任务的调度工作对它来说就算完成了,接下来这组任务内部的调度逻辑,则是由TaskSetManager来完成的。

     

    TaskSetManager的主要接口包括:

     

    ResourceOffer根据TaskScheduler所提供的单个Resource资源包括hostexecutorlocality的要求返回一个合适的TaskTaskSetManager内部会根据上一个任务成功提交的时间,自动调整自身的Locality匹配策略,如果上一次成功提交任务的时间间隔很长,则降低对Locality的要求(例如从最差要求Process Local降低为最差要求Node Local),反之则提高对Locality的要求。这一动态调整Locality策略基本可以理解为是为了提高任务在最佳Locality的情况下得到运行的机会,因为Resource资源可能是在短期内分批提供给TaskSetManager的,动态调整Locality门槛有助于改善整体的Locality分布情况。

     

    举个例子,如果TaskSetManager内部有a/b两个任务等待调度,a/b两个任务Prefer的节点分别是Host A  Host B 这时候先有一个Host C的资源以最差匹配为Rack Local的形式提供给TaskSetManager,如果没有内部动态Locality调整机制,那么比如a任务将被调度。接下来在很短的时间间隔内,一个Host A的资源来到,同样的b任务被调度。 而原本最佳的情况应该是任务b调度给Host C 而任务a调度给Host A

     

    当然动态Locality也会带来一定的调度延迟,因此如何设置合适的调整策略也是需要针对实际情况来确定的。目前可以设置参数包括

    spark.locality.wait.process

    spark.locality.wait.node

    spark.locality.wait.rack

     

    即各个Locality级别中TaskSetManager等待分配下一个任务的时间,如果距离上一次成功分配资源的时间间隔超过对应的参数值,则降低匹配要求(即process -> node -> rack -> any) 而每当成功分配一个任务时,则重置时间间隔,并更新Locality级别为当前成功分配的任务的Locality级别

     

     

    handleSuccessfulTask / handleFailedTask /handleTaskGettingResult :用于更新任务的运行状态,Taskset Manager在这些函数中除了更新自身维护的任务状态列表等信息,用于剩余的任务的调度以外,也会进一步调用DAGScheduler的函数接口将结果通知给它。

     

    此外,TaskSetManager在调度任务时还可能进一步考虑Speculation的情况,亦即当某个任务的运行时间超过其它任务的运行完成时间的一个特定比例值时,该任务可能被重复调度。目的当然是为了防止某个运行中的Task由于某些特殊原因(例如所在节点CPU负载过高,IO带宽被占等等)运行特别缓慢拖延了整个Stage的完成时间,Speculation同样需要根据集群和作业的实际情况合理配置,否则可能反而降低集群性能。

     

     

    Pool 调度池

     

    前面我们说了,DAGScheduler负责构建具有依赖关系的任务集,TaskSetManager负责在具体的任务集的内部调度任务,而TaskScheduler负责将资源提供给TaskSetManager供其作为调度任务的依据。但是每个SparkContext可能同时存在多个可运行的任务集(没有依赖关系),这些任务集之间如何调度,则是由调度池(POOL)对象来决定的,Pool所管理的对象是下一级的Pool或者TaskSetManager对象

     

    TaskSchedulerImpl在初始化过程中会根据用户设定的SchedulingMode(默认为FIFO)创建一个rootPool根调度池,之后根据具体的调度模式再进一步创建SchedulableBuilder对象,具体的SchedulableBuilder对象的BuildPools方法将在rootPool的基础上完成整个Pool的构建工作。

     

    目前的实现有两种调度模式,对应了两种类型的Pool

     

    FIFO:先进先出型,FIFO Pool直接管理的是TaskSetManager,每个TaskSetManager创建时都存储了其对应的StageIDFIFO pool最终根据StageID的顺序来调度TaskSetManager

     

    FAIR:公平调度,FAIR Pool管理的对象是下一级的POOL,或者TaskSetManager,公平调度的基本原则是根据所管理的Pool/TaskSetManager中正在运行的任务的数量来判断优先级,用户可以设置minShare最小任务数,weight任务权重来调整对应Pool里的任务集的优先程度。当采用公平调度模式时,目前所构建的调度池是两级的结构,即根调度池管理一组子调度池,子调度池进一步管理属于该调度池的TaskSetManager

     

    公平调度模式的配置通过配置文件来管理,默认使用fairscheduler.xml文件,范例参见conf目录下的模板:

     

    [html] view plaincopy

  • <?xmlversionxmlversion=“1.0”?>  

  • <allocations>  

  •   <pool name=“production”>  

  •    <schedulingMode>FAIR</schedulingMode>  

  •     <weight>1</weight>  

  •     <minShare>2</minShare>  

  •   </pool>  

  •   <pool name=“test”>  

  •    <schedulingMode>FIFO</schedulingMode>  

  •     <weight>2</weight>  

  •     <minShare>3</minShare>  

  •   </pool>  

  • </allocations>  


  •  

    由于这里的调度池是在SparkContext内部的调度,因此其调度范畴是一个基于该SparkContextSpark应用程序,正常情况下,多个Spark应用程序之间在调度池层面是没有调度优先级关系的。那么这种调度模式的应用场合是怎样的呢? 举一个例子就是SparkServer或者SharkServer,作为一个长期运行的SparkContext,他们代理运行了其它连上ServerSpark应用的任务,这样你可以为每个链接按照用户名指定一个Pool运行,从而实现用户优先级和资源分配的合理调度等。

     

    Spark应用之间的调度

     

    前面提到调度池只是在SparkContxt内部调度资源,SparkContext之间的调度关系,按照Spark不同的运行模式,就不一定归Spark所管理的了。

     

    MesosYARN模式下,底层资源调度系统的调度策略由MesosYARN所决定,只有在Standalone模式下,Spark Master按照当前cluster资源是否满足等待列表中的Spark应用 对内存和CPU资源的需求,而决定是否创建一个SparkContext对应的Driver,进而完成Spark应用的启动过程,这可以粗略近似的认为是一种粗颗粒度的有条件的FIFO策略吧