博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
向HBase中导入数据2:使用MapReduce从HDFS或本地文件中读取数据并写入HBase(只使用Map逐条查询)
阅读量:2490 次
发布时间:2019-05-11

本文共 7334 字,大约阅读时间需要 24 分钟。

一、在HBase中创建空表

二、准备好要写入HBase的文件(可能存在HDFS或者本地,需要修改输入文件路径,HDFS使用hdfs://开头,本地文件使用file://开头)

例如我有这样一份文件:

其保存在HDFS上

三、检查能否调用hadoop读取该文件

package cn.edu.shu.ces.chenjie.tianyi.hadoop;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class HadoopConnectTest{	public static class Mapper1 extends Mapper
{ private Text outKey = new Text(); @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { String values[] = value.toString().split("\\|"); String screen = values[0]; String model = values[1]; String userID = values[2]; String country = values[3]; String province = values[4]; String city = values[5]; String network = values[6]; String time = values[7]; String ymd = time.split(" ")[0]; outKey.set(userID + "-" + ymd); context.write(outKey, value); } } public static class Reducer1 extends Reducer
{ private Text outKey = new Text(); private Text outValue = new Text(); @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { String userID = key.toString(); System.out.println("------------------------------------>用户:" + userID); for(Text value : values) { System.out.println("\t" + value); } //context.write(outKey, outValue); } } private static final String HDFS = "hdfs://192.168.1.112:9000";//HDFS路径 private static final String INPATH = HDFS + "/clientdata/clientdata20170616.txt";//输入文件路径 //clientdata20170616.txt private static final String OUTPATH = HDFS + "/out";//输出文件路径 public int run() throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String[] otherArgs = {INPATH,OUTPATH}; //这里需要配置参数即输入和输出的HDFS的文件路径 if (otherArgs.length != 2) { System.err.println("Usage: wordcount
"); System.exit(2); } conf.set("fs.defaultFS",HDFS); // JobConf conf1 = new JobConf(WordCount.class); @SuppressWarnings("deprecation") Job job = new Job(conf, "step1");//Job(Configuration conf, String jobName) 设置job名称和 job.setJarByClass(HadoopConnectTest.class); job.setMapperClass(Mapper1.class); //为job设置Mapper类 //job.setCombinerClass(IntSumReducer.class); //为job设置Combiner类 job.setReducerClass(Reducer1.class); //为job设置Reduce类 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); //设置输出key的类型 job.setOutputValueClass(Text.class);// 设置输出value的类型 job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //为map-reduce任务设置InputFormat实现类 设置输入路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类 设置输出路径 FileSystem fs = FileSystem.get(conf); Path outPath = new Path(OUTPATH); if(fs.exists(outPath)) { fs.delete(outPath, true); } return job.waitForCompletion(true) ? 1 : -1; } public static void main(String[] args) { try { new HadoopConnectTest().run(); } catch (ClassNotFoundException | IOException | InterruptedException e) { e.printStackTrace(); } }}

示例输出:

所需jar包(有些不必要)

四、编写MapReduce程序,其中map操作读取文件,产生key为ImmutableBytesWritable类型,值为Put类型的输出;

【因为Put对象可以直接存进hbase,所以不需要Reduce操作】

package cn.edu.shu.ces.chenjie.tianyi.hadoop;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.util.Bytes;/*** * 使用MapReduce向HBase中导入数据 * @author chenjie *  */public class HadoopConnectTest3{	public static class HBaseHFileMapper extends Mapper
{ @Override protected void map(LongWritable key, Text value, Context context) { String value_str = value.toString(); //将Text型数据转为字符串: //1080x1920|Vivo X9|a8622b8a26ae679f0be82135f6529902|中国|湖南|益阳|wifi|2017-12-31 11:55:58 String values[] = value_str.split("\\|"); //按|隔开 if (values == null || values.length < 8) return; //过滤掉长度不符合要求的记录 String userID = values[2]; //取出用户:a8622b8a26ae679f0be82135f6529902 String time = values[7]; //取出时间:2017-12-31 11:55:58 String ymd = time.split(" ")[0]; //得到年月日:2017-12-31 String hms = time.split(" ")[1] + ":000"; //得到时分秒:11:55:58 String rowkey = userID + "-" + ymd; //使用用户ID-年月日作为HBase表中的行键 Put p1 = new Put(Bytes.toBytes(rowkey)); //使用行键新建Put对象 p1.addColumn(Bytes.toBytes("d"), Bytes.toBytes(hms), Bytes.toBytes(value_str)); //向put中增加一列,列族为d,列名为时分秒,值为原字符串 if (!p1.isEmpty()) //如果Put对象不为空(有列) { ImmutableBytesWritable ib = new ImmutableBytesWritable(); //新建一个ImmutableBytesWritable对象 ib.set(Bytes.toBytes("clientdata_test5")); //将ImmutableBytesWritable对象的值设为表名 try { context.write(ib, p1); //尝试将此键值对作为mapper的输出 } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } private static final String HDFS = "hdfs://192.168.1.112:9000";// HDFS路径 private static final String INPATH = HDFS + "/tmp/clientdata10.txt";// 输入文件路径 public int run() throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); //任务的配置设置,configuration是一个任务的配置对象,封装了任务的配置信息 conf.set("hbase.zookeeper.quorum", "pc2:2181,pc3:2181,pc4:2181"); //设置zookeeper conf.set("hbase.rootdir", "hdfs://pc2:9000/hbase"); //设置hbase根目录 conf.set("zookeeper.znode.parent", "/hbase"); Job job = Job.getInstance(conf, "HFile bulk load test"); // 生成一个新的任务对象并 job.setJarByClass(HadoopConnectTest3.class); //设置driver类 job.setMapperClass(HBaseHFileMapper.class); // 设置任务的map类和,map类输出结果是ImmutableBytesWritable和put类型 TableMapReduceUtil.initTableReducerJob("clientdata_test5", null, job); // TableMapReduceUtil是HBase提供的工具类,会自动设置mapreuce提交到hbase任务的各种配置,封装了操作,只需要简单的设置即可 //设置表名为clientdata_test5,reducer类为空,job为此前设置号的job job.setNumReduceTasks(0); // 设置reduce过程,这里由map端的数据直接提交,不要使用reduce类,因而设置成null,并设置reduce的个数为0 FileInputFormat.addInputPath(job, new Path(INPATH)); // 设置输入文件路径 return (job.waitForCompletion(true) ? 0 : -1); } public static void main(String[] args) { try { new HadoopConnectTest3().run(); } catch (ClassNotFoundException | IOException | InterruptedException e) { e.printStackTrace(); } }}

五、运行截图

六、问题分析

相当于查询一条存一条,那么对于行键rowkey相同的记录不能做到批量插入;

要是能够先把rowkey相同的记录map一下,那么到达reduce时相同rowkey的记录会聚集起来,此时再使用批量插入效果应该比逐条无规则插入效果好

你可能感兴趣的文章
VNPY - CTA策略模块策略开发
查看>>
VNPY - 事件引擎
查看>>
MongoDB基本语法和操作入门
查看>>
学习笔记_vnpy实战培训day04_作业
查看>>
OCO订单(委托)
查看>>
学习笔记_vnpy实战培训day06
查看>>
回测引擎代码分析流程图
查看>>
Excel 如何制作时间轴
查看>>
股票网格交易策略
查看>>
matplotlib绘图跳过时间段的处理方案
查看>>
vnpy学习_04回测评价指标的缺陷
查看>>
ubuntu终端一次多条命令方法和区别
查看>>
python之偏函数
查看>>
vnpy学习_06回测结果可视化改进
查看>>
读书笔记_量化交易如何建立自己的算法交易01
查看>>
设计模式03_工厂
查看>>
设计模式04_抽象工厂
查看>>
设计模式05_单例
查看>>
设计模式06_原型
查看>>
设计模式07_建造者
查看>>