MapReduce的自定义排序、分区和分组

Stella981
• 阅读 823

自定义排序(WritableComparable)

当写mr程序来处理文本时,经常会将处理后的信息封装到我们自定义的bean中,并将bean作为map输出的key来传输

而mr程序会在处理数据的过程中(传输到reduce之前)对数据排序(如:map端生成的文件中的内容分区且区内有序)。

操作:

自定义bean来封装处理后的信息,可以自定义排序规则用bean中的某几个属性来作为排序的依据

代码节段:

自定义的bean实现WritableComparable接口,并对compareTo的实现

//先按照年龄排序,再按性别(年龄小,sex大的在前)

@Override
public int compareTo(Person o) {
if(o.age==this.age){
if(o.sex==this.sex){
return 0;
}else{
return o.sex-this.sex;
}
}else{
return this.age-o.age;
}
}

注意: 1.hadoop开发了一套自己的序列化和反序列化策略(Writable),因为map端的文件要下载到reduce端的话如果不在同一台节点上是会走网络进行传输(hadoop-rpc),所以对象需要序列化。

    2.如果空构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。

自定义分区(Partitioner)

Mapreduce中会将maptask输出的kv对,默认(HashPartitioner)根据key的hashcode%reducetask数来分区。如果要按照我们自己的需求进行分组,则需要改写数据分发组件Partitioner继承抽象类:Partitioner。

操作:

在job对象中,设job.setPartitionerClass(自定义分区类.class)

节选代码:

public class CustomPartitioner extends Partitioner<Text,Text>{

/*
* numPartitions其实我们可以设置,在job.setNumReduceTasks(n)设置。
* 假如job.setNumReduceTasks(5),那么这里的numPartitions=5,那么默认的HashPartitioner的机制就是用key的hashcode%numPartitions来决定分区属于哪个分区,所以分区数量就等于我们设置的reduce数量5个。
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Integer hash = numMap.get(key.toString().substring(0, 1));
//将没有匹配到的数据放入3号分区
return hash==null?3:hash;
}
}

自定义分组(GroupingComparator)

假设把bean作为key发送给reduce,而在reduce端我们希望将年龄相同的kv聚合成组,那么就可以如下方式实现。

1.自定义分组要继承WritableComparator,然后重写compare方法。

_2.定义完成后要设置job.setGroupingComparatorClass(CustomGroupingComparator.class);
_代码节选

public class CustomGroupingComparator extends WritableComparator{
protected CustomGroupingComparator() {
super(Person.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Bean abean = (Bean) a;
Bean bbean = (Bean) b;
//将item_id相同的bean都视为相同,从而聚合为一组
return abean.getAge()-bbean.getAge();
}
}

自定义分组排序(SortGroupingComparator)尽快补充上

自定义分组排序(SortComparator)尽快补充上

每个分区内都会调用job.setSortComparatorClass()设置的key比较函数类排序;

如果没有通过job.setSortComparatorClass()设置key比较函数类,则使用key的实现的compareTo方法

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
MapReduce之Shuffle,自定义对象,排序已经Combiner
1\.Shuffle:MapReduce的计算模型主要分为三个阶段,Map,shuffle,Reduce。Map负责数据的过滤,将文件中的数据转化为键值对,Reduce负责合并将具有相同的键的值进行处理合并然后输出到HDFS。为了让Reduce可以并行处理map的结果,必须对Map的输出进行一定的排序和分割,然后交个Red
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
JS 对象数组Array 根据对象object key的值排序sort,很风骚哦
有个js对象数组varary\{id:1,name:"b"},{id:2,name:"b"}\需求是根据name或者id的值来排序,这里有个风骚的函数函数定义:function keysrt(key,desc) {  return function(a,b){    return desc ? ~~(ak
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(