什么是Hadoop?
hadoop一个用于在普通硬件构成 的大集群上运行应用程序的框架。Hadoop框架透明地为应用程序提供可靠性与数据移动保障。Hadoop实现了一个被称为 mapReduce的 计算模型,在这个计算模型中应用程序被分为很多的小块,每一块都能在集群中的任意节点上执行或重新执行。另外,它还提供了一个分布式文件系统(HDFS)来在计算节 点上存储数据,为集群提供了非常高的聚合带宽。在本框架中无论是Map/Reduce还是分布式文件系统都被设计为能够自动地处理节点上的错误。
Hadoop的组成:
Hadoop Common – contains libraries and utilities needed by other Hadoop modules Hadoop Distributed File System (HDFS) – a distributed file-system that stores data on commodity machines, providing very high aggregate bandwidth across the cluster. Hadoop YARN – a resource-management platform responsible for managing compute resources in clusters and using them for scheduling of users' applications. Hadoop MapReduce – a programming model for large scale data processing.
|
Hadoop框架中最核心的设计就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇论文所提及而被广为流传的,简单的一句话解释MapReduce就是“任务的分解与结果的汇总”。HDFS是Hadoop分布式文件系统(Hadoop Distributed File System)的缩写,为分布式计算存储提供了底层支持。
MapReduce原理:
MapReduce从它名字上来看就大致可以看出个缘由,两个动词Map和Reduce,“Map(展开)”就是将一个任务分解成为多个任务,“Reduce”就是将分解后多任务处理的结果汇总起来,得出最后的分析结果。这不是什么新思想,其实在前面提到的多线程,多任务的设计就可以找到这种思想的影子。不论是现实社会,还是在程序设计中,一项工作往往可以被拆分成为多个任务,任务之间的关系可以分为两种:一种是不相关的任务,可以并行执行;另一种是任务之间有相互的依赖,先后顺序不能够颠倒,这类任务是无法并行处理的。回到大学时期,教授上课时让大家去分析关键路径,无非就是找最省时的任务分解执行方式。在分布式系统中,机器集群就可以看作硬件资源池,将并行的任务拆分,然后交由每一个空闲机器资源去处理,能够极大地提高计算效率,同时这种资源无关性,对于计算集群的扩展无疑提供了最好的设计保证。任务分解处理以后,那就需要将处理以后的结果再汇总起来,这就是Reduce要做的工作。结构图如下:
网上有个简单的比喻来解释MapReduce原理:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
上图就是MapReduce大致的结构图,在Map前还可能会对输入的数据有Split(分割)的过程,保证任务并行效率,在Map之后还会有Shuffle(混合)的过程,对于提高Reduce的效率以及减小数据传输的压力有很大的帮助。后面会具体提及这些部分的细节。
1.Map-Reduce的逻辑过程
假设我们需要处理一批有关天气的数据,其格式如下:
1.按照ASCII码存储,每行一条记录
2.每一行字符从0开始计数,第15个到第18个字符为年
3.第25个到第29个字符为温度,其中第25位是符号+/-
0067011990999991950051507+0000+ 0043011990999991950051512+0022+ 0043011990999991950051518-0011+ 0043012650999991949032412+0111+ 0043012650999991949032418+0078+ 0067011990999991937051507+0001+ 0043011990999991937051512-0002+ 0043011990999991945051518+0001+ 0043012650999991945032412+0002+ 0043012650999991945032418+0078+
|
现在需要统计出每年的最高温度。
Map-Reduce主要包括两个步骤:Map和Reduce
每一步都有key-value对作为输入和输出:
1.map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
2.map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应
对于上面的例子,在map过程,输入的key-value对如下:
(0, 0067011990999991950051507+0000+) (33, 0043011990999991950051512+0022+) (66, 0043011990999991950051518-0011+) (99, 0043012650999991949032412+0111+) (132, 0043012650999991949032418+0078+) (165, 0067011990999991937051507+0001+) (198, 0043011990999991937051512-0002+) (231, 0043011990999991945051518+0001+) (264, 0043012650999991945032412+0002+) (297, 0043012650999991945032418+0078+)
|
在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:
(1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78) (1937, 1) (1937, -2) (1945, 1) (1945, 2) (1945, 78)
|
在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入
(1950, [0, 22, –11]) (1949, [111, 78]) (1937, [1, -2]) (1945, [1, 2, 78])
|
在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:
(1950, 22) (1949, 111) (1937, 1) (1945, 78)
|
2、编写Map-Reduce程序
编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数和reducer中的reduce函数。
一般遵循以下格式:
1.map: (K1, V1) -> list(K2, V2)
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
throws IOException;
}
|
2.reduce: (K2, list(V)) -> list(K3, V3)
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter)
throws IOException;
}
|
对于上面的例子,则实现的mapper如下:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(25) == '+') {
airTemperature = Integer.parseInt(line.substring(26, 30));
} else {
airTemperature = Integer.parseInt(line.substring(25, 30));
}
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
|
实现的reducer如下:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
|
欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:
1.输入的数据,也即需要处理的数据
2.Map-Reduce程序,也即上面实现的Mapper和Reducer
3.此任务的配置项JobConf
欲配置JobConf,需要大致了解Hadoop运行job的基本原理:
1.Hadoop将Job分成task进行处理,共两种task:map task和reduce task
2.Hadoop有两类的节点控制job的运行:JobTracker和TaskTracker
.JobTracker协调整个job的运行,将task分配到不同的TaskTracker上
.TaskTracker负责运行task,并将结果返回给JobTracker
3.Hadoop将输入数据分成固定大小的块,我们称之input split
4.Hadoop为每一个input split创建一个task,在此task中依次处理此split中的一个个记录(record)
5.Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以input split的大小也一般是HDFS的block的大小。
6.Reduce task的输入一般为Map Task的输出,Reduce Task的输出为整个job的输出,保存在HDFS上。
7.在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partition
partition的规则为:(K2, V2) –> Integer, 也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。
public interface Partitioner<K2, V2> extends JobConfigurable {
int getPartition(K2 key, V2 value, int numPartitions);
}
|
下图大概描述了Map-Reduce的Job运行的基本原理:
下面我们讨论JobConf,其有很多的项可以进行配置:
1.setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
2.setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的数
3.setMapperClass:设置Mapper,默认为IdentityMapper
4.setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
5.setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
6.setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
7.setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
8.setReducerClass:设置Reducer,默认为IdentityReducer
9.setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
10.FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
11.FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在
当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
|
3、Map-Reduce数据流(data flow)
Map-Reduce的处理过程主要涉及以下四个部分:
1.客户端Client:用于提交Map-reduce任务job
2.JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
3.TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
4.HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件
3.1、任务提交
JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。
向JobTracker请求一个新的job ID
检测此job的output配置
计算此job的input splits
将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
通知JobTracker此Job已经可以运行了
提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。
3.2、任务初始化
当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。
初始化首先创建一个对象来封装job运行的tasks, status以及progress。
在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。
其为每个input split创建一个map task。
每个task被分配一个ID。
3.3、任务分配
TaskTracker周期性的向JobTracker发送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。
在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。
TaskTracker有固定数量的位置来运行map task或者reduce task。
默认的调度器对待map task优先于reduce task
当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。
3.4、任务执行
TaskTracker被分配了一个task,下面便要运行此task。
首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。
TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。
其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。
其三,其创建一个TaskRunner来运行task。
TaskRunner创建一个新的JVM来运行task。
被创建的child JVM和TaskTracker通信来报告运行进度。
3.4.1、Map的过程
MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。
map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。
当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。
在写入硬盘之前,内存中的数据通过partitioner分成多个partition。
在同一个partition中,背景线程会将数据按照key在内存中排序。
每次从内存向硬盘flush数据,都生成一个新的spill文件。
当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。
reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。
3.4.2、Reduce的过程
当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。
对于一个job,JobTracker知道TaskTracer和map输出的对应关系。
reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。
reduce task需要其对应的partition的所有的map输出。
reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。
reduce task中有多个copy线程,可以并行拷贝map输出。
当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。
当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。
3.5、任务结束
当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。
当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。
HdFS的基本概念
1.1、数据块(block)
HDFS(Hadoop Distributed File System)默认的最基本的存储单位是64M的数据块。
和普通文件系统相同的是,HDFS中的文件是被分成64M一块的数据块存储的。
不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间。
1.2、元数据节点(Namenode)和数据节点(datanode)
元数据节点用来管理文件系统的命名空间
其将所有的文件和文件夹的元数据保存在一个文件系统树中。
这些信息也会在硬盘上保存成以下文件:命名空间镜像(namespace image)及修改日志(edit log)
其还保存了一个文件包括哪些数据块,分布在哪些数据节点上。然而这些信息并不存储在硬盘上,而是在系统启动的时候从数据节点收集而成的。
数据节点是文件系统中真正存储数据的地方。
客户端(client)或者元数据信息(namenode)可以向数据节点请求写入或者读出数据块。
其周期性的向元数据节点回报其存储的数据块信息。
从元数据节点(secondary namenode)
从元数据节点并不是元数据节点出现问题时候的备用节点,它和元数据节点负责不同的事情。
其主要功能就是周期性将元数据节点的命名空间镜像文件和修改日志合并,以防日志文件过大。这点在下面会相信叙述。
合并过后的命名空间镜像文件也在从元数据节点保存了一份,以防元数据节点失败的时候,可以恢复。
1.2.1、元数据节点文件夹结构
VERSION文件是java properties文件,保存了HDFS的版本号。
layoutVersion是一个负整数,保存了HDFS的持续化在硬盘上的数据结构的格式版本号。
namespaceID是文件系统的唯一标识符,是在文件系统初次格式化时生成的。
cTime此处为0
storageType表示此文件夹中保存的是元数据节点的数据结构。
namespaceID=1232737062 cTime=0 storageType=NAME_NODE layoutVersion=-18
|
1.2.2、文件系统命名空间映像文件及修改日志
1.当文件系统客户端(client)进行写操作时,首先把它记录在修改日志中(edit log)
2.元数据节点在内存中保存了文件系统的元数据信息。在记录了修改日志后,元数据节点则修改内存中的数据结构。
3.每次的写操作成功之前,修改日志都会同步(sync)到文件系统。
4.fsimage文件,也即命名空间映像文件,是内存中的元数据在硬盘上的checkpoint,它是一种序列化的格式,并不能够在硬盘上直接修改。
5.同数据的机制相似,当元数据节点失败时,则最新checkpoint的元数据信息从fsimage加载到内存中,然后逐一重新执行修改日志中的操作。
6.从元数据节点就是用来帮助元数据节点将内存中的元数据信息checkpoint到硬盘上的
7.checkpoint的过程如下:
从元数据节点通知元数据节点生成新的日志文件,以后的日志都写到新的日志文件中。
从元数据节点用http get从元数据节点获得fsimage文件及旧的日志文件。
从元数据节点将fsimage文件加载到内存中,并执行日志文件中的操作,然后生成新的fsimage文件。
从元数据节点奖新的fsimage文件用http post传回元数据节点
元数据节点可以将旧的fsimage文件及旧的日志文件,换为新的fsimage文件和新的日志文件(第一步生成的),然后更新fstime文件,写入此次checkpoint的时间。
这样元数据节点中的fsimage文件保存了最新的checkpoint的元数据信息,日志文件也重新开始,不会变的很大了。
1.2.3、从元数据节点的目录结构
1.2.4、数据节点的目录结构
数据节点的VERSION文件格式如下:
namespaceID=1232737062
storageID=DS-1640411682-127.0.1.1-50010-1254997319480
cTime=0
storageType=DATA_NODE
layoutVersion=-18
|
blk_<id>保存的是HDFS的数据块,其中保存了具体的二进制数据。
blk_<id>.meta保存的是数据块的属性信息:版本信息,类型信息,和checksum
当一个目录中的数据块到达一定数量的时候,则创建子文件夹来保存数据块及数据块属性信息。
二、数据流(data flow)
2.1、读文件的过程
客户端(client)用FileSystem的open()函数打开文件
DistributedFileSystem用RPC调用元数据节点,得到文件的数据块信息。
对于每一个数据块,元数据节点返回保存数据块的数据节点的地址。
DistributedFileSystem返回FSDataInputStream给客户端,用来读取数据。
客户端调用stream的read()函数开始读取数据。
DFSInputStream连接保存此文件第一个数据块的最近的数据节点。
Data从数据节点读到客户端(client)
当此数据块读取完毕时,DFSInputStream关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点。
当客户端读取完毕数据的时候,调用FSDataInputStream的close函数。
在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。
失败的数据节点将被记录,以后不再连接。
2.2、写文件的过程
客户端调用create()来创建文件
DistributedFileSystem用RPC调用元数据节点,在文件系统的命名空间中创建一个新的文件。
元数据节点首先确定文件原来不存在,并且客户端有创建文件的权限,然后创建新文件。
DistributedFileSystem返回DFSOutputStream,客户端用于写数据。
客户端开始写入数据,DFSOutputStream将数据分成块,写入data queue。
Data queue由Data Streamer读取,并通知元数据节点分配数据节点,用来存储数据块(每块默认复制3块)。分配的数据节点放在一个pipeline里。
Data Streamer将数据块写入pipeline中的第一个数据节点。第一个数据节点将数据块发送给第二个数据节点。第二个数据节点将数据发送给第三个数据节点。
DFSOutputStream为发出去的数据块保存了ack queue,等待pipeline中的数据节点告知数据已经写入成功。
如果数据节点在写入的过程中失败:
关闭pipeline,将ack queue中的数据块放入data queue的开始。
当前的数据块在已经写入的数据节点中被元数据节点赋予新的标示,则错误节点重启后能够察觉其数据块是过时的,会被删除。
失败的数据节点从pipeline中移除,另外的数据块则写入pipeline中的另外两个数据节点。
元数据节点则被通知此数据块是复制块数不足,将来会再创建第三份备份。
当客户端结束写入数据,则调用stream的close函数。此操作将所有的数据块写入pipeline中的数据节点,并等待ack queue返回成功。最后通知元数据节点写入完毕。
如果你想深入理解hadoop平台的话我觉得研究其源码是少不了的,Hadoop源码网址。再结合这本书Hadoop源代码分析(完整版)去看。
有时间的话我也会仔细去研究,不过现在还是先补基础,基础好了看这些东西要来的些。这里就简单概述一下Hadoop平台的东西吧。
本文作者:吴斌炜 来源:CIOZJ
CIO之家 www.ciozj.com 微信公众号:imciow