Big Data Fundamentals Course Design Report

I. Project Overview
This project uses Hive, MapReduce, and HBase on Hadoop to carry out a practical analysis of the Sogou five-million-record search dataset.
The Sogou five-million-record dataset is processed production data from the Sogou search engine. It is real, large-scale data, and therefore fits the data requirements of a distributed-computing application course project well.
The record format is: access time \t user ID \t [query word] \t rank of the URL in the result list \t ordinal of the user's click \t URL clicked by the user.
The user ID is assigned automatically from the browser cookie when the user visits the search engine, so different queries issued from the same browser session share one user ID.
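To make the format concrete, here is a minimal Java sketch that splits one record into its six tab-separated fields. The sample values below are made up for illustration; only the six-field layout comes from the description above.

```java
public class RecordDemo {
    // Split one Sogou record on tabs into its six documented fields:
    // time, uid, [query], rank, click order, url.
    public static String[] parse(String line) {
        return line.split("\t");
    }

    public static void main(String[] args) {
        // Illustrative record following the documented format (values invented)
        String line = "20111230000005\tuid0001\t[example query]\t1\t1\thttp://www.example.com/";
        String[] f = parse(line);
        System.out.println("time=" + f[0] + ", uid=" + f[1] + ", query=" + f[2]
                + ", rank=" + f[3] + ", click order=" + f[4] + ", url=" + f[5]);
    }
}
```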
II. Task Requirements
1. Load the raw data onto HDFS.
2. Split and recombine the time field in the raw data, adding year, month, day, and hour fields.
3. Load the processed data onto HDFS.
4. Implement each of the following with both MapReduce and Hive:
● Total record count
● Number of non-empty queries
● Number of records with no duplicates
● Number of distinct UIDs
● Query frequency ranking (the 50 most frequent query words)
● Number of users with more than 2 queries
● Share of users with more than 2 queries
● Share of clicks with rank within 10
● Proportion of queries that directly enter a URL
● UIDs that searched for "仙剑奇侠传" more than 3 times
5. Save the results generated by each step of 4 to HDFS.
6. Import the files generated in 5 into HBase (a single table) via the Java API.
7. Query the results imported in 6 using HBase shell commands.
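The time-field extension of requirement 2 can be sketched in plain Java as follows. This is a hypothetical sketch: the actual work is done by a shell script later in the report whose body is not shown, so the appended column order (year, month, day, hour) is an assumption based on the requirement text.

```java
public class TimeExtend {
    // Sketch of the year/month/day/hour extension: the timestamp
    // "yyyyMMddHHmmss" in the first field is cut into year, month,
    // day, and hour, which are appended as extra tab-separated columns.
    public static String extend(String line) {
        String t = line.split("\t")[0];
        return line + "\t" + t.substring(0, 4) + "\t" + t.substring(4, 6)
                + "\t" + t.substring(6, 8) + "\t" + t.substring(8, 10);
    }

    public static void main(String[] args) {
        // Sample record with an invented uid and url
        System.out.println(extend("20111230000005\tuid0001\t[example]\t1\t1\thttp://www.example.com/"));
    }
}
```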
III. Experiment Procedure

1. Load the raw data onto HDFS.

2. Split and recombine the time field, adding year, month, day, and hour fields.
(1) Write a script, sogou-log-extend.sh, that cuts the time field apart and appends the year, month, day, and hour columns, then process the log file with:

bash sogou-log-extend.sh sogou.500w.utf8 sogou.500w.utf8.ext

3. Load the processed data onto HDFS:

hadoop fs -put sogou.500w.utf8.ext /

4. Implement the following with both MapReduce and Hive.

Ⅰ. Hive implementation

1. List databases: show databases;
2. Create a database: create database sogou;
3. Switch to it: use sogou;
4. List tables: show tables;
5. Create the sogou table:

create table sogou(time string, uuid string, name string, num1 int, num2 int, url string)
row format delimited fields terminated by '\t';

6. Load the local data into the Hive table:

load data local inpath '/root/sogou.500w.utf8' into table sogou;

7. Inspect the table: desc sogou;

(1) Total record count:
select count(*) from sogou;

(2) Number of non-empty queries:
select count(*) from sogou where name is not null and name != '';

(3) Number of records with no duplicates:
select count(*) from (select * from sogou group by time, num1, num2, uuid, name, url having count(*) = 1) a;

(4) Number of distinct UIDs:
select count(distinct uuid) from sogou;

(5) Query frequency ranking (top 50 query words):
select name, count(*) as pd from sogou group by name order by pd desc limit 50;

(6) Number of users with more than 2 queries:
select count(a.uuid) from (select uuid, count(*) as cnt from sogou group by uuid having cnt > 2) a;

(7) Share of users with more than 2 queries (this query yields the numerator; divide it by the distinct-UID total from (4) to get the ratio):
select count(*) from (select uuid, count(*) as cnt from sogou group by uuid having cnt > 2) a;

(8) Share of clicks with rank within 10 (numerator; divide by the total from (1)):
select count(*) from sogou where num1 < 11;

(9) Proportion of direct-URL queries (numerator; divide by the total from (1)):
select count(*) from sogou where url like '%www%';

(10) UIDs that searched for "仙剑奇侠传" more than 3 times:
select uuid, count(*) as uu from sogou where name = '仙剑奇侠传' group by uuid having uu > 3;

Ⅱ. MapReduce implementation (import statements omitted)

(1) Total record count:

public class MRCountAll {
    // The count is kept in a static field, which only works when the job
    // runs in a single local JVM; in a distributed run each task would
    // have its own copy.
    public static Integer i = 0;

    public static class CountAllMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            i++;
        }
    }

    public static void runcount(String inputPath, String outPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(MRCountAll.class);
        job.setMapperClass(CountAllMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        runcount("/sogou/data/sogou.500w.utf8", "/sogou/data/CountAll");
        System.out.println("Total records: " + i);
    }
}

(2) Number of non-empty queries:

public class CountNotNull {
    public static int i = 0;

    public static class wyMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            // compare strings with isEmpty, not with == or equals(null)
            if (values[2] != null && !values[2].isEmpty()) {
                context.write(new Text(values[1]), new IntWritable(1));
                i++;
            }
        }
    }

    public static void run(String inputPath, String outputPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "countnotnull");
        job.setJarByClass(CountNotNull.class);
        job.setMapperClass(wyMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        run("/sogou/data/sogou.500w.utf8", "/sogou/data/CountNotNull");
        System.out.println("Non-empty queries: " + i);
    }
}

(3) Number of records with no duplicates:

public class CountNotRepeat {
    public static int i = 0;

    public static class NotRepeatMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            // time + uid + query + url together identify a record
            context.write(new Text(values[0] + values[1] + values[2] + values[5]),
                    new IntWritable(1));
        }
    }

    public static class NotRepeatReduc extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            i++; // one distinct record per reduce key
            context.write(key, new IntWritable(i));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "countnotrepeat");
        job.setJarByClass(CountNotRepeat.class);
        job.setMapperClass(NotRepeatMap.class);
        job.setReducerClass(NotRepeatReduc.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotRepeat"));
        job.waitForCompletion(true);
        System.out.println("Records without duplicates: " + i);
    }
}

(4) Number of distinct UIDs:

public class CountNotMoreUid {
    public static int i = 0;

    public static class UidMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            context.write(new Text(values[1]), new IntWritable(1));
        }
    }

    public static class UidReduc extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            i++; // one distinct uid per reduce key
            context.write(key, new IntWritable(i));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "countuid");
        job.setJarByClass(CountNotMoreUid.class);
        job.setMapperClass(UidMap.class);
        job.setReducerClass(UidReduc.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotMoreUid"));
        job.waitForCompletion(true);
        System.out.println("Distinct UIDs: " + i);
    }
}

(5) Query frequency ranking (top 50 query words):

public class CountTop50 {
    public static class TopMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            text.set(line[2]);
            context.write(text, new LongWritable(1));
        }
    }

    public static class TopReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        // keyed by frequency; note that query words with the same frequency
        // overwrite one another in this TreeMap
        TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0; // number of occurrences of this query word
            for (LongWritable ltext : value) {
                sum += ltext.get();
            }
            map.put(sum, key.toString());
            if (map.size() > 50) {
                map.remove(map.firstKey()); // keep only the 50 largest counts
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Integer count : map.keySet()) {
                context.write(new Text(map.get(count)), new LongWritable(count));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(CountTop50.class);
        job.setJobName("Five");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReducer.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountTop50"));
        job.waitForCompletion(true);
    }
}

(6) Number of users with more than 2 queries:

public class CountQueriesGreater2 {
    public static int total = 0;

    public static class MyMaper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            context.write(new Text(str[1]), new IntWritable(1));
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // key is a uid, values are its per-record ones
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            if (sum > 2) {
                total++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "six");
        job.setMapperClass(MyMaper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setJarByClass(CountQueriesGreater2.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreater2"));
        job.waitForCompletion(true);
        System.out.println("Users with more than 2 queries: " + total);
    }
}

(7) Share of users with more than 2 queries:

public class CountQueriesGreaterPro {
    public static int total1 = 0; // users with more than 2 queries
    public static int total2 = 0; // total records (the denominator used here)

    public static class MyMaper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            total2++;
            String[] str = value.toString().split("\t");
            context.write(new Text(str[1]), new IntWritable(1));
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            if (sum > 2) {
                total1++;
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("seven begin");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "seven");
        job.setMapperClass(MyMaper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setJarByClass(CountQueriesGreaterPro.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreaterPro"));
        job.waitForCompletion(true);
        System.out.println("total1=" + total1 + "\ttotal2=" + total2);
        float percentage = (float) total1 / (float) total2;
        System.out.println("Share of users with more than 2 queries: " + percentage * 100 + "%");
        System.out.println("over");
    }
}

(8) Share of clicks with rank within 10:

public class CountRank {
    public static int sum1 = 0; // clicks with rank within 10
    public static int sum2 = 0; // all records

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            sum2++;
            String[] str = value.toString().split("\t");
            int rank = Integer.parseInt(str[3]);
            if (rank < 11) {
                sum1++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "eight");
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(CountRank.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountRank"));
        job.waitForCompletion(true);
        System.out.println("sum1=" + sum1 + "\tsum2=" + sum2);
        float percentage = (float) sum1 / (float) sum2;
        System.out.println("Share of clicks with rank within 10: " + percentage * 100 + "%");
    }
}

(9) Proportion of direct-URL queries:

public class CountURL {
    public static int sum1 = 0; // queries containing "www"
    public static int sum2 = 0; // all records

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            Matcher matcher = Pattern.compile("www").matcher(str[2]);
            if (matcher.find()) {
                sum1++;
            }
            sum2++;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "nine");
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(CountURL.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountURL"));
        job.waitForCompletion(true);
        System.out.println("sum1=" + sum1 + "\tsum2=" + sum2);
        float percentage = (float) sum1 / (float) sum2;
        System.out.println("Share of direct-URL ('%www%') queries: " + percentage * 100 + "%");
    }
}

(10) UIDs that searched for "仙剑奇侠传" more than 3 times:

public class CountUidGreater3 {
    public static String Str = "";
    public static int i = 0;

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String pattern = "仙剑奇侠传";
            if (values[2].equals(pattern)) {
                context.write(new Text(values[1]), new IntWritable(1));
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : value) {
                sum += v.get();
            }
            if (sum > 3) {
                Str = Str + key.toString() + "\n";
                i++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.49.47.20:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(CountUidGreater3.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountUidGreater3"));
        job.waitForCompletion(true);
        System.out.println("i: " + i);
        System.out.println(Str);
    }
}

5. Save the results of each step in 4 to HDFS.
In Hive this is done with INSERT OVERWRITE DIRECTORY, which writes a query's result set to an HDFS directory.

6. Import the files generated in 5 into HBase (one table) via the Java API:

public class HBaseImport {
    // table name for the reduce output
    private static String tableName = "test";
    // initialize the connection
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://10.49.47.20:9000/hbase");
        conf.set("hbase.master", "hdfs://10.49.47.20:60000");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    }

    public static class BatchMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Text v2s = new Text();
            v2s.set(value.toString());
            context.write(key, v2s);
        }
    }

    public static class BatchReducer extends TableReducer<LongWritable, Text, NullWritable> {
        private String family = "info";

        @Override
        protected void reduce(LongWritable arg0, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] splited = v2.toString().split("\t");
                String rowKey = splited[0]; // first column becomes the row key
                Put put = new Put(rowKey.getBytes());
                put.add(family.getBytes(), "raw".getBytes(), v2.toString().getBytes());
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void imputil(String str) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf, HBaseImport.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HBaseImport.class);
        FileInputFormat.setInputPaths(job, str);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(BatchMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(BatchReducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        String[] str = {
            "hdfs://10.49.47.20:9000/sogou/data/1/000000_0",
            "hdfs://10.49.47.20:9000/sogou/data/2/000000_0",
            "hdfs://10.49.47.20:9000/sogou/data/3/000000_0",
            "hdfs://10.49.47.20:9000/sogou/data/4/000000_0",
            "hdfs://10.49.47.20:9000/sogou/data/5/000000_0",
            "hdfs://10.49.47.20:9000/sogou/data/6/000000_0",
            "hdfs://10.49.47.20:9000/sogou/data/9/000000_0",
            "hdfs://10.49.47.20:9000/sogou/data/10/000000_0"
        };
        for (String stri : str) {
            imputil(stri);
        }
    }
}

7. Query the results imported in 6 via the HBase shell:

scan 'test'

IV. Reflections
The rapid development of today's Internet has brought equally rapid growth in data; storing and processing massive data is no longer a problem that a single machine can handle.