博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce-Hadoop分布式计算模型
阅读量:6273 次
发布时间:2019-06-22

本文共 3389 字,大约阅读时间需要 11 分钟。

MapReduce概述

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。

MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。

MapReduce实现原理

ddd

执行步骤:
1.map任务处理

1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用次map函数。

1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3 对输出的key、value进行分区。

1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

1.5 (可选)分组后的数据进行归约。

2.reduce任务处理

2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

2.3 把reduce的输出保存到文件中。

序列化

在MapReduce中,序列化是一个很重要的步骤。

序列化就是把结构化的对象转化为字节流。

反序列化就是把字节流转回结构化对象。

hadoop中的Partitioner分区

Hadoop中的MapReduce支持对key进行分区,从而可以使map出来的数据均匀分布在reduce上。

框架自带了一个默认的分区类,HashPartitioner,先看看这个类,就知道怎么自定义key分区了,

1 2 3 4 5 6 7 8 9
public class HashPartitioner
extends Partitioner
{ /** Use { @link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }

 

先解释一下这个HashPartitioner做的事情,

1
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

 

将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。

实现分区的步骤:

  1. 先分析一下具体的业务逻辑,确定大概有多少个分区
  2. 首先书写一个类,它要继承org.apache.hadoop.mapreduce.Partitioner这个类
  3. 重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字
  4. 在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);
  5. 设置Reducer的数量,job.setNumReduceTasks(6);

举例,

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
public static class ProviderPartitioner extends Partitioner
{ @Override public int getPartition(Text key, DataBean value, int arg2) { String account = key.toString(); String sub_acc = account.substring(0,3); Integer code = 0; if(sub_acc.equals("aaa")){ code = 1; } return code; } }

 

MapReduce中的Combiners编程

每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

举例:

1 2 3 4 5 6 7 8 9 10 11 12 13 14
public static class Combine extends Reducer
{ // Reduce Method public void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { double sum = 0; int count = 0; for (Text value : values) { String fields[] = value.toString().split(","); sum += Double.parseDouble(fields[0]); count += Integer.parseInt(fields[1]); } context.write(key, new Text(sum+","+count)); } }

 

在main方法中设置Combiner的类,

1
job.setCombinerClass(Combine.class);

 

Shuffle-MapReduce的核心

首先让我们看一下下面这张图,

ddd
Mapper处理过程:

  1. 一个输入切片对应一个Mapper, 也就是一个Mapper任务读取文件的一部分;
  2. 每一个Mapper都会对应一个环形缓冲区,用来存储Mapper的输出,默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件;
  3. 在写入磁盘之前要对数据进行分区、排序;
  4. 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。

Reducer处理过程:

  1. Reducer通过Http方式得到输出文件的分区,每个Reducer会取相对应分区的数据;
  2. Reducer取到数据之后,首先会进行排序,之后合并过的数据会再一此进行排序;
  3. 排序阶段合并map输出,然后走Reduce阶段。

总结

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

 

转载自: 

转载于:https://www.cnblogs.com/ccknot/articles/4654762.html

你可能感兴趣的文章
tableVIew删除时的delete按钮被挡住时重写的方法
查看>>
读cookie中文字符乱码问题
查看>>
招募译者翻译并发数据结构
查看>>
普通表转换为分区表
查看>>
Java 容器 & 泛型:三、HashSet,TreeSet 和 LinkedHashSet比较
查看>>
性能优化总结(六):预加载、聚合SQL应用实例
查看>>
http缓存知识
查看>>
Go 时间交并集小工具
查看>>
iOS 多线程总结
查看>>
webpack是如何实现前端模块化的
查看>>
TCP的三次握手四次挥手
查看>>
关于redis的几件小事(六)redis的持久化
查看>>
package.json
查看>>
webpack4+babel7+eslint+editorconfig+react-hot-loader 搭建react开发环境
查看>>
Maven 插件
查看>>
初探Angular6.x---进入用户编辑模块
查看>>
计算机基础知识复习
查看>>
【前端词典】实现 Canvas 下雪背景引发的性能思考
查看>>
大佬是怎么思考设计MySQL优化方案的?
查看>>
<三体> 给岁月以文明, 给时光以生命
查看>>