在开发前端数据统计时,使用MongoDB 作为数据库,收集相关数据,在后期展示时,使用到Mongodb的Mapreduce 做数据会中处理,
现在将相关问题记录下来,方便以后查找,也方便相关同学
MongoDB Mapreduce 使用说明
背景
MapReduce是个非常灵活和强大的数据聚合工具。它的好处是可以把一个聚合任务分解为多个小的任务,分配到多服务器上并行处理。
MongoDB也提供了MapReduce,当然查询语肯定是JavaScript。MongoDB中的MapReduce主要有以下几阶段:
- Map:把一个操作Map到集合中的每一个文档
- Shuffle: 根据Key分组对文档,并且为每个不同的Key生成一系列(>=1个)的值表(List of values)。
- Reduce: 处理值表中的元素,直到值表中只有一个元素。然后将值表返回到Shuffle过程,循环处理,直到每个Key只对应一个值表,并且此值表中只有一个元素,这就是MR的结果。
- Finalize:此步骤不是必须的。在得到MR最终结果后,再进行一些数据“修剪”性质的处理。
MongoDB中使用emit函数向MapReduce提供Key/Value对。
Reduce函数接受两个参数:Key,emits. Key即为emit函数中的Key。 emits是一个数组,它的元素就是emit函数提供的Value。
Reduce函数的返回结果必须要能被Map或者Reduce重复使用,所以返回结果必须与emits中元素结构一致。
Map或者Reduce函数中的this关键字,代表当前被Mapping文档。
语法
db.runCommand(
{ mapreduce : 字符串,集合名,
map : 函数,见下文
reduce : 函数,见下文
[, query : 文档,发往map函数前先给过渡文档]
[, sort : 文档,发往map函数前先给文档排序]
[, limit : 整数,发往map函数的文档数量上限]
[, out : 字符串,统计结果保存的集合]
[, keeptemp: 布尔值,链接关闭时临时结果集合是否保存]
[, finalize : 函数,将reduce的结果送给这个函数,做最后的处理]
[, scope : 文档,js代码中要用到的变量]
[, jsMode : 布尔值,是否减少执行过程中BSON和JS的转换,默认true]
//注:false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,<br>
//true 时BSON-->js-->map-->reduce-->BSON
[, verbose : 布尔值,是否产生更加详细的服务器日志,默认true]
}
);
实例(以商品举例)
测试数据: 这个集合是三个用户购买的产品和产品价格的数据。
// 初始化测试数据
for(var i=0;i<1000;i++){
var rID=Math.floor(Math.random()*10);
var priceparseFloat((Math.random()*10).toFixed(2));
if(rID<4){
db.test.insert({"user":"Joe","sku":rID,"price":price});
}else if(rID>=4 && rID<7)
{
db.test.insert({"user":"Josh","sku":rID,"price":price});
} else {
db.test.insert({"user":"Ken","sku":rID,"price":price});
}
}
每个用户各购买了多少个产品?(<单一Key做MR)
//SQL实现 select user,count(sku) from test group by user //MapReduce实现 map=function (){ emit(this.user,{count:1}) } reduce=function (key,values){ var cnt=0; values.forEach(function(val){ cnt+=val.count;}); return {"count":cnt}; } //MR结果存到集合mr1 db.test.mapReduce(map,reduce,{out:"mr1"}) //查看MR之后结果 > db.mr1.find() { "_id" : "Joe", "value" : { "count" : 416 } } { "_id" : "Josh", "value" : { "count" : 287 } } { "_id" : "Ken", "value" : { "count" : 297 } }
每个用户不同的产品购买了多少个?(复合Key做MR)
//SQL实现 select user,sku,count(*) from test group by user,sku //MapReduce实现 map=function (){ emit({user:this.user,sku:this.sku},{count:1}) } reduce=function (key,values){ var cnt=0; values.forEach(function(val){ cnt+=val.count;}); return {"count":cnt}; } db.test.mapReduce(map,reduce,{out:"mr2"}) > db.mr2.find() { "_id" : { "user" : "Joe", "sku" : 0 }, "value" : { "count" : 103 } } { "_id" : { "user" : "Joe", "sku" : 1 }, "value" : { "count" : 106 } } { "_id" : { "user" : "Joe", "sku" : 2 }, "value" : { "count" : 102 } } { "_id" : { "user" : "Joe", "sku" : 3 }, "value" : { "count" : 105 } } { "_id" : { "user" : "Josh", "sku" : 4 }, "value" : { "count" : 87 } } { "_id" : { "user" : "Josh", "sku" : 5 }, "value" : { "count" : 107 }
每个用户购买的产品数量,总金额是多少?(复合Reduce结果处理)
//SQL实现 select user,count(sku),sum(price) from test group by user //MapReduce实现 map=function (){ emit(this.user,{amount:this.price,count:1}) } reduce=function (key,values){ var res={amount:0,count:0} values.forEach(function(val){ res.amount+=val.amount; res.count+=val.count }); return res; } db.test.mapReduce(map,reduce,{out:"mr3"}) > db.mr3.find() { "_id" : "Joe", "value" : { "amount" : 2053.8899999999994, "count" : 395 } } { "_id" : "Josh", "value" : { "amount" : 1409.2600000000002, "count" : 292 } } { "_id" : "Ken", "value" : { "amount" : 1547.7700000000002, "count" : 313 } }
在3中返回的amount的float精度需要改成两位小数,还需要得到商品的平均价格。(使用Finalize处理reduce结果集)
//SQL实现 select user,cast(sum(price) as decimal(10, 2)) as amount,count(sku) as [count], cast((sum(price)/count(sku)) as decimal(10,2)) as avgPrice from test group by user //MapReduce实现 map=function (){ emit(this.user,{amount:this.price,count:1,avgPrice:0}) } reduce=function (key,values){ var res={amount:0,count:0,avgPrice:0} values.forEach(function(val){ res.amount+=val.amount; res.count+=val.count }); return res; } finalizeFun=function (key,reduceResult){ reduceResult.amount=(reduceResult.amount).toFixed(2); reduceResult.avgPrice=(reduceResult.amount/reduceResult.count).toFixed(2); return reduceResult; } db.test.mapReduce(map,reduce,{out:"mr4",finalize:finalizeFun}) > db.mr4.find() { "_id" : "Joe", "value" : { "amount" : "2053.89", "count" : 395, "avgPrice" : "5.20" } } { "_id" : "Josh", "value" : { "amount" : "1409.26", "count" : 292, "avgPrice" : "4.83" } } { "_id" : "Ken", "value" : { "amount" : "1547.77", "count" : 313, "avgPrice" : "4.94" } }
统计单价大于6的SKU,每个用户的购买数量.(筛选数据子集做MR)
这个比较简单了,只需要将1.中调用MR时加上筛选查询即可,其它不变.db.test.mapReduce(map,reduce,{query:{price:{"$gt":6}},out:"mr5"})
总结
MongoDB中的MR工具非常强大,文中的例子只是基础实例.结合Sharding后,多服务器并行做数据集合处理,才能真正显现其能力.
- 相关工具:
目前mongodb 使用的工具最好用,且可以用的是 [NoSql Manager for MongoDB] - 发现的问题:
在Mapreduce 实践的过程中发现,如果某类记录,只有一条记录时,在mapreduce results 中,所展示的数据为Map的数据,并没有通过reduce汇总,只有当数据超过一条时,汇总数据才正确,需要具体开发的同学注意该问题
- 相关工具: