背景

业务里面由于涉及到的数据量比较大,所以很多任务不可避免的必须要用分布式集群来处理,由于工程方面都是用的C++,逻辑都是现成的,所以用Hadoop Streaming比较合适,通过这篇文章学习一下Hadoop Streaming。

什么是Hadoop Streaming

Hadoop Streaming是一个Hadoop官方提供的工具,可以让我们用非Java语言来实现mapreduce程序,这个语言可以是任何一种语言,为什么这么说呢,因为Hadoop Streaming最终实现的mapreduce程序,是通过Unix标准流作为Hadoop和mapreduce程序之间的通信接口,所以我们可以使用任何编程语言通过标准I/O来构建mapreduce程序。

而且,Hadoop Streaming的mapreduce程序,不仅仅是程序,脚本也是可以的,因为是通过标准I/O和Hadoop进行通信,所以,mapreduce这边的话,只要能够处理流就可以。

我们来看一个最简单的wordcount实例。

hadoop_streaming.sh 文件如下

HADOOP=/home/users/chenguodong02/hadoop-client/hadoop/bin/hadoop

INPUT=/user/ccdb/rd/chenguodong02/test/input
OUTPUT=/user/ccdb/rd/chenguodong02/test/output

${HADOOP} fs -rmr ${OUTPUT}

${HADOOP} streaming \
    -jobconf mapred.job.name="Test" \
    -jobconf mapred.job.priority="VERY_HIGH" \
    -jobconf mapred.reduce.tasks="10" \
    -input ${INPUT} \
    -output ${OUTPUT} \
    -file 'red.sh' \
    -mapper 'cat' \
    -reducer 'red.sh'

red.sh 文件如下

awk '
BEGIN{
    pre_word=""
    pre_sum=1
}
{
    cur_word=$0
    cur_sum=1
    if (pre_word==cur_word) {
        cur_sum=pre_sum+1
    } else if (pre_word!="") {
        printf "%s\t%d\n", pre_word, pre_sum
    }
    pre_word=cur_word
    pre_sum=cur_sum
}
END{
    if (pre_word!="") {
        printf "%s\t%d\n", pre_word, pre_sum
    }
}
'

输出如下:
xiaoli 2
lisi 1
xiaoming 4

通过执行hadoop_streaming.sh这个脚本,我们就能启动一次hadoop任务,我们通过-mapper参数指定cat为我们的map策略,通过-reducer参数指定red.sh脚本为我们的reduce策略,这样就完成了一次单词计数的任务,是不是非常简洁明了。

hadoop streaming程序构建形式

通过上面的定义以及demo,我们对hadoop streaming程序有了一个初步的认识,下面,我们来看一下hadoop streaming程序的构建。

因为hadoop系统是由java实现的,所以,我们可以想到hadoop streaming肯定也是内嵌于hadoop系统的一个java框架,我们通过下面这条命令,就能够启动一个hadoop streaming任务。

hadoop jar hadoop-streaming.jar [参数]

hadoop-streaming.jar就是官方提供给我们的hadoop-streaming框架的具体实现。我们只需要提供后面的参数,就可以完整的执行一次hadoop作业。

参数的话,我们看下一个小节。

hadoop streaming参数

在最开始的demo中,我们已经使用了几个最常用参数,比如 -file, -mapper, -reducer等,我们就是通过这种参数的形式,来提供给hadoop-streaming框架必要的作业信息,除了上面的参数,通常常用的还有 -mapred.map.tasks,-mapred.reduce.tasks,-mapred.job.priority等参数,更多的参数,我们可以通过hadoop-default.xml文件来查看。

InputFormat & InputSplits & RecordReader

我们上面已经知道如何构建hadoop streaming程序,以及如何发起一个hadoop streaming任务。下面,我们来看三个比较重要的概念。
这三个概念的话,我们首先需要看InputFormat,我们看一下官方给的定义:

InputFormat describes the input-specification for a Map-Reduce job.
The Map-Reduce framework relies on the InputFormat of the job to:
1. Validate the input-specification of the job.
2. Split-up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
3. Provide the RecordReader implementation to be used to glean input records from the logical InputSplit for processing by the Mapper.

这里面还包括两个东西,一个是InputSplits,另一个是RecordReader,我们分别看一下官方给出的定义:

InputSplits:
InputSplit represents the data to be processed by an individual Mapper.
Typically, it presents a byte-oriented view on the input and is the responsibility of RecordReader of the job to process this and present a record-oriented view.

RecordReader:
The record reader breaks the data into key/value pairs for input to the Mapper.

我们先来说一下InputSplits这个概念。
我们知道,HDFS中数据是按照block来存储的,默认block是64MB,当我们把一个大文件导入hdfs中的时候,文件会按 64MB 每个block来分割(如果按默认配置)。 如果你有1GB的文件要存入HDFS中, 1GB/64MB = 1024MB / 64MB = 16 个blocks 会被分割到不同的datanode上。
上面提到的InputSplit,就是用来数据分割的,数据分割(data splitting )策略是基于文件偏移进行的。文件分割的目的是 有利于数据并行处理 ,以及 便于数据容灾恢复。

我们来看一下hadoop数据分割是怎样执行的。
我们给定split.size参数之后,可以控制 M/R 中 Mapper 的数量。如果没有给定 split.size , 就用默认的HDFS配置作为 input split。

