spark与hadoop的分布式文件缓存

版权申明:转载请注明出处。
文章来源:大数据随笔

1.简介

分布式计算框架中,各种配置文件、jar包、数据文件等都是通过分布式文件缓存进行下发的。这种方式将作业所需的资源文件下发到执行作业的各个节点上,作业执行过程中文件不再改动。
关于hadoop的分布式缓存可以参见我的另外一篇文章Hadoop分布式缓存(DistributedCache),这篇文章基于hadoop2.x之前的版本写的,文章中提到的添加分布式缓存的方式已被标注为过时,新的添加分布式缓存的方法被封装进了Job类。

2.使用场景

(1)Hadoop中map-join
(2)字典文件的分发
(3)全局配置文件分发

3.spark分布式文件缓存源码解析

spark
spark中addFile实现如下:

def addFile(path: String, recursive: Boolean): Unit = {
    val uri = new Path(path).toUri
    val schemeCorrectedPath = uri.getScheme match {
      case null | "local" => new File(path).getCanonicalFile.toURI.toString
      case _ => path
    }

    val hadoopPath = new Path(schemeCorrectedPath)
    val scheme = new URI(schemeCorrectedPath).getScheme
    if (!Array("http", "https", "ftp").contains(scheme)) {
      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
      val isDir = fs.getFileStatus(hadoopPath).isDirectory
      if (!isLocal && scheme == "file" && isDir) {
        throw new SparkException(s"addFile does not support local directories when not running " +
          "local mode.")
      }
      if (!recursive && isDir) {
        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
          "turned on.")
      }
    } else {
      // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
      Utils.validateURL(uri)
    }

    val key = if (!isLocal && scheme == "file") {
      env.rpcEnv.fileServer.addFile(new File(uri.getPath))
    } else {
      schemeCorrectedPath
    }
    val timestamp = System.currentTimeMillis
    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
      logInfo(s"Added file $path at $key with timestamp $timestamp")
      // Fetch the file locally so that closures which are run on the driver can still use the
      // SparkFiles API to access files.
      Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
        env.securityManager, hadoopConfiguration, timestamp, useCache = false)
      postEnvironmentUpdate()
    }
  }

(1)可以看到spark支持包括local,hdfs,viewfs,http,ftp等多种sheme类型的文件。
(2)当recursive设置为true时可以缓存目录。
(3)除ftp,http,https三种scheme外,其余路径均使用hadoop文件系统表示
(4)一种特殊的情况是,作业运行模式为非local,但是文件的scheme为file,此时表示将driver上的本地文件分发到excutor上。
(5)所有缓存的文件记录在一个map中,key为文件路径,value为时间戳。使用map的putIfAbsent方法,返回先前的value,若先前的value为空则使用Utils.fetchFile方法将文件拷贝到各个节点。
文件缓存完毕该怎么使用呢?spark中使用SparkFiles来获取已经缓存的文件。源码比较简单:

object SparkFiles {
  /**
   * Get the absolute path of a file added through `SparkContext.addFile()`.
   */
  def get(filename: String): String =
    new File(getRootDirectory(), filename).getAbsolutePath()
  /**
   * Get the root directory that contains files added through `SparkContext.addFile()`.
   */
  def getRootDirectory(): String =
    SparkEnv.get.driverTmpDir.getOrElse(".")
}

缓存文件都放在作业执行的临时目录中,所以当作业执行完毕后,临时目录也会被删除。此处先从Spark运行环境参数中获取当前作业driver的临时目录,然后根据用户指定的文件名称获取文件,返回的是文件在本地的绝对路径。
这里需要注意的是,在excutor上也可以通过SparkFiles.get()方法获取缓存的文件。

4.Hadoop分布式文件缓存源码解析

Hadoop 2.x中添加分布式文件缓存的方式与以前版本稍有不同,主要体现在将添加分布式缓存的操作放在了Job中,添加之前先判断Job的状态是否为DEFINE,若不是会抛出异常。

public void addCacheFile(URI uri) {
        this.ensureState(Job.JobState.DEFINE);
        DistributedCache.addCacheFile(uri, this.conf);
    }

在DistributedCache类中,实现如下:

  public static void addCacheFile(URI uri, Configuration conf) {
        String files = conf.get("mapreduce.job.cache.files");
        conf.set("mapreduce.job.cache.files", files == null?uri.toString():files + "," + uri.toString());
    }

使用时则只需和读本地文件一样读取就行。代码如下:

    static Map<String,String> movies=new HashMap<String,String>();
        public void setup(Context context) {            
            try {
                FileReader reader = new FileReader("movies.dat");
                BufferedReader br = new BufferedReader(reader);
                String s1 = null;
                while ((s1 = br.readLine()) != null)
                {
                    System.out.println(s1);
                    String[] splits= s1.split("::");                    
                    String movieId=splits[0];
                    String movieName =splits[1];
                    movies.put(movieId, movieName);                 
                }
                br.close();
                reader.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

为您推荐

发表评论

电子邮件地址不会被公开。 必填项已用*标注