1. Create an empty table in HBase
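For example, in the HBase shell an empty table matching the import program in section 4 can be created with create 'clientdata_test5', 'd' (the table name clientdata_test5 and the single column family d are the ones used by the code later in this post).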
2. Prepare the file to be written into HBase (it can live on HDFS or on the local filesystem; adjust the input path accordingly, using an hdfs:// prefix for HDFS and a file:// prefix for local files)
For example, I have a file like the following, stored on HDFS.
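Each record in the file is one pipe-delimited line of the following form (this sample line appears in the code comments in section 4):

1080x1920|Vivo X9|a8622b8a26ae679f0be82135f6529902|中国|湖南|益阳|wifi|2017-12-31 11:55:58

The fields are screen resolution, phone model, user ID, country, province, city, network type, and timestamp.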
3. Check that the file can be read through Hadoop
package cn.edu.shu.ces.chenjie.tianyi.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopConnectTest {

    public static class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String values[] = value.toString().split("\\|");
            String screen = values[0];
            String model = values[1];
            String userID = values[2];
            String country = values[3];
            String province = values[4];
            String city = values[5];
            String network = values[6];
            String time = values[7];
            String ymd = time.split(" ")[0];
            outKey.set(userID + "-" + ymd);   // key = userID-date
            context.write(outKey, value);
        }
    }

    public static class Reducer1 extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String userID = key.toString();
            System.out.println("------------------------------------> user: " + userID);
            for (Text value : values) {
                System.out.println("\t" + value);
            }
        }
    }

    private static final String HDFS = "hdfs://192.168.1.112:9000";                    // HDFS address
    private static final String INPATH = HDFS + "/clientdata/clientdata20170616.txt";  // input file path
    private static final String OUTPATH = HDFS + "/out";                               // output path

    public int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = {INPATH, OUTPATH};   // input and output HDFS paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        conf.set("fs.defaultFS", HDFS);
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "step1");                // Job(Configuration conf, String jobName): set the job name
        job.setJarByClass(HadoopConnectTest.class);
        job.setMapperClass(Mapper1.class);               // Mapper class for the job
        // job.setCombinerClass(IntSumReducer.class);    // Combiner class (not used here)
        job.setReducerClass(Reducer1.class);             // Reducer class for the job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);               // output key type
        job.setOutputValueClass(Text.class);             // output value type
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // output path
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(OUTPATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);                    // remove a stale output directory if present
        }
        return job.waitForCompletion(true) ? 1 : -1;
    }

    public static void main(String[] args) {
        try {
            new HadoopConnectTest().run();
        } catch (ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Sample output:
Required JAR files (some of them are not strictly necessary):
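If the project is built with Maven, the JAR list roughly corresponds to the following dependencies (a sketch only; the version numbers below are assumptions and should be replaced with the Hadoop/HBase versions actually running on the cluster):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.6</version>
</dependency>

hbase-server is included because TableMapReduceUtil (used in section 4) lives in that module in HBase 1.x.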
4. Write the MapReduce program, where the map step reads the file and emits output with keys of type ImmutableBytesWritable and values of type Put.
[Because Put objects can be written into HBase directly, no Reduce step is needed.]
package cn.edu.shu.ces.chenjie.tianyi.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;

/***
 * Import data into HBase with MapReduce
 * @author chenjie
 */
public class HadoopConnectTest3 {

    public static class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) {
            // convert the Text record to a String, e.g.:
            // 1080x1920|Vivo X9|a8622b8a26ae679f0be82135f6529902|中国|湖南|益阳|wifi|2017-12-31 11:55:58
            String value_str = value.toString();
            String values[] = value_str.split("\\|");   // split on |
            if (values == null || values.length < 8)
                return;                                 // skip records that do not have enough fields
            String userID = values[2];                  // user ID, e.g. a8622b8a26ae679f0be82135f6529902
            String time = values[7];                    // timestamp, e.g. 2017-12-31 11:55:58
            String ymd = time.split(" ")[0];            // date part: 2017-12-31
            String hms = time.split(" ")[1] + ":000";   // time part: 11:55:58:000
            String rowkey = userID + "-" + ymd;         // rowkey = userID-date
            Put p1 = new Put(Bytes.toBytes(rowkey));    // create a Put with this rowkey
            // add one column: family d, qualifier = time of day, value = the original record
            p1.addColumn(Bytes.toBytes("d"), Bytes.toBytes(hms), Bytes.toBytes(value_str));
            if (!p1.isEmpty()) {                        // only emit non-empty Puts
                ImmutableBytesWritable ib = new ImmutableBytesWritable();
                ib.set(Bytes.toBytes("clientdata_test5"));   // key = target table name
                try {
                    context.write(ib, p1);              // emit (table name, Put) from the mapper
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static final String HDFS = "hdfs://192.168.1.112:9000";      // HDFS address
    private static final String INPATH = HDFS + "/tmp/clientdata10.txt"; // input file path

    public int run() throws IOException, ClassNotFoundException, InterruptedException {
        // the Configuration object carries the job settings, including the HBase connection details
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "pc2:2181,pc3:2181,pc4:2181");  // ZooKeeper quorum
        conf.set("hbase.rootdir", "hdfs://pc2:9000/hbase");                // HBase root directory
        conf.set("zookeeper.znode.parent", "/hbase");
        Job job = Job.getInstance(conf, "HFile bulk load test");   // create the job
        job.setJarByClass(HadoopConnectTest3.class);               // driver class
        job.setMapperClass(HBaseHFileMapper.class);                // mapper emits ImmutableBytesWritable / Put
        // TableMapReduceUtil is an HBase helper class that wires up all the MapReduce-over-HBase settings;
        // the target table is clientdata_test5 and no reducer class is used
        TableMapReduceUtil.initTableReducerJob("clientdata_test5", null, job);
        job.setNumReduceTasks(0);                                  // map output is written directly, so no reduce tasks
        FileInputFormat.addInputPath(job, new Path(INPATH));       // input path
        return (job.waitForCompletion(true) ? 0 : -1);
    }

    public static void main(String[] args) {
        try {
            new HadoopConnectTest3().run();
        } catch (ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
5. Screenshot of a run
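After the job finishes, the imported rows can be spot-checked in the HBase shell, for example with scan 'clientdata_test5', {LIMIT => 5} (an illustrative command; adjust the limit as needed).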
6. Problem analysis
This approach effectively writes one record at a time, so records sharing the same rowkey cannot be inserted as a batch.
If the map step instead emitted the rowkey as its key, records with the same rowkey would be grouped together by the time they reach the reduce step; batching the insert there should perform better than inserting records one by one in no particular order (see the sketch below).
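Below is a minimal sketch of that idea (not part of the original program): it assumes the mapper emits the userID-date rowkey as a Text key and the original record as a Text value, like Mapper1 in section 3, and the reducer collects every record of a rowkey into a single Put before writing it out. The class name HBasePutReducer is illustrative.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class HBasePutReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text rowkey, Iterable<Text> records, Context context)
            throws IOException, InterruptedException {
        // all records sharing the same userID-date rowkey arrive together here
        Put put = new Put(Bytes.toBytes(rowkey.toString()));
        for (Text record : records) {
            String[] values = record.toString().split("\\|");
            if (values.length < 8)
                continue;                               // skip malformed records, as in the mapper
            String hms = values[7].split(" ")[1] + ":000";
            // one column per record: family d, qualifier = time of day, value = the original line
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes(hms), Bytes.toBytes(record.toString()));
        }
        if (!put.isEmpty())
            context.write(new ImmutableBytesWritable(put.getRow()), put);   // one Put per rowkey
    }
}

In the driver this reducer would replace the null argument, e.g. TableMapReduceUtil.initTableReducerJob("clientdata_test5", HBasePutReducer.class, job), and job.setNumReduceTasks(0) would have to be removed (or set to a positive number) so that the reduce phase actually runs.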