举个例子:
你有个100MB的文件,block size 是64MB,那么就会被split成2块。这时如果你你没有指定input split,你的M/R程序就会按2个input split来处理,并分配两个mapper来完成这个job。
但如果你把split size指定为100MB,那么M/R程序就会把数据处理成一个split,这时只用分配一个mapper就可以了。
但如果你把split size指定为25MB,M/R就会将数据分成4个split,分配4个mapper来处理这个job。

InputSplits要点:
1. block是物理上的数据分割,而split是逻辑上的分割。
2. 如果没有特别指定,split size 就等于 HDFS 的 block size 。
3. 用户可以在M/R 程序中自定义split size。
4. 一个split 可以包含多个blocks,也可以把一个block应用多个split操作。
5. 有多少个split,就有多少个mapper。

我们再来解释一下RecordReader,这个东西其实可以理解为一个record的迭代器,它从InputSplits处得到字节类型的数据,然后经过它的处理,我们可以获得键值对类型的记录。

而InputFormat,我们看上面最开始官方给的定义,其实InputFormat可以看作是整个包括InputSplits和RecordReader过程的一个管理者,它首先检查输入规范,然后叫来InputSplits进行逻辑分割,然后叫来RecordReader生成键值对记录,然后就能开始map作业了。

而关于InputFormat,是分为很多种的,我们来看最常见的几个:

TextInputFormat:
TextInputFormat是默认的INputFormat,输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容。如果一个作业的Inputformat是TextInputFormat,并且框架检测到输入文件的后缀是 .gz 和 .lzo,就会使用对应的CompressionCodec自动解压缩这些文件。但是需要注意,上述带后缀的压缩文件不会被切分,并且整个压缩文件会分给一个mapper来处理。

KeyValueTextInputFormat
输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为Key,在之后的为Value。分隔符变量通过key.value.separator.in.input.line变量设置,默认为(\t)字符。注意:kv记录在传给map之前,key会被丢掉。

NLineInputFormat
与TextInputFormat一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1。

SequenceFileInputFormat
一个用来读取字符流数据的InputFormat,为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另一个MapReduce任务的输入之间的数据传输过程。

我们在处理不同类型的数据的时候,需要根据具体情况来选择恰当的InputFormat来优化我们的任务。

注意:
我们上面谈到的一切定义,不同版本甚至不同社区的实现,都会有些许不同,但是大致意思就是这样,我们在具体使用的时候,还要看具体版本的定义。

hadoop streaming mapper/reducer与Map/Reduce框架之间的通信协议

通过上面,我们已经知道了数据是怎样进入mapreduce任务的了,现在,我们再来看一下hadoop streaming mapreduce任务和核心的Mapreduce框架之间的通信协议。

mapper初始化:
mapper程序初始化时,每一个mapper任务会把可执行文件或者脚本作为一个单独的进程启动。

mapper输入:
mapper任务运行时,它把输入切分成行,并把每一行作为标准输入提供给可执行文件或脚本。

mapper输出:
mapper任务收集可执行文件的标准输出,并切分为key/value对,作为mapper的输出。默认情况下,一行中第一个tag之前的部分作为key,之后的(不包括tag)内容作为value。如果没有tag,默认整行作为key值,value值为NULL。

reducer初始化:
同mapper。

reducer输入:
把mapper输出的key/value对作为输入,key和value之间默认由tag分隔。

reducer输出:
reducer收集可执行文件的标准输出,并切分为key/value对,默认情况下,一行中第一个tag之前部分作为key,之后的作为value。

上面就是mapreduce任务和hadoop MapReduce框架之间基本的通信协议,当然,我们可以根据需求去自定义key和value的切分方式,具体看下面。

自定义key/value切分

之前已经提到,当Map/Reduce框架从mapper的标准输入读取一行时,会把这一行切分为key/value对,默认分隔符为tab,tab之前的作为key,tab之后的作为value,除此之外,我们可以自定义分隔符以及指定第几个符号为分隔符,就是用下面的两个参数来定义。
-jobconf stream.map.output.field.separator=.
-jobconf stream.num.map.output.key.fields=4

第一个参数可以自定义分隔符,第二个参数用来定义第几个符号是分隔符,上面的就是用.作为分隔符,并且第四个.之前的作为key,之后的作为value,如果.不够四个,那么整行作为key,value为NULL。

缓存文件

我们最开始的demo任务中,我们使用-file参数来上传我们的red.sh脚本,这个red.sh脚本很小,所以很快就完成了上次,但是如果我们需要在mapreduce策略中使用一个很大的文件,或者是一个事先已经在HDFS上的文件,那么我们应该怎么办。
hadoop streaming提供给我们-cacheFile和-cacheArchive选项在集群中分发文件和档案,选项的参数是用户已经上传到HDFS的文件或者档案的地址。-cacheArchive不同于-cacheFile的一点是可以自动解压压缩文件。

注意:这些文件或者档案在不同的作业间缓存,并且以符号链接的形式在当前工作目录下建立引用。
我们来看一下具体使用:
-cacheFile “/user/input.txt#input”,#号之后的input就是建立在我们工作目录下的符合链接。
-cacheArchive “/user/input.tgz#input”,同理,#号之后的input就是解压input.tgz后,建立在我们工作目录下的符合链接。

高级参数

(未完待续)

原创文章,转载请注明地址: 文章地址