最新消息:深度思考

本地及MapReduce作业读取SquenceFile

hadoop liuxuecheng 5260浏览 0评论

版权申明:转载请注明出处。
文章来源:大数据随笔

1.SquenceFile简介

(1)SequenceFile是一个由二进制序列化过的key/value的字节流组成的存储文件。
(2)SequenceFile可通过fileWriter.append(key,value)来完成新记录的添加操作。
(3)在存储结构上,SequenceFile主要由一个Header后跟多条Record组成。
(4) Header主要包含了Key classname,Value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。
(5)每条Record以键值对的方式进行存储,用来表示它的字符数组可依次解析成:记录的长度、Key的长度、Key值和Value值,并且Value值的结构取决于该记录是否被压缩。
sequencefile
说到SquenceFile就不得不提MapFile,MapFile是一种排序了的SquenceFile,其由data和index两部分组成。. index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。

2.本地读取SequenceFile

这种读取方式一般不太常见,偶尔用于数据抽样查看。抽取少量数据在本地环境下分析。下面的例子中,假如存储的SequenceFile的key的类型为NullWritable,value的类型为ByteWritable,且value是对象Feature序列化后的结果。下面的代码演示从这样一个sequencefile中反序列化出Feature。核心代码段如下:

//设置本地运行的参数
Configuration conf=new Configuration();  
conf.set("mapred.job.tracker", "local");  
conf.set("fs.default.name", "file:///");  
//声明一个SequenceFile.Reader
SequenceFile.Reader reader = null;
FileSystem fs = FileSystem.get(conf);
Path path = new Path(filename);
reader = new SequenceFile.Reader(fs,path,conf);
//获取key,value对象
NullWritable key=(NullWritable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
BytesWritable value=(BytesWritable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
//读取。这里key的存储为空,只解析value的内容
while(reader.next(key, value)){
    byte[] bytes = value.copyBytes();
    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
    ObjectInputStream ois = new ObjectInputStream(bis);
    Feature[] f = (Feature[])ois.readObject();
}

3.MapReduce作业中读取SequenceFile

SequenceFile就是为Hadoop设计的一种平面结构的存储形式,所以MapReduce作业中天然的支持读取这种结构。与读入文本形式存储的文件不同的是作业的输入类型需指定为SequenceFileInputFormat.class,除此之外几乎无差别。

package test0820;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SFInput {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SFinput.class);

        job.setMapperClass(SFMapper.class);
        job.setReducerClass(SFReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(VLongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VLongWritable.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);

        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
    public static class SFMapper extends Mapper<Text, VLongWritable,Text, VLongWritable> {
        public void map(Text key, VLongWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }

    }
    public static class SFReducer extends Reducer<Text, VLongWritable,Text, VLongWritable>{
        @Override
        protected void reduce(Text key, Iterable<VLongWritable> v2s,Context context)
                throws IOException, InterruptedException {
            for(VLongWritable vl : v2s){
                context.write(key, vl);
            }
        }
    }
}

转载请注明:大数据随笔 » 本地及MapReduce作业读取SquenceFile

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址