Hadoop

19、Java技术扩展【Hadoop8天】

第一天    hadoop的基本概念  伪分布式hadoop集群安装  hdfs mapreduce 演示

第二天  hdfs的原理和使用操作、编程

第三天  mapreduce的原理和编程

第四天  常见mr算法实现和shuffle的机制

第五天  hadoop2.x中HA机制的原理和全分布式集群安装部署及维护

第六天  hbase  hive

第七天  storm+kafka

第八天  实战项目

hadoop是什么?  第一天

解决问题:

•海量数据的存储(HDFS

•海量数据的分析(MapReduce

•资源管理调度(YARN

下载安装CENTOS 7

aria2c.exe -x5 -j5 http://mirrors.aliyun.com/centos/7/isos/x86_64/CentOS-7-x86_64-DVD-1611.iso

-x 同一服务器最大同时请求数 -j线程数

发现增加-x对速度提高大

不下了,教程里有

参考 hadoop2.4.1伪分布式搭建.txt

解压 CentOS.zip,重命名为 CentOS_Hadoop

启动 用户名hadoop 密码hadoop

设置静态ip     192.168.50.130 255.255.255.0 192.168.50.2 8.8.8.8

在/etc/sudoers里加入 hadoop用户

sudo service network restart

/etc/inittab 改为 id:3:initdefault:     默认启动命令行多用户模式

装JAVA

展【Hadoop8天】/weekend110-第1天/笔记文档软件$ scp.exe jdk-7u65-linux-i586.tar.gz hadoop@192.168.50.130:~/

tar xzf jdk-7u65-linux-i586.tar.gz

将java添加到环境变量中

        vim /etc/profile

        #在文件最后添加

        export JAVA_HOME=/home/hadoop/app/jdk1.7.0_65

        export PATH=$PATH:$JAVA_HOME/bin

        #刷新配置

        source /etc/profile

安装tmux

参考 https://www.zybuluo.com/mwumli/note/149542

wget https://cloud.github.com/downloads/libevent/libevent/libevent-2.0.21-stable.tar.gz

...

sudo yum install ncurses-devel

展【Hadoop8天】/weekend110-第1天/笔记文档软件$ scp tmux-2.4.tar.gz hadoop@192.168.50.130:~/

配置hadoop

hadoop2.4.1伪分布式搭建.txt

hadoop-env.sh

     配置JAVA_HOME环境变量     (虽然env里有,但是不配置还是会出错)

core-site.xml

     指定HADOOP所使用的文件系统schema(URI),HDFS的老大

     指定hadoop运行时产生文件的存储目录

hdfs-site.xml

     指定HDFS副本的数量

mapred-site.xml

     指定mr运行在yarn上

yarn-site.xml

     指定YARN的老大(ResourceManager)的地址

     reducer获取数据的方式

关闭防火墙

sudo service iptables stop

关闭当前登录模式防火墙的状态

sudo chkconfig iptables off

查看不同登录模式防火墙的状态

sudo chkconfig iptables --list

将hadoop添加到环境变量

/home/hadoop/app/hadoop-2.4.1

格式化namenode

     hdfs namenode -format (hadoop namenode -format)

启动hadoop

        先启动HDFS

        sbin/start-dfs.sh

        再启动YARN

        sbin/start-yarn.sh

验证是否启动成功

        使用jps命令验证

        27408 NameNode

        28218 Jps

        27643 SecondaryNameNode

        28066 NodeManager

        27803 ResourceManager

        27512 DataNode

        http://192.168.50.130:50070 (HDFS管理界面)

        http://192.168.50.130:8088 (MR管理界面)

上传文件到hdfs

hadoop fs -put tmux-2.4.tar.gz hdfs://hadoop1:9000/

hadoop fs -put tmux-2.4.tar.gz hdfs://localhost:9000/

出错

core-site.xml里的 fs.defaultFS

yarn-site.xml里的yarn.resourcemanager.hostname

必须写绝对地址,写localhost之后异地访问会吃出错

还要配置node名称

etc/hadoop/slaves 这是配置datanodes

还是不行

是/etc/host 的问题

192.168.50.130 hadoop1

不能写127.0.0.1

ssh免密码登录

cd ~/.ssh

ssh-keygen -t rsa (四个回车)

cat id_rsa.pub >> authorized_keys

sudo vim /etc/ssh/sshd_config

sudo service sshd restart

.ssh/authorized_keys的权限必须要设置为644

id_rsa的权限设置为600

mapreduce程序例子

位置 /home/hadoop/app/hadoop-2.4.1/share/hadoop/mapreduce

计算圆周率

 hadoop jar hadoop-mapreduce-examples-2.4.1.jar pi 5 5

统计单词出现次数

vim test.txt

hadoop fs -mkdir /wordcount

hadoop fs -mkdir /wordcount/input

hadoop fs -put test.txt /wordcount/input/

运行

hadoop jar hadoop-mapreduce-examples-2.4.1.jar wordcount /wordcount/input /wordcount/output/

查看结果

hadoop fs -ls /wordcount/output

hadoop fs -cat /wordcount/output/part-r-00000

HDFS的架构

主从结构

•主节点, namenode

•从节点,有很多个: datanode

namenode负责:

•接收用户操作请求

•维护文件系统的目录结构

•管理文件与block之间关系,block与datanode之间关系

datanode负责:

•存储文件

•文件被分成block存储在磁盘上

•为保证数据安全,文件会有多个副本

HDFS的实现思想

  1. hdfs通过分布式集群来存储文件
  2. 文件存储到hdfs集群中取的时候被分成block
  3. 文件的block存放在若干台datanode节点上
  4. hdfs文件系统中的文件与真实block之间有映射关系,由namenode管理
  5. 每一个block在集群中会存储多个副本,好处是可以提高数据的可靠性,还可以提高访问的吞吐量

hdfs的shell操作

[hadoop@hadoop1 mapreduce]$ hadoop fs

Usage: hadoop fs [generic options]

  [-appendToFile <localsrc> ... <dst>]

  [-cat [-ignoreCrc] <src> ...]

  [-checksum <src> ...]

  [-chgrp [-R] GROUP PATH...]

  [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]

  [-chown [-R] [OWNER][:[GROUP]] PATH...]

  [-copyFromLocal [-f] [-p] <localsrc> ... <dst>]

  [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]

  [-count [-q] <path> ...]

  [-cp [-f] [-p] <src> ... <dst>]

  [-createSnapshot <snapshotDir> [<snapshotName>]]

  [-deleteSnapshot <snapshotDir> <snapshotName>]

  [-df [-h] [<path> ...]]

  [-du [-s] [-h] <path> ...]

  [-expunge]

  [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]

  [-getfacl [-R] <path>]

  [-getmerge [-nl] <src> <localdst>]

  [-help [cmd ...]]

  [-ls [-d] [-h] [-R] [<path> ...]]

  [-mkdir [-p] <path> ...]

  [-moveFromLocal <localsrc> ... <dst>]

  [-moveToLocal <src> <localdst>]

  [-mv <src> ... <dst>]

  [-put [-f] [-p] <localsrc> ... <dst>]

  [-renameSnapshot <snapshotDir> <oldName> <newName>]

  [-rm [-f] [-r|-R] [-skipTrash] <src> ...]

  [-rmdir [--ignore-fail-on-non-empty] <dir> ...]

  [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]

  [-setrep [-R] [-w] <rep> <path> ...]

  [-stat [format] <path> ...]

  [-tail [-f] <file>]

  [-test -[defsz] <path>]

  [-text [-ignoreCrc] <src> ...]

  [-touchz <path> ...]          创建空文件,touch

  [-usage [cmd ...]]

hdfs不支持修改文件内容

hdfs中文件夹是元数据,不是文件

第二天

存很多小文件缺点     占用元数据空间     占用存储空间

  • NameNode元数据管理机制

NN     editslog     fsimage     client

  1. 客户端上传文件时,NN首先往edits log文件中记录元数据操作日志
  2. 客户端开始上传文件,完成后返回成功信息给NN,NN就在内存中写入这次上传操作的新产生的元数据信息
  3. 每当editslog写满时,需要将这一段时间的新的元数据刷到fsimage里去

+--NN-----------------------+

|     +--内存------+                 |

|     | meta.data    |                  |

|    +-------------+                 |

|    +---------+ +---------+   |

|     | fsimage  |  |edits.new |    |

|     |               | +---------+    |

|     |               |   +---------+  |

|    +---------+   |edits        |   |

|                         +---------+  |

+-----------------------------+

将editslog和fsimage合并,由second namenode做的

  • secondary namenode的工作流程

1.secondary通知namenode切换edits文件

2.secondary从namenode获得fsimage和edits(通过http)

3.secondary将fsimage载入内存,然后开始合并edits

4.secondary将新的fsimage发回给namenode

5.namenode用新的fsimage替换旧的fsimage

什么时候checkpiont

  • fs.checkpoint.period 指定两次checkpoint的最大时间间隔,默认3600秒。
  • fs.checkpoint.size 规定edits文件的最大值,一旦超过这个值则强制checkpoint,不管是否到达最大时间间隔。默认大小是64M。
  • Datanode的工作原理

提供真实文件数据的存储服务。

文件块(block):最基本的存储单位。对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。HDFS默认Block大小是128MB,以一个256MB文件,共有256/128=2个Block.

dfs.block.size

不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间

Replication。多复本。默认是三个。

hdfs-site.xml的dfs.replication属性

  • Remote Procedure Call

lRPC——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。

hadoop的整个体系结构就是构建在RPC之上的(见org.apache.hadoop.ipc)。

  • HDFS的JAVA客户端编写

先启动hdfs

./start-dfs.sh && ./start-yarn.sh

[hadoop@hadoop1 hadoop-2.4.1]$ hadoop fs -ls /

Found 5 items

-rw-r--r-- 1 hadoop supergroup 143588167 2017-05-24 07:59 /jdk-7u65-linux-i586.tar.gz

drwx------ - hadoop supergroup 0 2017-05-23 23:31 /tmp

-rw-r--r-- 1 hadoop supergroup 470549 2017-05-23 22:36 /tmux-2.4.tar.gz

drwxr-xr-x - hadoop supergroup 0 2017-05-23 23:31 /user

drwxr-xr-x - hadoop supergroup 0 2017-05-23 23:39 /wordcount

启动成功

打开eclipse创建项目hadoop1

创建依赖库集合hdfslib

位置 /home/hadoop/app/hadoop-2.4.1/share/hadoop/hdfs

包含hdfs依赖

share/hadoop/hdfs/hadoop-hdfs-2.4.1.jar

以及hdfs需要的依赖

share/hadoop/hdfs/lib/*

以及hadoop common依赖

share/hadoop/common/hadoop-common-2.4.1.jar

以及hadoop common需要的依赖

share/hadoop/common/lib/*

编写代码

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(conf);

Path src = new Path("hdfs://hadoop1:9000/jdk-7u65-linux-i586.tar.gz");

FSDataInputStream in = fs.open(src);

FileOutputStream os = new FileOutputStream("./jdk-7u65-linux-i586.tar.gz");

IOUtils.copy(in, os);

报错

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://hadoop1:9000/jdk-7u65-linux-i586.tar.gz, expected: file:///

因为没有配置正确

添加 core-site.xml 不知道为什么不行,只好在代码里添加

conf.set("fs.defaultFS", "hdfs://hadoop1:9000");

下载成功

上传文件用

FSDataOutputStream os = fs.create(dst);

......

IOUtils.copy(is, os);

或者

fs.copyToLocalFile(src, dst);

下载文件用

FSDataInputStream in = fs.open(src);

......

IOUtils.copy(in, os);

或者

fs.copyToLocalFile(src, dst);

设置操作用户名

VM arguments:-DHADOOP_USER_NAME=hadoop

或者

fs = FileSystem.get(new URI("hdfs://weekend110:9000/", conf, "hadoop"));

文件夹操作

fs.mkdirs(new Path("/abc"));

fs.delete(new Path("/abc") );

fs.listFiles(new Path("/abc", true));     //是否递归

  • Hadoop中的RPC应用实例

服务代理类 LoginServiceInterface.java

服务实现类 LoginServiceImpl.java

服务端start.java

Builder builder = new RPC.Builder(new Configuration());   builder.setBindAddress("localhost").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl());

Server server = builder.build();

server.start();

客户端LoginContorller.java

LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("localhost", 10000), new Configuration());

String result = proxy.login("angelababy", "123456");

System.out.println(result);

  • hdfs下载数据源码分析

FileSystem fs = FileSystem.get(conf);调用过程

private static FileSystem createFileSystem(URI uri, Configuration conf

  ) throws IOException {

  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);

  if (clazz == null) {

  throw new IOException("No FileSystem for scheme: " + uri.getScheme());

  }

  FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);

  fs.initialize(uri, conf);

  return fs;

  }

4.HDFS源码分析

例子中FileSystem是DistributedFileSystem

this.dfs = new DFSClient(uri, conf, statistics);

DFSClient里初始化了namenode的代理对象

FileSystem.get --> 通过反射实例化了一个DistributedFileSystem --> new DFSCilent()把他作为自己的成员变量

在DFSClient构造方法里面,调用了createNamenode,使用了RPC机制,得到了一个NameNode的代理对象,就可以和NameNode进行通信了

FileSystem --> DistributedFileSystem --> DFSClient --> NameNode的代理

第三天

  • hdfs打开输入流源码分析

FileSystem fs = FileSystem.get(conf); //DistributedSystem

DistributedFileSystem{

     DFSClient dfs;

}

DFSInputStream is = fs.open(path);

DistributedFileSystem.dfs.open(path){

     return new DFSInputStream(this, src, buffersize, verifyVhecksum);

     openInfo();

}

//获取文件块信息

openInfo(){

//这里完成对locatedBlocks赋值

lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();

}

在new DFSInputStream里面

private dfsClient = this;

private src = src;

buffersize = buffersize;

//以下成员在openInfo()中赋值

private locatedBlocks;

private blockReader;

  • mapreduce介绍和wordcount

WCMapper.java

//4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型

//map 和 reduce 的数据输入输出都是以 key-value对的形式封装的

//默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

输入类型是框架传给我们的,我们不能改变。

分别是Long和String。Long是起始偏移量。

但是使用自带的String和Long,序列化后的信息太多,不适合网络传输。

Hadoop实现了自己的一套序列化机制,LongWritable和Text

然后重写map方法

context.write(new Text(word), new LongWritable(1));

WCReducer.java

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

//框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法

    //<hello,{1,1,1,1,1,1.....}>

protected void reduce(Text key, Iterable<LongWritable> values,Context context)

重写reduce方法

context.write(key, new LongWritable(count));

WCRunner.java

org.apache.hadoop.mapred 是老版本的

org.apache.hadoop.mapreduce     是新版本的,我们使用这个包里的类

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

  Job wcjob = Job.getInstance(conf);

  //设置整个job所用的那些类在哪个jar包

  wcjob.setJarByClass(WCRunner.class);

  //本job使用的mapper和reducer的类

  wcjob.setMapperClass(WCMapper.class);

  wcjob.setReducerClass(WCReducer.class);

  //指定reduce的输出数据kv类型

  wcjob.setOutputKeyClass(Text.class);

  wcjob.setOutputValueClass(LongWritable.class);

  //指定mapper的输出数据kv类型

  wcjob.setMapOutputKeyClass(Text.class);

  wcjob.setMapOutputValueClass(LongWritable.class);

  //指定要处理的输入数据存放路径 new Path("/wc/srcdata")

  FileInputFormat.setInputPaths(wcjob, new Path("hdfs://hadoop1:9000/wc/srcdata/"));

  //指定处理结果的输出数据存放路径 new Path("wc/output3")

  FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hadoop1:9000/wc/output3/"));

  //将job提交给集群运行

  wcjob.waitForCompletion(true);

}

设置setJarByClass找到jar包在哪。

原理:通过设置的类的classLoader找到其所在的jar包

导出jar包到集群里运行

scp wc.jar hadoop@192.168.50.130:~/

需要启动 hdfs 和 yarn

start-dfs.sh && start-yarn.sh

[hadoop@hadoop1 app]$ hadoop fs -mkdir /wc/srcdata

[hadoop@hadoop1 app]$ hadoop fs -mkdir /wc/output3

mv ../wc.jar ./

[hadoop@hadoop1 app]$ hadoop fs -put words.log /wc/srcdata

[hadoop@hadoop1 app]$ hadoop jar wc.jar com.example.hadoop.mr.wordcount.WCRunner

报错了

Exception in thread "main" java.lang.UnsupportedClassVersionError: com/example/hadoop/mr/wordcount/WCRunner : Unsupported major.minor version 52.0

是因为虚拟机集群里的java版本1.7比编译时的1.8低。

在eclipse右键调整项目编译兼容性为1.7既可以。

[hadoop@hadoop1 app]$ hadoop fs -ls /wc/output3

Found 2 items

-rw-r--r-- 1 hadoop supergroup 0 2017-06-17 01:49 /wc/output3/_SUCCESS

-rw-r--r-- 1 hadoop supergroup 48 2017-06-17 01:49 /wc/output3/part-r-00000

[hadoop@hadoop1 app]$ hadoop fs -cat /wc/output3/part-r-00000

abc 1

bye 1

good 1

hello 4

jack 1

tom 1

world 1

可以看到默认按照key的字符顺序列出来了

  • mr本地运行模式

在windows中运行本地模式需要winutils.exe等工具和设置

并且把输入输出修改为本地路径 new Path("d:/wc/srcdata/"

对于 new Path("hdfs://hadoop1:9000/wc/output3/") 提交到集群注意权限。

  • job提交的逻辑

  • mr程序的几种提交运行模式

本地模型运行

1/在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行

      ----输入输出数据可以放在本地路径下(c:/wc/srcdata/)

      ----输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata)

2/在linux的eclipse里面直接运行main方法,但是不要添加yarn相关的配置,也会提交给localjobrunner执行

      ----输入输出数据可以放在本地路径下(/home/hadoop/wc/srcdata/)

      ----输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata) 

集群模式运行

1/将工程打成jar包,上传到服务器,然后用hadoop命令提交  hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner

2/在linux的eclipse中直接运行main方法,也可以提交到集群中去运行,但是,必须采取以下措施:

      ----在工程src目录下加入 mapred-site.xml  和  yarn-site.xml

      ----将工程打成jar包(wc.jar),同时在main方法中添加一个conf的配置参数 conf.set("mapreduce.job.jar","wc.jar");           

3/在windows的eclipse中直接运行main方法,也可以提交给集群中运行,但是因为平台不兼容,需要做很多的设置修改

        ----要在windows中存放一份hadoop的安装包(解压好的)

        ----要将其中的lib和bin目录替换成根据你的windows版本重新编译出的文件

        ----再要配置系统环境变量 HADOOP_HOME  和 PATH

        ----修改YarnRunner这个类的源码

  • YARN的通用性意义

yarn里面不仅可以跑mapreduce,还可以运行其他的计算。

mapreduce速度特别慢,不适合实时性强的场景。因为它的数据都是放在磁盘上。

strom和spark现在很火,这两个框架适合实时计算。

而我不管这些框架怎么计算,只要符合我的yarn框架要求,都可以放在yarn里跑。

要符合yarn的要求,需要一个主管进程来管理,即app master。

你的map master要符合我yarn的要求,有一个继承我的appmaster的类。

所以yarn的出现大大扩展了hadoop的场景和生命周期。

所以hadoop有了hdfs和yarn,应用非常广。

  • yarn的job提交流程

客户端通过

wcjob.waitForCompletion(true);

将任务提交到集群。

waitForCompletion() -> submit()

{

connect()

......

submitter.submitJobInternal(Job.this, cluster)

}

connect() // 里给 job 成员 cluster 赋值

{

cluster = new Cluster(getConfiguration());

}

cluster里的initalize() 里创建了一个RPC客户端

通过配置文件找到合适的provider创建rpc客户端

submitJobInternal() 里

{

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

......

JobID jobId = submitClient.getNewJobID();

 job.setJobID(jobId);

Path submitJobDir = new Path(jobStagingArea, jobId.toString());

......

copyAndConfigureFiles(job, submitJobDir);      //提交了jar包

 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

......

  int maps = writeSplits(job, submitJobDir);          //获取切片信息

......

writeConf(conf, submitJobFile);          //写描述信息文件xml

}

任务提交的临时文件在/tmp/hadoop-yarn/staging/hadoop/.staging/

[hadoop@hadoop1 app]$ hadoop fs -ls /tmp/hadoop-yarn/staging

Found 2 items

drwx------ - hadoop supergroup 0 2017-05-23 23:31 /tmp/hadoop-yarn/staging/hadoop

drwxr-xr-x - hadoop supergroup 0 2017-05-23 23:31 /tmp/hadoop-yarn/staging/history

[hadoop@hadoop1 app]$ hadoop fs -ls /tmp/hadoop-yarn/staging/hadoop/

Found 1 items

drwx------ - hadoop supergroup 0 2017-06-17 01:49 /tmp/hadoop-yarn/staging/hadoop/.staging

[hadoop@hadoop1 app]$ hadoop fs -ls /tmp/hadoop-yarn/staging/hadoop/.staging/

---------------------------------------------------------------------------

第四天

1 hadoop中的序列化机制

要在hadoop的各个节点之间传输,应该遵循hadoop的序列化机制,实现hadoop想要的序列化接口。

实现Writable接口

方法 write readFields

hadoop的序列化机制不会传递对象的继承结构信息

与JDK自带的序列化机制不同

因为hadoop只需要传输数据,不需要复杂的继承结构,节省网络带宽。

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>

(Mapper的输出和Reducer的输入相同)

框架每传递一组数据<1387788654,{flowbean, flowbean, flowbean, flowbean......}>调用一次我们的reduce方法

reduce中的而业务逻辑就是遍历values,然后进行累加求和再输出

2 hadoop中的自定义排序实现

排序MR默认是按key2进行排序的,如果想自定义排序规则,被排序的对象要实现WritableComparable接口,在compareTo方法中实现排序规则,然后将这个对象当做k2,即可完成排序

3 mr程序中自定义分组的实现

1.实现分区的步骤:

    1.1先分析一下具体的业务逻辑,确定大概有多少个分区

    1.2首先书写一个类,它要继承org.apache.hadoop.mapreduce.Partitioner这个类

    1.3重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字

    1.4在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);

    1.5设置Reducer的数量,job.setNumReduceTasks(6);

2. HashPartitioner是mapreduce的默认partitioner。计算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

发表评论

邮箱地址不会被公开。 必填项已用*标注