您的当前位置:首页实验3-MapReduce编程初级实践

实验3-MapReduce编程初级实践

2023-07-02 来源:爱问旅游网
实验3 MapReduce编程初级实践

1. 实验目的

1.通过实验掌握基本的MapReduce编程方法;

2.掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。

2. 实验平台

已经配置完成的Hadoop伪分布式环境。

3. 实验内容和要求

1.编程实现文件合并和去重操作

对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。下面是输入文件和输出文件的一个样例供参考。

实验最终结果(合并的文件):

代码如下:

package com.Merge;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.; import org.apache.hadoop.mapreduce.lib.output.;

public class Merge {

public static class Map extends Mapper { private static Text text = new Text();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException { text = value;

context.write(text, new Text(\"\")); } }

public static class Reduce extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, new Text(\"\")); } }

public static void main(String[] args) throws Exception { Configuration conf = new Configuration();

conf.set(\"fs.defaultFS\ String[] otherArgs = new String[] { \"input\ if (otherArgs.length != 2) {

System.err.println(\"Usage: Merge and duplicate removal \"); System.exit(2); }

Job job = Job.getInstance(conf, \"Merge and duplicate removal\"); job.setJarByClass(Merge.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); (job, new Path(otherArgs[0])); (job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1); } }

2. 编写程序实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。

实验结果截图:

代码如下:

package com.MergeSort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.; import org.apache.hadoop.mapreduce.lib.output.;

public class MergeSort {

public static class Map extends

Mapper { private static IntWritable data = new IntWritable();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); data.set(Integer.parseInt(line));

context.write(data, new IntWritable(1)); } }

public static class Reduce extends

Reducer { private static IntWritable linenum = new IntWritable(1);

public void reduce(IntWritable key, Iterable values,

Context context) throws IOException, InterruptedException { for (IntWritable val : values) { context.write(linenum, key);

linenum = new IntWritable(linenum.get() + 1); }

}

}

public static void main(String[] args) throws Exception { Configuration conf = new Configuration();

conf.set(\"fs.defaultFS\

String[] otherArgs = new String[] { \"input2\直接设置输入参数 */

if (otherArgs.length != 2) {

System.err.println(\"Usage: mergesort \"); System.exit(2); }

Job job = Job.getInstance(conf, \"mergesort\"); job.setJarByClass(MergeSort.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class);

job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); (job, new Path(otherArgs[0])); (job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

} }

3. 对给定的表格进行信息挖掘

下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。

实验最后结果截图如下:

代码如下:

package com.join;

import java.io.IOException; import java.util.*;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.; import org.apache.hadoop.mapreduce.lib.output.;

public class STjoin {

public static int time = 0;

public static class Map extends Mapper { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String child_name = new String(); String parent_name = new String(); String relation_type = new String(); String line = value.toString(); int i = 0;

while (line.charAt(i) != ' ') { i++; }

String[] values = { line.substring(0, i), line.substring(i + 1) }; if (values[0].compareTo(\"child\") != 0) { child_name = values[0]; parent_name = values[1]; relation_type = \"1\";

context.write(new Text(values[1]), new Text(relation_type + \"+\"

+ child_name + \"+\" + parent_name)); relation_type = \"2\";

context.write(new Text(values[0]), new Text(relation_type + \"+\"

+ child_name + \"+\" + parent_name)); } } }

public static class Reduce extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { if (time == 0) {

context.write(new Text(\"grand_child\"), new Text(\"grand_parent\"));

time++; }

int grand_child_num = 0;

String grand_child[] = new String[10]; int grand_parent_num = 0;

String grand_parent[] = new String[10]; Iterator ite = values.iterator(); while (ite.hasNext()) {

String record = ite.next().toString(); int len = record.length(); int i = 2; if (len == 0) continue;

char relation_type = record.charAt(0); String child_name = new String();

String parent_name = new String(); while (record.charAt(i) != '+') {

child_name = child_name + record.charAt(i); i++; }

i = i + 1;

while (i < len) {

parent_name = parent_name + record.charAt(i); i++; }

if (relation_type == '1') {

grand_child[grand_child_num] = child_name; grand_child_num++; } else {

grand_parent[grand_parent_num] = parent_name; grand_parent_num++; } }

if (grand_parent_num != 0 && grand_child_num != 0) { for (int m = 0; m < grand_child_num; m++) { for (int n = 0; n < grand_parent_num; n++) {

context.write(new Text(grand_child[m]), new Text( grand_parent[n])); } } } } }

public static void main(String[] args) throws Exception { Configuration conf = new Configuration();

conf.set(\"fs.defaultFS\

String[] otherArgs = new String[] { \"input3\ if (otherArgs.length != 2) {

System.err.println(\"Usage: Single Table Join \"); System.exit(2); }

Job job = Job.getInstance(conf, \"Single table join \"); job.setJarByClass(STjoin.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); (job, new Path(otherArgs[0])); (job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

} }

4. 实验报告 《云计算》 实验报告 题目: MapReduce编程初级实践 姓名 包生友 日期:2016/12/20 实验环境:机房的虚拟机上配置好的环境 解决问题的思路:根据老师给的代码进行操作 实验内容与完成情况:已完成,与同学商量后仍有部分代码尚未知道其作用所在 出现的问题:执行之后,出现未找到main函数情况,再次执行会报错,说文件已经存在。 解决方案(列出遇到的问题和解决办法,列出没有解决的问题): 问题:1.执行之后,出现未找到main函数情况 2. 再次执行会报错,说文件已经存在。 解决办法:删除输出文件即可(程序执行时输出文件不能存在)

5. 实验总结

通过本次实验,使我掌握基本的MapReduce编程方法;掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。短暂的

云计算课程实验到此结束,到我知道对云计算的学习是没有尽头的。

因篇幅问题不能全部显示,请点此查看更多更全内容