MongoDB MapReduce

Stella981
• 阅读 650

在开发前端数据统计时,使用MongoDB 作为数据库,收集相关数据,在后期展示时,使用到Mongodb的Mapreduce 做数据会中处理,

现在将相关问题记录下来,方便以后查找,也方便相关同学

MongoDB Mapreduce 使用说明

背景

MapReduce是个非常灵活和强大的数据聚合工具。它的好处是可以把一个聚合任务分解为多个小的任务,分配到多服务器上并行处理。
MongoDB也提供了MapReduce,当然查询语肯定是JavaScript。MongoDB中的MapReduce主要有以下几阶段:

  1. Map:把一个操作Map到集合中的每一个文档
  2. Shuffle: 根据Key分组对文档,并且为每个不同的Key生成一系列(>=1个)的值表(List of values)。
  3. Reduce: 处理值表中的元素,直到值表中只有一个元素。然后将值表返回到Shuffle过程,循环处理,直到每个Key只对应一个值表,并且此值表中只有一个元素,这就是MR的结果。
  4. Finalize:此步骤不是必须的。在得到MR最终结果后,再进行一些数据“修剪”性质的处理。
    MongoDB中使用emit函数向MapReduce提供Key/Value对。
    Reduce函数接受两个参数:Key,emits. Key即为emit函数中的Key。 emits是一个数组,它的元素就是emit函数提供的Value。
    Reduce函数的返回结果必须要能被Map或者Reduce重复使用,所以返回结果必须与emits中元素结构一致。
    Map或者Reduce函数中的this关键字,代表当前被Mapping文档。

语法

db.runCommand(
   { mapreduce : 字符串,集合名,
     map : 函数,见下文
     reduce : 函数,见下文
     [, query : 文档,发往map函数前先给过渡文档]
     [, sort : 文档,发往map函数前先给文档排序]
     [, limit : 整数,发往map函数的文档数量上限]
     [, out : 字符串,统计结果保存的集合]
     [, keeptemp: 布尔值,链接关闭时临时结果集合是否保存]
     [, finalize : 函数,将reduce的结果送给这个函数,做最后的处理]
     [, scope : 文档,js代码中要用到的变量]
     [, jsMode : 布尔值,是否减少执行过程中BSON和JS的转换,默认true] 
      //注:false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,<br> 
      //true 时BSON-->js-->map-->reduce-->BSON
     [, verbose : 布尔值,是否产生更加详细的服务器日志,默认true]
   }
  );

实例(以商品举例)

测试数据: 这个集合是三个用户购买的产品和产品价格的数据。
// 初始化测试数据

