名词解释 Hadoop: Hadoop是一个框架,它是由Java语言来实现的。Hadoop是处理大数据技术.Hadoop可以处理云计算产生大数据。
CDH商业版:
Cloudera CDH是Hadoop的一个版本,比Apache Hadoop的优点如下:
CDH基于稳定版Apache Hadoop,并应用了最新Bug修复或者Feature的Patch。Cloudera常年坚持季度发行Update版本,年度发行Release版本,更新速度比Apache官方快,而且在实际使用过程中CDH表现无比稳定,并没有引入新的问题。
Cloudera官方网站上安装、升级文档详细,省去Google时间。
CDH支持Yum/Apt包,Tar包,RPM包,Cloudera Manager四种方式安装,总有一款适合您。官方网站推荐Yum/Apt方式安装,其好处如下:
联网安装、升级,非常方便。当然你也可以下载rpm包到本地,使用Local Yum方式安装。
自动下载依赖软件包,比如要安装Hive,则会级联下载、安装Hadoop。
Hadoop生态系统包自动匹配,不需要你寻找与当前Hadoop匹配的Hbase,Flume,Hive等软件,Yum/Apt会根据当前安装Hadoop版本自动寻找匹配版本的软件包,并保证兼容性。
自动创建相关目录并软链到合适的地方(如conf和logs等目录);自动创建hdfs, mapred用户,hdfs用户是HDFS的最高权限用户,mapred用户则负责mapreduce执行过程中相关目录的权限。
大数据的4个V:
Velocity:实现快速的数据流传
Variety: 具有多样的数据类型
Volume: 存有海量的数据规模(TB,PB,EB级别)
Value:存在着巨大的价值
MapReduce
Hadoop实际上就是谷歌三宝的开源实现,
Hadoop MapReduce对应Google MapReduce,
HBase对应BigTable,
HDFS对应GFS。HDFS(或GFS)为上层提供高效的非结构化存储服务,
HBase(或BigTable)是提供结构化数据服务的分布式数据库,Hadoop MapReduce(或Google MapReduce)是一种并行计算的编程模型,用于作业调度。
HBase HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoop项目的子项目。
HDFS(Hadoop Distributed File System):
默认的最基本的存储单位是64M的数据块。
和普通文件系统相同的是,HDFS中的文件是被分成64M一块的数据块存储的。
不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间。
hive hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。
联机事务处理OLTP(On-line Transaction Processing)、联机分析处理OLAP(On-Line Analytical Processing) OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,例如银行交易。OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。
分析型数据不允许update、delete操作
Sqoop Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
ZooKepper ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。 ZooKeeper包含一个简单的原语集,提供Java和C的接口。
Mahout
Mahout 是 Apache Software Foundation(ASF) 旗下的一个开源项目,提供一些可扩展的机器学习领域经典算法的实现,旨在帮助开发人员更加方便快捷地创建智能应用程序。Mahout包含许多实现,包括聚类、分类、推荐过滤、频繁子项挖掘。此外,通过使用 Apache Hadoop 库,Mahout 可以有效地扩展到云中。
安装Hadoop 支持平台
GNU/Linux是产品开发和运行的平台。Hadoop已在有2000个节点的GNU/Linux主机组成的集群系统上得到验证。
Win32平台是作为开发平台
支持的。由于分布式操作尚未在Win32平台上充分测试,所以还不作为一个生产平台
被支持。
步骤: 安装 VMware 安装 Ubuntu 安装 jdk 解压tar -vzfx jdk-1.7.0.tar.gz
配环境变量sudo vim /etc/profile
1 2 3 export JAVA_HOME=/home/master0/Desktop/jkd1.7.0_80export CLASSPATH=.:$JAVA_HOME /lib/dt.jar:$JAVA_HOME /lib/tools.jarexport PATH=$JAVA_HOME /bin:$PATH
使配置文件生效:source /etc/profile
安装Hadoop 配环境变量sudo vim /etc/profile
1 2 3 4 5 export JAVA_HOME=/home/master0/Desktop/jdk1.7.0_80export HADOOP_HOME=/home/master0/Desktop/hadoop-2.6.0export CLASSPATH=.:$JAVA_HOME /lib/dt.jar:$JAVA_HOME /lib/tools.jar:$HADOOP_HOME /share/hadoop/common/hadoop-common-2.6.0.jar:$HADOOP_HOME /share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:$HADOOP_HOME /share/hadoop/common/lib/commons-1.2.jar:$CLASSPATH export PATH=$JAVA_HOME /bin:$PATH export PATH=$HADOOP_HOME /bin:$PATH
配置~/hadoop-2.6.0/etc/hadoop/hadoop-env.sh
1 2 export JAVA_HOME=/home/master0/Desktop/jdk1.7.0_80
测试hadoop
使用hadoop的本地单独模式 对某目录下的文档进行单词数的统计
1 2 3 4 5 $ cd /home/hadoop/ $ mkdir input $ cp $HADOOP_HOME /etc/hadoop/*.xml input/ $ hadoop jar $HADOOP_HOME /share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar grep input output 'dfs[a-z.]+' $ cat output/*
克隆虚拟机 修改主机名
1 sudo gedit /etc/hostname
配置静态IP 1 sudo gedit /etc/network/interfaces
编辑->虚拟网络编辑器->查看NAT模式的子网地址
例如为:231
master
1 2 3 4 5 6 7 8 9 10 auto eth0 iface eth0 inet static address 192.168.231.129 netmask 255.255.255.0 network 192.168.231.0 boardcast 192.168.231.255 gateway 192.168.231.2 dns-nameservers 192.168.1.1 8.8.8.8 8.8.8.4 ping 192.168.231.130
serve1
1 2 3 4 5 6 7 8 9 10 auto eth0 iface eth0 inet static address 192.168.231.130 netmask 255.255.255.0 network 192.168.231.0 boardcast 192.168.231.255 gateway 192.168.231.2 dns-nameservers 192.168.1.1 8.8.8.8 8.8.8.4 ping 192.168.231.129
若访问不了网页的话可以将物理机的dns填写在dns-nameservers第一个
若拖文件拖不进虚拟机需检查:
虚拟机ping与其对应的模式的虚拟网卡可不可以ping通
主机ping与虚拟机可不可以ping通
VMware Network Adapter VMnet1:桥接模式虚拟网卡
VMware Network Adapter VMnet8:NAT模式虚拟网卡
修改hosts文件 1 2 3 4 5 sudo gedit /etc/hosts 192.168.231.129 master 192.168.231.130 serve1 192.168.231.131 serve2
安装ssh 1 sudo apt-get install ssh
安装完毕就会出现/home/master/.ssh
文件夹
然后需要生成了一个公钥
1 2 3 4 ssh-keygen -t rsa -P '' 会生成 id_rsa id_rsa.pub known_hosts
id_rsa为私钥
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 -----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAp6JLYxM9lm/ciNG5SuAd/0WBBY0VN98w1KLad5GrkZhM5iZ1 mKnl1JHhT14//QSqtJ/tAAo8P1EZspvziS1q77DVBF4L/kInl0KEZOiFWMUOKqDj y+TWLSZmBK9uP5J2cb2wnIMZ4HeWw0y8hnaCpfg4FNVm8WL/EQh++EHx4VBQv0bt 4s3qZ9LgYM0MGDrizYKCZ92vRE2CVgLlpAzXvD2uFfxlFwJl02l35fjaIW2ed6PV HrnT8D6BrpUIdKWzWsevj3W6IfO0upBtqOygJw0RxYSx646nDDdFXIk7bzdVFXdr sOjOblsPGqTJs+aApEQB3avOUZI0EixCr2h7/wIDAQABAoIBAQCY1je1lQ1J46NG ezBdPAkdfNktnnwB/NQginp1GbM7g4hZLid5kS2iqX6rRltA7MhW9pi2uJ5FfEPZ vKZGI8qjzq3o1XZJ0zcVief7uKQbU06fPyFx/KnpcGEDVI9IFtk2yqQDjuRA68fh OE2KqvJjL/Sxyf+ZhZDYjs50ums16PHxXlhAaP8EI78Dcff5sx+ZoKTVGum4Jrdl h0cXeDBcxJZg7wEtHPEUrduaiwEv88fD7aw2QwsYdCuPECncltR57iPi95hr7uaW XdtRZ+mAey5sBxJmZKrlPE6kK3yAvSs1tP0yz4R4azAYQqTpLmxcfMrqWRwb3IMA 9Rl98FIBAoGBANpNaYJgaTvDT2Nski6sTu1oPefow4tosvPE1jZ/gWXExJ9m1OiI GcZGG0nM+UCx85+//B6gyLdvmUGgxN9vzmY3myuhQ9iesep7W+DiqDnz2J/VRM98 eEso6P1jevlC90WJh1wNrVIuzxuN/5A5LghjNNuHCnZzJTRuSKjISjRhAoGBAMSU 9hdNDliOXDIIRs/vjwiRuLvbECMFqETSyFdnc91dAi2cYfwlfKFlWGSPFO/LuTvL 9PfWaKgfuAzMiZ5JoMPlo5iX8atX1V4Naz7e3OBR9rhyD0oO4aNyKtvDv7tIWTxm eWw/4hlmPp/wGYgfxlPOrbVfJcESYk9FmRxxeoxfAoGAP87ozCcKG2HXTqRphiLv Xw1dKvAqWBFeXUpnor5aQDjnkAAqs100y3OqfkPfhz18jHE9bGZqxNNl5HztjrHL jq0qOfKFNkgMkRFFpdIagfX4l59q4YrsTmvCzm3JgBpG1JiCbDHDO4ZbGx7CWJGe Fu2IgbJTKJQ3h7/ElTEWH4ECgYEAoxOr/vJ2hzI5+2twSwlBT+uLI5P8FAGacNWn SxLQRH/m0a2cf48dj8pCBNHJnZAUby2oX30nvujpRvza4UvVKQ20pF7QJcMshuR8 5l/9Pb3g/WvpkRc9SdjpAvylbpj7JicgbZOlXkq6gvWsSIeLgHTBF+gBquQ0V+y1 sqnU7uMCgYBMSR2QDG5TuSp7pNOFBFuqhOCrUHZmKqoHCZ7rSh3etxc8D5tLXciE APNWfGqSE2aT/PJgqNoxl5p42bnZrv3cXJuiD9Wid6yFzDH0oUa9K66vy1SWV+B8 3rHha5wLzizgNUQZjh1XSndp1WekYCLjV+Bn8b/odBClcHKX7M/BOg== -----END RSA PRIVATE KEY-----
id_rsa.pub为公钥
1 ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCnoktjEz2Wb9yI0blK4B3/RYEFjRU33zDUotp3kauRmEzmJnWYqeXUkeFPXj/9BKq0n+0ACjw/URmym/OJLWrvsNUEXgv+QieXQoRk6IVYxQ4qoOPL5NYtJmYEr24/knZxvbCcgxngd5bDTLyGdoKl+DgU1WbxYv8RCH74QfHhUFC/Ru3izepn0uBgzQwYOuLNgoJn3a9ETYJWAuWkDNe8Pa4V/GUXAmXTaXfl+NohbZ53o9UeudPwPoGulQh0pbNax6+Pdboh87S6kG2o7KAnDRHFhLHrjqcMN0VciTtvN1UVd2uw6M5uWw8apMmz5oCkRAHdq85RkjQSLEKvaHv/ master@ubuntu
要想免密登录则需要被免密登录方的公钥:这里可以先将各台分机的公钥发送给主机master,然后再由master合成一个文件再发送给分机。这样每台机器都会有其它所有机器的公钥
生成公钥文件
1 cat .ssh/id_rsa.pub >> .ssh/authorized_keys
1 2 authorized_keys如下,其实和公钥相同: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCnoktjEz2Wb9yI0blK4B3/RYEFjRU33zDUotp3kauRmEzmJnWYqeXUkeFPXj/9BKq0n+0ACjw/URmym/OJLWrvsNUEXgv+QieXQoRk6IVYxQ4qoOPL5NYtJmYEr24/knZxvbCcgxngd5bDTLyGdoKl+DgU1WbxYv8RCH74QfHhUFC/Ru3izepn0uBgzQwYOuLNgoJn3a9ETYJWAuWkDNe8Pa4V/GUXAmXTaXfl+NohbZ53o9UeudPwPoGulQh0pbNax6+Pdboh87S6kG2o7KAnDRHFhLHrjqcMN0VciTtvN1UVd2uw6M5uWw8apMmz5oCkRAHdq85RkjQSLEKvaHv/ master@ubuntu
1 2 3 4 分机serve1复制公钥到master主机上: scp .ssh/id_rsa.pub master@master:/home/master/id_rsa_1.pub 将分机serve1的公钥追加到主机的authorized_keys上cat id_rsa_1.pub >> .ssh/authorized_keys
重复以上两步直到主机master的authorized_keys有所有分机的公钥,再进行分发操作
1 2 scp .ssh/authorized_keys master@serve1:/home/master/.ssh/authorized_keys scp .ssh/authorized_keys master@serve2:/home/master/.ssh/authorized_keys
分发完毕后即可进行测试:
1 2 3 ssh master ssh serve1 能连接成功即可
SSH免密码设置失败解决
权限问题
.ssh目录,以及/home/当前用户 需要700权限,参考以下操作调整
1 2 3 $sudo chmod 777 ~/.ssh$sudo chmod 700 /home/当前用户
.ssh目录下的authorized_keys文件需要600或644权限,参考以下操作调整
1 $sudo chmod 644 ~/.ssh/authorized_keys
StrictModes问题
1 $sudo gedit /etc/ssh/sshd_config
找到
改成
如果还不行,可以用ssh -vvv 目标机器ip
查看详情
配置Hadoop集群 以下将会修改多个Hadoop配置文件均位于hadoop-2.6.0/etc
目录下
修改:hadoop-env.sh
、yarn-env.sh
1 2 3 4 gedit etc/hadoop/hadoop-env.shexport JAVA_HOME=/home/master/jdk1.7.0_80
1 gedit etc/hadoop/yarn-env.sh
core-site.xml
core-site.xml的完整参数请参考: http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/core-default.xml
1 gedit etc/hadoop/core-site.xml
/home/hadoop/tmp
目录如不存在,则先mkdir手动创建
1 2 3 4 5 6 7 8 9 10 11 12 <configuration > <property > <name > fs.defaultFS</name > <value > hdfs://master:9000</value > </property > <property > <name > hadoop.tmp.dir</name > <value > /home/master/tmp</value > </property > </configuration >
hdfs-site.xml
hdfs-site.xml的完整参数请参考: http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
1 gedit etc/hadoop/hdfs-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 <configuration > <property > <name > dfs.datanode.ipc.address</name > <value > 0.0.0.0:50020</value > </property > <property > <name > dfs.datanode.http.address</name > <value > 0.0.0.0:50075</value > </property > <property > <name > dfs.namenode.name.dir</name > <value > file:/home/master/data/namenode</value > </property > <property > <name > dfs.datanode.data.dir</name > <value > file:/home/master/data/datanode</value > </property > <property > <name > dfs.namenode.secondary.http-address</name > <value > slave1:9001</value > </property > <property > <name > dfs.replication</name > <value > 1</value > </property > <property > <name > dfs.permissions</name > <value > false</value > </property > </configuration >
配置slaves分机列表
分发配置文件到集群的其它机器
1 scp -r hadoop-2.6.0/etc/hadoop/ master@serve1:/home/master/hadoop-2.6.0/etc/
格式化hdfs
等看到执行信息有has been successfully formatted表示格式化ok
启动 dfs
1 hadoop-2.6.0/sbin/start-dfs.sh
验证hadoop是否启动成功:
1 2 3 4 $jps 显示有: 4895 DataNode 4775 NameNode
安装 MapReduce mapred-site.xml的完整参数请参考http://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
将mapred-site.xml.template改名成mapred-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <configuration > <property > <name > mapreduce.framework.name</name > <value > yarn</value > </property > <property > <name > mapreduce.jobhistory.address</name > <value > master:10020</value > </property > <property > <name > mapreduce.jobhistory.webapp.address</name > <value > master:19888</value > </property > </configuration >
yarn-site.xml
yarn-site.xml的完整参数请参考: http://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 <configuration > <property > <name > yarn.nodemanager.aux-services</name > <value > mapreduce_shuffle</value > </property > <property > <name > yarn.nodemanager.aux-services.mapreduce.shuffle.class</name > <value > org.apache.hadoop.mapred.ShuffleHandler</value > </property > <property > <name > yarn.resourcemanager.scheduler.address</name > <value > master:8030</value > </property > <property > <name > yarn.resourcemanager.resource-tracker.address</name > <value > master:8025</value > </property > <property > <name > yarn.resourcemanager.address</name > <value > master:8040</value > </property > </configuration >
启动yarn 1 hadoop-2.6.0/sbin/start-yarn.sh
1 2 3 4 5 $jps 多了ResourceManager和NodeManager表示启动yarn成功 SecondaryNameNode ResourceManager NameNode
1 2 3 4 5 6 7 8 9 10 11 12 13 14 hadoop-2.6.0/sbin/start-dfs.sh hadoop-2.6.0/sbin/start-yarn.sh jps master节点上有几下3个进程: 7482 ResourceManager 7335 SecondaryNameNode 7159 NameNode slave1、slave2上有几下2个进程: 2296 DataNode 2398 NodeManager hadoop-2.6.0/sbin/stop-dfs.sh hadoop-2.6.0/sbin/stop-yarn.sh
或打开浏览器访问:hdfs管理界面: http://master:50070
yarn的管理界面: http://master:8088/
查看hadoop状态
1 2 3 4 5 6 7 hdfs dfsadmin -report //查看hdfs的状态报告 yarn node -list //查看yarn的基本信息 Secondary// TODO NameNode 元数据 DataNode 数据块
HDFS文件系统 hadoop实现了一个分布式文件系统HDFS(Hadoop Distributed File System)
元数据:用于描述数据的数据。
NameNode 主服务器,用来管理整个文件系统的命名空间和元数据,以及处理来自外界的文件访问请求。整个集群中只有一个。含有:
命名空间:整个分布式文件系统的目录结构
数据块与文件名的映射表
每个数据块副本的位置信息(每个数据块默认3个副本)
元数据保存在NameNode的内存当中(1G内存可存放1000000个块对应的元数据信息,缺省每块64M计算可对应64T实际数据)
DataNode通过心跳包(Heartbeats)与NameNode通讯
HA(High Available)高可用
DataNode 用来实际存储和管理文件的数据块
数据块-64M(128M)数据块+备份公用一个ID
主从架构:1个NameNode对应n个DataNode
1 2 3 4 client-java app -> data NameNode(客户端向NameNode发起请求) client-sid datanode-> datanode -> r/w -> dfs file(NameNode返回对应的DataNode给客户端让客户端来通过DataNode进行访问) -> namenode(向NameNode汇报情况)
JVM从HDFS读取文件流程
client会从距离最近的机子上读取
HDFS文件存储的组织与读写: 数据写入
客户端调用FileSystem 实例的create 方法,创建文件。NameNode 通过一些检查,比如文件是否存在,客户端是否拥有创建权限等;通过检查之后,在NameNode 添加文件信息。注意,因为此时文件没有数据,所以NameNode 上也没有文件数据块的信息。
创建结束之后, HDFS 会返回一个输出流DFSDataOutputStream 给客户端。
客户端调用输出流DFSDataOutputStream 的write 方法向HDFS 中对应的文件写入数据。
数据首先会被分包,这些分包会写人一个输出流的内部队列Data 队列中,接收完数据分包,输出流DFSDataOutputStream 会向NameNode 申请保存文件和副本数据块的若干个DataNode , 这若干个DataNode 会形成一个数据传输管道。DFSDataOutputStream 将数据传输给距离上最短的DataNode ,这个DataNode 接收到数据包之后会传给下一个DataNode 。数据在各DataNode之间通过管道流动,而不是全部由输出流分发,以减少传输开销。
因为各DataNode 位于不同机器上,数据需要通过网络发送,所以,为了保证所有DataNode 的数据都是准确的,接收到数据的DataNode 要向发送者发送确认包(ACK Packet ) 。对于某个数据块,只有当DFSDataOutputStream 收到了所有DataNode 的正确ACK. 才能确认传输结束。DFSDataOutputStream 内部专门维护了一个等待ACK 队列,这一队列保存已经进入管道传输数据、但是并未被完全确认的数据包。
不断执行第3 - 5 步直到数据全部写完,客户端调用close 关闭文件。
DFSDataInputStream 继续等待直到所有数据写人完毕并被确认,调用complete 方法通知NameNode 文件写入完成。NameNode 接收到complete 消息之后,等待相应数量的副本写入完毕后,告知客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 查看文件 hadoop fs -cat /output/part-00000 查看hadoop文件系统 hadoop fs -ls / hadoop fs -ls -R /output hadoop fs -ls /output 创建文件夹 hadoop fs -mkdir /tmp hadoop fs -mkdir /input hadoop fs -mkdir /output 将文件放到hadoop文件系统-put 当前路径 /home/master/input 放到的路径 hadoop fs -put /home/master/input/* /input hadoop fs -get /output output hadoop fs -rm -R /input hadoop fs -rm -r /output/output hadoop fs -mv /output/output/part-r-00000 /output/part-r-00000
运行例子
1 hadoop jar hadoop-2.6.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar grep /input /output 'dfs[a-z.]+'
Hadoop IO HDFS数据完整性
校验和+后台进程
文件数据结构-解决大量小文件
SequenceFile:用流来读写
MapFile
MapReduce Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的,Google已经将它完整的MapReduce论文公开发布了。其中对它的定义是,Map/Reduce是一个编程模型(programming model),是一个用于处理和生成大规模数据集(processing and generating large data sets)的相关的实现。用户定义一个map函数来处理一个key/value对以生成一批中间的key/value对,再定义一个reduce函数将所有这些中间的有着相同key的values合并起来。很多现实世界中的任务都可用这个模型来表达。
结构
Mapper、Reduce
运行于Hadoop的MapReduce应用程序最基本的组成部分包括一个Mapper和一个Reducer类,以及一个创建JobConf的执行程序,在一些应用中还可以包括一个Combiner类,它实际也是Reducer的实现。
JobTracker、TaskTracker
它们都是由一个master服务JobTracker和多个运行于多个节点的slaver服务TaskTracker两个类提供的服务调度的。master负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它,slave则负责直接执行每一个task。TaskTracker都需要运行在HDFS的DataNode上,而JobTracker则不需要,一般情况应该把JobTracker部署在单独的机器上。
JobClient
每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成jar文件存储在HDFS,并把路径提交到JobTracker的master服务,然后由master创建每一个Task(即MapTask和ReduceTask)将它们分发到各个TaskTracker服务中去执行。
JobInProgress
JobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。
TaskInProgress
JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。
MapTask、ReduceTask
一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。这个过程在下一部分再详细介绍。
案例 单词统计案例 1 2 3 4 5 6 7 8 9 10 11 Mapper<LongWritable, Text, Text, IntWritable> public void map(LongWritable k1, Text v1, Context context) 输入LongWritable k1, Text v1(LongWritable, Text):序号,行 处理:从行中split出每个单词,并将每个单词的值设为1 输出Context context(Text, IntWritable):单词,所有该单词的值的集合(数组) Reducer<Text, IntWritable, Text, IntWritable> public void reduce(Text key, Iterable<IntWritable> values, Context context) 输入Text key, Iterable<IntWritable> values(Text, IntWritable):单词,所有该单词的值的集合(数组) 处理:使用迭代器Iterator来迭代每个单词的值的数组并将数组中的每个元素相加,和作为该单词新的值 输出Context context(Text, IntWritable):单词,单词出现次数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 package mypro1;import java.io.IOException;import java.net.URI;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MyWordCount { static class MyMapper extends Mapper <LongWritable, Text, Text, IntWritable>{ public void map (LongWritable k1, Text v1, Context context) throws java.io.IOException, java.lang.InterruptedException{ String[] lines= v1.toString().split("\\s+" ); for (String word: lines){ context.write(new Text (word), new IntWritable (1 )); } System.out.println("map......" ); } } static class MyReduce extends Reducer <Text, IntWritable, Text, IntWritable>{ public void reduce (Text key, Iterable<IntWritable> values, Context context) throws java.io.IOException, java.lang.InterruptedException{ int sum=0 ; Iterator<IntWritable> it = values.iterator(); while (it.hasNext()){ sum+= it.next().get(); } context.write(key, new IntWritable (sum)); System.out.println("reduce......" ); } } private static String INPUT_PATH="hdfs://master:9000/input/hdfs-site.xml" ; private static String OUTPUT_PATH="hdfs://master:9000/output/c/" ; public static void main (String[] args) throws Exception { Configuration conf=new Configuration (); FileSystem fs=FileSystem.get(new URI (OUTPUT_PATH),conf); if (fs.exists(new Path (OUTPUT_PATH))) fs.delete(new Path (OUTPUT_PATH)); Job job = new Job (conf,"myjob" ); job.setJarByClass(MyWordCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path (INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path (OUTPUT_PATH)); job.waitForCompletion(true ); } }
排序案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 7 5 2 1 2 2 9 3 1 8 4 5 6 2 0 7 Mapper<LongWritable, Text, MyK2, LongWritable> public void map(LongWritable k1, Text v1, Context context) 输入LongWritable k1, Text v1(LongWritable, Text):序号,行 处理 输出Context context(MyK2, LongWritable):两个数,后面那个数(与排序无关,为空都可以) Reducer<MyK2, LongWritable,LongWritable, LongWritable> public void reduce(MyK2 myk2, Iterable<LongWritable> v2s,Context context) 输入MyK2 myk2, Iterable<LongWritable> v2s(MyK2, LongWritable):两个数,后面那个数(与排序无关,为空都可以) 处理 输出Context context(LongWritable, LongWritable):第一个数,第二个数 0 7 1 8 2 1 2 2 4 5 6 2 7 5 9 3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 package demo;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Sort { static class MyK2 implements WritableComparable <MyK2>{ public long myk2; public long myv2; public MyK2 () { } public MyK2 (long myk2, long myv2) { this .myk2 = myk2; this .myv2 = myv2; } @Override public void readFields (DataInput in) throws IOException { this .myk2=in.readLong(); this .myv2=in.readLong(); } @Override public void write (DataOutput out) throws IOException { out.writeLong(myk2); out.writeLong(myv2); } @Override public int compareTo (MyK2 my) { long temp=this .myk2-my.myk2; if (temp!=0 ){ return (int ) temp; } return (int ) (this .myv2-my.myv2); } } static class MyMapper extends Mapper <LongWritable, Text, MyK2, LongWritable>{ public void map (LongWritable k1, Text v1, Context context) throws java.io.IOException, java.lang.InterruptedException { String[] lines= v1.toString().split("\\s" ); MyK2 myK2 = new MyK2 (Long.parseLong(lines[0 ]), Long.parseLong(lines[1 ])); context.write(myK2, new LongWritable (Long.parseLong(lines[0 ]))); System.out.println("map......" ); } } static class MyReduce extends Reducer <MyK2, LongWritable,LongWritable, LongWritable>{ public void reduce (MyK2 myk2, Iterable<LongWritable> v2s,Context context) throws java.io.IOException, java.lang.InterruptedException { context.write(new LongWritable (myk2.myk2), new LongWritable (myk2.myv2)); System.out.println("reduce......" ); } } private static String INPUT_PATH="hdfs://master:9000/input/num" ; private static String OUTPUT_PATH="hdfs://master:9000/output/num/" ; public static void main (String[] args) throws Exception { Configuration conf=new Configuration (); FileSystem fs=FileSystem.get(new URI (OUTPUT_PATH),conf); if (fs.exists(new Path (OUTPUT_PATH))) fs.delete(new Path (OUTPUT_PATH)); Job job=new Job (conf,"myjob" ); job.setJarByClass(Sort.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReduce.class); job.setMapOutputKeyClass(MyK2.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job,new Path (INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path (OUTPUT_PATH)); job.waitForCompletion(true ); System.out.println("end" ); } }
图案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 输入: child parent Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip Alma Mark Terry Mark Alma 需求出输入中的所有的孙子与祖父母 Mapper<LongWritable, Text, Text, Text> public void map(LongWritable k1, Text v1, Context context) 输入LongWritable k1, Text v1(LongWritable, Text):序号,行 处理:读取行里的数据split,并以关系形式保存(以Tom Lucy为例): Tom,1,Tom,Lucy Tom,2,Lucy,Tom 输出Context context(Text, Text):人名,这个人与其他人的关系(数组) Reducer<Text, Text, Text, Text> public void reduce(Text key, Iterable<Text> values, Context context) 输入Text key, Iterable<Text> values(Text, Text):人名,这个人与其他人的关系(数组) 处理:从数组中读出关系并将与该人有关的符合条件的人加入临时数组并输出 输出Context context(Text, Text):孙子,祖父母 Jone Alice Jone Jesse Tom Alice Tom Jesse Jone Mary Jone Ben Tom Mary Tom Ben Mark Alice Mark Jesse Philip Alice Philip Jesse
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 package mr;import java.io.IOException;import java.net.URI;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem ;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.Mapper.Context;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MyGL { private static class MyGLMapper extends Mapper <LongWritable, Text, Text, Text>{ public void map (LongWritable k1, Text v1, Context context) throws java.io.IOException, java.lang.InterruptedException{ String[] lines = v1.toString().split("\t" ); if (lines.length != 2 || lines[0 ].trim().equals("child" )) return ; String word1=lines[0 ].trim(); String word2=lines[1 ].trim(); context.write(new Text (word1), new Text ("1" +"," +word1+"," +word2)); context.write(new Text (word2), new Text ("2" +"," +word1+"," +word2)); System.out.println("map......" +word1+"-" +word2); } } private static class MyGLReduce extends Reducer <Text, Text, Text, Text>{ public void reduce (Text key, Iterable<Text> values, Context context) throws java.io.IOException, java.lang.InterruptedException { List<String> grandch = new ArrayList (); List<String> grandpa = new ArrayList (); Iterator<Text> it=values.iterator(); while (it.hasNext()){ String lines= it.next().toString(); String[] words=lines.split("," ); if (words[0 ].equals("1" )){ grandpa.add(words[2 ]); } else if (words[0 ].equals("2" )){ grandch.add(words[1 ]); } else return ; } for (String ch:grandch) for (String pa:grandpa){ context.write(new Text (ch), new Text (pa)); System.out.println("reduce......" +ch+" - " +pa); } System.out.println("reduce......" ); } protected void cleanup (Context context) throws java.io.IOException, java.lang.InterruptedException{ } } private static String INPUT_PATH="hdfs://master:9000/input/gl.dat" ; private static String OUTPUT_PATH="hdfs://master:9000/output/c/" ; public static void main (String[] args) throws Exception { Configuration conf=new Configuration (); FileSystem fs=FileSystem.get(new URI (OUTPUT_PATH),conf); if (fs.exists(new Path (OUTPUT_PATH))) fs.delete(new Path (OUTPUT_PATH)); Job job=new Job (conf,"myjob" ); job.setJarByClass(MyGL.class); job.setMapperClass(MyGLMapper.class); job.setReducerClass(MyGLReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,new Path (INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path (OUTPUT_PATH)); job.waitForCompletion(true ); } }
矩阵乘法 矩阵乘法公式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 矩阵A(4*3)(i*n) 1,2,3 4,5,0 7,8,9 10,11,12 矩阵B(3*2)(n*j) 10,15 0,2 11,9 根据矩阵乘法的定义:矩阵A的列数=矩阵B的行数,即矩阵A和矩阵B都有相同的n 矩阵乘法的结果是产生(i*j)的矩阵C 矩阵C(4*2)(i*j) 43,46 40,70 169,202 232,280 1*10+2*0+3*11=43 1*15+2*2+3*9=46 计算每个矩阵C中的元素(i,j)都需要矩阵A的(i,r)与矩阵B的(r,j)相乘再加上下一个r取值[1,n] 接下来看看进行一个矩阵计算需要哪些信息: 因为每次计算r都是从1到n,所以r的值不需要保存进map, 需要:计算结果是在C的哪里即(i,j),A矩阵对应的值,B矩阵对应的值,这个值来自哪个矩阵(A还是B) 那么如何唯一标识矩阵C的一个元素呢?使用矩阵C的坐标,将C的坐标(i,j)作为key (哪个矩阵,对应的r,矩阵的值)作为value,这样就可以保存进行矩阵计算的全部信息了 分类讨论: (i,j为计算C的第(i,j)个元素的值,r取值[1,n]) 对于矩阵A的值: key(i,j) value(a,A的列即r,A[i,r]) 对于矩阵B的值: key(i,j) value(b,B的列即r,B[r,j]) 分类讨论的计算过程见下图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 package demo;import java.io.IOException;import java.net.URI;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MatrixProdect { static class MyMapper extends Mapper <LongWritable, Text, Text, Text> { private int rowNum = 4 ; private int colNum = 2 ; private int rowIndexA = 1 ; private int rowIndexB = 1 ; public void map (LongWritable key, Text value, Context context) throws java.io.IOException, java.lang.InterruptedException { FileSplit fs = (FileSplit) context.getInputSplit(); String fileName = fs.getPath().getName(); String[] tokens = value.toString().split("," ); if ("a" .equals(fileName)) { for (int j = 1 ; j <= colNum; j++) { Text k = new Text (rowIndexA + "," + j); for (int r = 0 ; r < tokens.length; r++) { Text v = new Text ("a," + (r + 1 ) + "," + tokens[r]); System.out.println("map......" + fileName + "(" + k + ")" + v); context.write(k, v); } } rowIndexA++; } else if ("b" .equals(fileName)) { for (int i = 1 ; i <= rowNum; i++) { for (int r = 0 ; r < tokens.length; r++) { Text k = new Text (i + "," + (r + 1 )); Text v = new Text ("b," + rowIndexB + "," + tokens[r]); System.out.println("map......" + fileName + "(" + k + ")" + v); context.write(k, v); } } rowIndexB++; } } } static class MyReduce extends Reducer <Text, Text, Text, IntWritable> { public void reduce (Text key, Iterable<Text> values, Context context) throws java.io.IOException, java.lang.InterruptedException { Map<String, String> mapA = new HashMap <String, String>(); Map<String, String> mapB = new HashMap <String, String>(); for (Text value : values) { String[] val = value.toString().split("," ); if ("a" .equals(val[0 ])) { mapA.put(val[1 ], val[2 ]); } else if ("b" .equals(val[0 ])) { mapB.put(val[1 ], val[2 ]); } } int result = 0 ; Iterator<String> mKeys = mapA.keySet().iterator(); while (mKeys.hasNext()) { String mkey = mKeys.next(); if (mapB.get(mkey) == null ) { continue ; } result += Integer.parseInt(mapA.get(mkey)) * Integer.parseInt(mapB.get(mkey)); } System.out.println("reduce......" + "(" + key + ")" + result); context.write(key, new IntWritable (result)); } } private static String INPUT_PATH_A = "hdfs://master:9000/input/a" ; private static String INPUT_PATH_B = "hdfs://master:9000/input/b" ; private static String OUTPUT_PATH = "hdfs://master:9000/output/matrix/" ; public static void main (String[] args) throws Exception { Configuration conf = new Configuration (); FileSystem fs = FileSystem.get(new URI (OUTPUT_PATH), conf); if (fs.exists(new Path (OUTPUT_PATH))) fs.delete(new Path (OUTPUT_PATH)); Job job = new Job (conf, "myjob" ); job.setJarByClass(MatrixProdect.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path (INPUT_PATH_A)); FileInputFormat.addInputPath(job, new Path (INPUT_PATH_B)); FileOutputFormat.setOutputPath(job, new Path (OUTPUT_PATH)); job.waitForCompletion(true ); System.out.println("end" ); } }
计算方法与上面一样,只是矩阵的存储结构不一样。省略了值为0的元素,对于较大且稀疏的矩阵所占存储空间较小