本地及MapReduce作业读取SquenceFile

版权申明:转载请注明出处。
文章来源:大数据随笔

1.SquenceFile简介

(1)SequenceFile是一个由二进制序列化过的key/value的字节流组成的存储文件。
(2)SequenceFile可通过fileWriter.append(key,value)来完成新记录的添加操作。
(3)在存储结构上,SequenceFile主要由一个Header后跟多条Record组成。
(4) Header主要包含了Key classname,Value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。
(5)每条Record以键值对的方式进行存储,用来表示它的字符数组可依次解析成:记录的长度、Key的长度、Key值和Value值,并且Value值的结构取决于该记录是否被压缩。
sequencefile
说到SquenceFile就不得不提MapFile,MapFile是一种排序了的SquenceFile,其由data和index两部分组成。. index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。

2.本地读取SequenceFile

这种读取方式一般不太常见,偶尔用于数据抽样查看。抽取少量数据在本地环境下分析。下面的例子中,假如存储的SequenceFile的key的类型为NullWritable,value的类型为ByteWritable,且value是对象Feature序列化后的结果。下面的代码演示从这样一个sequencefile中反序列化出Feature。核心代码段如下:

//设置本地运行的参数
Configuration conf=new Configuration();
conf.set("mapred.job.tracker", "local");
conf.set("fs.default.name", "file:///");
//声明一个SequenceFile.Reader
SequenceFile.Reader reader = null;
FileSystem fs = FileSystem.get(conf);
Path path = new Path(filename);
reader = new SequenceFile.Reader(fs,path,conf);
//获取key,value对象
NullWritable key=(NullWritable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
BytesWritable value=(BytesWritable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
//读取。这里key的存储为空,只解析value的内容
while(reader.next(key, value)){
byte[] bytes = value.copyBytes();
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis);
Feature[] f = (Feature[])ois.readObject();
}

3.MapReduce作业中读取SequenceFile

SequenceFile就是为Hadoop设计的一种平面结构的存储形式,所以MapReduce作业中天然的支持读取这种结构。与读入文本形式存储的文件不同的是作业的输入类型需指定为SequenceFileInputFormat.class,除此之外几乎无差别。

package test0820;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SFInput {
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SFinput.class);

job.setMapperClass(SFMapper.class);
job.setReducerClass(SFReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(VLongWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(VLongWritable.class);

job.setInputFormatClass(SequenceFileInputFormat.class);

SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}
public static class SFMapper extends Mapper<Text, VLongWritable,Text, VLongWritable> {
public void map(Text key, VLongWritable value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}

}
public static class SFReducer extends Reducer<Text, VLongWritable,Text, VLongWritable>{
@Override
protected void reduce(Text key, Iterable v2s,Context context)
throws IOException, InterruptedException {
for(VLongWritable vl : v2s){
context.write(key, vl);
}
}
}
}

为您推荐

发表评论

电子邮件地址不会被公开。 必填项已用*标注