for(var i=0;i<1000;i++){ 
   var rID=Math.floor(Math.random()*10); 
   var priceparseFloat((Math.random()*10).toFixed(2)); 
   if(rID<4){ 
     db.test.insert({"user":"Joe","sku":rID,"price":price}); 
    }else if(rID>=4 && rID<7)
   { 
     db.test.insert({"user":"Josh","sku":rID,"price":price}); 
   } else {  
     db.test.insert({"user":"Ken","sku":rID,"price":price}); 
   } 
}
  1. 每个用户各购买了多少个产品?(<单一Key做MR)

    //SQL实现
    select user,count(sku) from test  group by user
    
    //MapReduce实现
     map=function (){
     emit(this.user,{count:1})
     }
    
    reduce=function (key,values){
      var cnt=0;   
      values.forEach(function(val){ cnt+=val.count;});  
      return {"count":cnt};
    }
    
     //MR结果存到集合mr1
     db.test.mapReduce(map,reduce,{out:"mr1"})
     //查看MR之后结果
     > db.mr1.find()
     { "_id" : "Joe", "value" : { "count" : 416 } }
     { "_id" : "Josh", "value" : { "count" : 287 } }
     { "_id" : "Ken", "value" : { "count" : 297 } }
    
  2. 每个用户不同的产品购买了多少个?(复合Key做MR)

    //SQL实现
       select user,sku,count(*) from test group by user,sku
    
    //MapReduce实现
    map=function (){
     emit({user:this.user,sku:this.sku},{count:1})
    }
    
    reduce=function (key,values){
      var cnt=0;   
    values.forEach(function(val){ cnt+=val.count;});  
    return {"count":cnt};
    }
    
    db.test.mapReduce(map,reduce,{out:"mr2"})
    > db.mr2.find()
       { "_id" : { "user" : "Joe", "sku" : 0 }, "value" : { "count" : 103 } }
       { "_id" : { "user" : "Joe", "sku" : 1 }, "value" : { "count" : 106 } }
       { "_id" : { "user" : "Joe", "sku" : 2 }, "value" : { "count" : 102 } }
       { "_id" : { "user" : "Joe", "sku" : 3 }, "value" : { "count" : 105 } }
       { "_id" : { "user" : "Josh", "sku" : 4 }, "value" : { "count" : 87 } }
       { "_id" : { "user" : "Josh", "sku" : 5 }, "value" : { "count" : 107 }
    
  3. 每个用户购买的产品数量,总金额是多少?(复合Reduce结果处理)

    //SQL实现
       select user,count(sku),sum(price) from test group by user
    
     //MapReduce实现
       map=function (){
         emit(this.user,{amount:this.price,count:1})
       }
    
       reduce=function (key,values){
        var res={amount:0,count:0}
       values.forEach(function(val){ 
       res.amount+=val.amount;
       res.count+=val.count
       });  
       return res;
       }
    
       db.test.mapReduce(map,reduce,{out:"mr3"})
    
       > db.mr3.find()
       { "_id" : "Joe", "value" : { "amount" : 2053.8899999999994, "count" : 395 } }
       { "_id" : "Josh", "value" : { "amount" : 1409.2600000000002, "count" : 292 } }
       { "_id" : "Ken", "value" : { "amount" : 1547.7700000000002, "count" : 313 } }
    
  4. 在3中返回的amount的float精度需要改成两位小数,还需要得到商品的平均价格。(使用Finalize处理reduce结果集)

    //SQL实现
       select user,cast(sum(price) as   decimal(10,   2)) as amount,count(sku) as [count],
       cast((sum(price)/count(sku))  as decimal(10,2)) as avgPrice
       from test group by user
    
       //MapReduce实现
       map=function (){
         emit(this.user,{amount:this.price,count:1,avgPrice:0})
       }
    
       reduce=function (key,values){
        var res={amount:0,count:0,avgPrice:0}
       values.forEach(function(val){ 
       res.amount+=val.amount;
          res.count+=val.count
       });  
       return res;
       }
    
       finalizeFun=function (key,reduceResult){
        reduceResult.amount=(reduceResult.amount).toFixed(2);
        reduceResult.avgPrice=(reduceResult.amount/reduceResult.count).toFixed(2);
        return reduceResult;
       }
    
        db.test.mapReduce(map,reduce,{out:"mr4",finalize:finalizeFun})
       > db.mr4.find()
       { "_id" : "Joe", "value" : { "amount" : "2053.89", "count" : 395, "avgPrice" : "5.20" } }
       { "_id" : "Josh", "value" : { "amount" : "1409.26", "count" : 292, "avgPrice" : "4.83" } }
       { "_id" : "Ken", "value" : { "amount" : "1547.77", "count" : 313, "avgPrice" : "4.94" } }
    
  5. 统计单价大于6的SKU,每个用户的购买数量.(筛选数据子集做MR)
    这个比较简单了,只需要将1.中调用MR时加上筛选查询即可,其它不变.

    db.test.mapReduce(map,reduce,{query:{price:{"$gt":6}},out:"mr5"})
    

    总结

    MongoDB中的MR工具非常强大,文中的例子只是基础实例.结合Sharding后,多服务器并行做数据集合处理,才能真正显现其能力.

    1. 相关工具:
      目前mongodb 使用的工具最好用,且可以用的是 [NoSql Manager for MongoDB]
    2. 发现的问题:
      在Mapreduce 实践的过程中发现,如果某类记录,只有一条记录时,在mapreduce results 中,所展示的数据为Map的数据,并没有通过reduce汇总,只有当数据超过一条时,汇总数据才正确,需要具体开发的同学注意该问题
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
List的Select 和Select().tolist()
List<PersondelpnewList<Person{newPerson{Id1,Name"小明1",Age11,Sign0},newPerson{Id2,Name"小明2",Age12,
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这