1. Implementing Matrix Multiplication in MapReduce
I. Preparing the Data
#!/bin/bash
# Generate two random matrices, M ($1 x $2) and N ($2 x $3),
# stored one entry per line as "row,col<TAB>value".
if [ $# -ne 3 ]
then
    echo "there must be 3 arguments to generate the two matrix files!"
    exit 1
fi
cat /dev/null > M_$1_$2
cat /dev/null > N_$2_$3
for i in `seq 1 $1`
do
    for j in `seq 1 $2`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >> M_$1_$2
    done
done
echo "we have built the matrix file M"
for i in `seq 1 $2`
do
    for j in `seq 1 $3`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >> N_$2_$3
    done
done
echo "we have built the matrix file N"
Running the script above with arguments 3 2 4 produces two matrix files like the following (the values are random, so yours will differ):
M_3_2:
1,1 81
1,2 13
2,1 38
2,2 46
3,1 0
3,2 2
N_2_4:
1,1 99
1,2 38
1,3 34
1,4 19
2,1 21
2,2 4
2,3 36
2,4 64
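These two files are the job's input: M is a 3x2 matrix and N is a 2x4 matrix. Their product C = M x N is the 3x4 matrix with entries

C_{ij} = \sum_{k=1}^{2} M_{ik} N_{kj}

For example, C_{11} = 81 \times 99 + 13 \times 21 = 8019 + 273 = 8292, which is the first line of the result in section III. The job below dedicates one reducer call to each result cell: the mapper emits every element of M and N once per result cell it contributes to, keyed by "i,j", and the reducer computes the dot product of the row of M and the column of N it receives.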
II. Computation
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Matrix {

    private static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        private int colN = 0;
        private int rowM = 0;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            colN = configuration.getInt("colN", 0);
            rowM = configuration.getInt("rowM", 0);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The file name tells us whether this element belongs to M or N.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            // Each input line has the form "i,j<TAB>value".
            String[] strings = value.toString().split(",");
            int i = Integer.parseInt(strings[0]);
            String[] ser = strings[1].split("\t");
            int j = Integer.parseInt(ser[0]);
            int val = Integer.parseInt(ser[1]);
            if (fileName.startsWith("M")) {
                // M[i][j] contributes to every result cell (i, 1..colN).
                for (int count = 1; count <= colN; count++) {
                    context.write(new Text(i + "," + count),
                            new Text("M," + j + "," + val));
                }
            } else {
                // N[i][j] contributes to every result cell (1..rowM, j).
                for (int count = 1; count <= rowM; count++) {
                    context.write(new Text(count + "," + j),
                            new Text("N," + i + "," + val));
                }
            }
        }
    }

    private static class MatrixReduce extends
            Reducer<Text, Text, Text, IntWritable> {

        private int colM = 0;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            // colM (== rowN) is the length of the dot product; using rowM
            // here, as some versions of this example do, only works when
            // rowM >= colM.
            colM = configuration.getInt("colM", 0);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sumValue = 0;
            int[] m_Arr = new int[colM + 1];
            int[] n_Arr = new int[colM + 1];
            // Collect the row of M and the column of N for this result cell.
            for (Text value : values) {
                String[] strings = value.toString().split(",");
                if (strings[0].equals("M")) {
                    m_Arr[Integer.parseInt(strings[1])] = Integer.parseInt(strings[2]);
                } else {
                    n_Arr[Integer.parseInt(strings[1])] = Integer.parseInt(strings[2]);
                }
            }
            // Dot product of the collected row and column.
            for (int i = 1; i <= colM; i++) {
                sumValue += m_Arr[i] * n_Arr[i];
            }
            context.write(key, new IntWritable(sumValue));
        }
    }

    public static void main(String[] args) throws IllegalArgumentException,
            IOException, ClassNotFoundException, InterruptedException {
        // HadoopConfig is a project-specific helper returning the cluster
        // Configuration.
        Configuration configuration = HadoopConfig.getConfiguration();
        // Dimensions: M is rowM x colM, N is rowN x colN, with colM == rowN.
        configuration.setInt("colN", 4);
        configuration.setInt("rowN", 2);
        configuration.setInt("colM", 2);
        configuration.setInt("rowM", 3);
        Job job = Job.getInstance(configuration, "matrix multiplication");
        job.setJarByClass(Matrix.class);
        job.setMapperClass(MatrixMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MatrixReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/matrix"));
        FileOutputFormat.setOutputPath(job, new Path("/matrixOutput"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
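Note the shuffle cost of this scheme: the mapper emits every element of M colN times and every element of N rowM times, so the intermediate data is several times the input size. That is fine for small matrices like the sample; for large matrices a block-partitioned formulation is usually preferred.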
III. Results
1,1 8292
1,2 3130
1,3 3222
1,4 2371
2,1 4728
2,2 1628
2,3 2948
2,4 3666
3,1 42
3,2 8
3,3 72
3,4 128
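As a sanity check, the same product can be computed with a plain triple loop outside Hadoop. A minimal sketch with the sample matrices hard-coded (class name is arbitrary):

import java.util.Arrays;

public class MatrixCheck {
    public static void main(String[] args) {
        int[][] m = {{81, 13}, {38, 46}, {0, 2}};         // M_3_2
        int[][] n = {{99, 38, 34, 19}, {21, 4, 36, 64}};  // N_2_4
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 4; j++) {
                int sum = 0;
                for (int k = 0; k < 2; k++) {
                    sum += m[i][k] * n[k][j];
                }
                // Prints e.g. "1,1  8292", matching the job output above.
                System.out.println((i + 1) + "," + (j + 1) + "\t" + sum);
            }
        }
    }
}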
2. Implementing an Inverted Index in MapReduce
I. Preparing the Data
file1:
one fish
two bird
two monkey
file2:
two peach
three watermelon
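An inverted index maps each word to the locations where it occurs. In the job below, the mapper emits one word -> "filePath#byteOffset" pair per occurrence, and the reducer concatenates all locations for a word into a single line.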
II. Computation
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertIndex {

    private static class InvertIndexMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Full URI of the file this split came from.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().toString();
            String[] words = value.toString().split(" ");
            // Emit word -> "path#offset"; the map key is the byte offset
            // of the current line within the file.
            for (String string : words) {
                context.write(new Text(string),
                        new Text(fileName + "#" + key.toString()));
            }
        }
    }

    private static class InvertIndexReduce extends
            Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all postings for this word.
            StringBuilder stringBuilder = new StringBuilder();
            for (Text text : values) {
                stringBuilder.append(text.toString()).append(";");
            }
            context.write(key, new Text(stringBuilder.toString()));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "inverted index");
        job.setJarByClass(InvertIndex.class);
        job.setMapperClass(InvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(InvertIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
III. Results
bird hdfs://127.0.0.1:8020/data/file1#9;
fish hdfs://127.0.0.1:8020/data/file1#0;
monkey hdfs://127.0.0.1:8020/data/file1#18;
one hdfs://127.0.0.1:8020/data/file1#0;
peach hdfs://127.0.0.1:8020/data/file2#0;
three hdfs://127.0.0.1:8020/data/file2#10;
two hdfs://127.0.0.1:8020/data/file2#0;hdfs://127.0.0.1:8020/data/file1#18;hdfs://127.0.0.1:8020/data/file1#9;
watermelon hdfs://127.0.0.1:8020/data/file2#10;
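The number after each # is the byte offset that TextInputFormat hands the mapper as its LongWritable key, i.e. the position of the first byte of the word's line within the file. A minimal sketch showing how the offsets 0, 9 and 18 for file1 arise (assuming Unix '\n' line endings; class name is arbitrary):

public class OffsetDemo {
    public static void main(String[] args) {
        // Lines of file1; TextInputFormat keys each line by the byte
        // offset at which it starts.
        String[] lines = {"one fish", "two bird", "two monkey"};
        long offset = 0;
        for (String line : lines) {
            System.out.println(line + " starts at offset " + offset);
            offset += line.length() + 1;  // +1 for the '\n' terminator
        }
        // Prints offsets 0, 9, 18 -- the numbers after '#' above.
    }
}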
3. Implementing a Complex Inverted Index in MapReduce
I. Preparing the Data
file1:
one fish
two bird
two monkey
file2:
two peach
three watermelon
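The data is the same as before, but this job produces per-document word counts instead of byte offsets. It adds three pieces the previous job lacked: a custom InputFormat that hands the mapper the file name as the key, a combiner that pre-sums the counts for each composite "word#fileName" key on the map side, and a partitioner that hashes only on the word part of the key, so all documents for one word still meet in the same reducer.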
II. Computation
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ComplexInvertIndex {

    // Wraps LineRecordReader so that the key handed to the mapper is the
    // file name instead of the byte offset.
    private static class FileNameRecordReader extends RecordReader<Text, Text> {
        LineRecordReader lineRecordReader = new LineRecordReader();
        String fileName;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineRecordReader.initialize(split, context);
            fileName = ((FileSplit) split).getPath().getName();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return lineRecordReader.nextKeyValue();
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return new Text(fileName);
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return lineRecordReader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            lineRecordReader.close();
        }
    }

    private static class FileNameInputFormat extends FileInputFormat<Text, Text> {
        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException, InterruptedException {
            // No need to call initialize() here; the framework invokes it
            // on the returned reader.
            return new FileNameRecordReader();
        }
    }

    private static class ComplexInvertIndexMapper extends
            Mapper<Text, Text, Text, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit "word#fileName" -> 1 for every word occurrence.
            String[] strs = value.toString().split(" ");
            for (String string : strs) {
                context.write(new Text(string + "#" + key.toString()),
                        new IntWritable(1));
            }
        }
    }

    // Pre-aggregates the per-(word, file) counts on the map side.
    private static class ComplexInvertIndexCombiner extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Partition on the word part of "word#fileName" only, so all postings
    // for the same word land in the same reducer.
    private static class InvertIndexPartitioner extends
            HashPartitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String[] strs = key.toString().split("#");
            return super.getPartition(new Text(strs[0]), value, numReduceTasks);
        }
    }

    private static class ComplexInvertIndexReduce extends
            Reducer<Text, IntWritable, Text, Text> {
        // Accumulates word -> "(doc,count) (doc,count) ..." across reduce
        // calls; flushed once in cleanup().
        private Map<String, String> map = new HashMap<String, String>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            String[] strings = key.toString().split("#");
            String word = strings[0];
            String doc = strings[1];
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            if (map.get(word) == null) {
                map.put(word, " (" + doc + "," + sum + ") ");
            } else {
                map.put(word, map.get(word) + " (" + doc + "," + sum + ") ");
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            for (String key : map.keySet()) {
                context.write(new Text(key), new Text(map.get(key)));
            }
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "complex inverted index");
        job.setJarByClass(ComplexInvertIndex.class);
        job.setInputFormatClass(FileNameInputFormat.class);
        job.setMapperClass(ComplexInvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setCombinerClass(ComplexInvertIndexCombiner.class);
        job.setPartitionerClass(InvertIndexPartitioner.class);
        job.setReducerClass(ComplexInvertIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/outputdata"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
III. Results
monkey (file1,1)
bird (file1,1)
fish (file1,1)
one (file1,1)
peach (file2,1)
watermelon (file2,1)
three (file2,1)
two (file1,2) (file2,1)
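The count after each file name is the word's frequency in that document; for example, "two" occurs twice in file1 and once in file2. For reference, the same aggregation can be sketched without Hadoop using nested maps (a minimal stand-alone check over the sample data; class name is arbitrary):

import java.util.LinkedHashMap;
import java.util.Map;

public class InvertIndexCheck {
    public static void main(String[] args) {
        String[][] docs = {
            {"file1", "one fish\ntwo bird\ntwo monkey"},
            {"file2", "two peach\nthree watermelon"}
        };
        // word -> (document -> occurrence count)
        Map<String, Map<String, Integer>> index =
                new LinkedHashMap<String, Map<String, Integer>>();
        for (String[] doc : docs) {
            for (String word : doc[1].split("\\s+")) {
                index.computeIfAbsent(word, w -> new LinkedHashMap<>())
                     .merge(doc[0], 1, Integer::sum);
            }
        }
        // Prints e.g. "two {file1=2, file2=1}".
        index.forEach((word, postings) ->
                System.out.println(word + " " + postings));
    }
}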