在java程序中将spark的RDD缓存到tachyon中

之前做好了可以在spark中读取tachyon的配置,但是使用rdd.persist(StorageLevel.OFF_HEAP)就出错,经过不断的排错和寻找,终于解决了这个问题。并且也可以在java中去设置了,之前设置一直出错。

1 spark的路径配置

其实spark这个配置文件之前一直忘记配置:就是在spark/conf下的spark-defaults.conf.template首先执行:

1
cp spark-defaults.conf.template spark-defaults.conf

然后在里面加入:

1
spark.tachyonStore.url	tachyon://master:19998

其实还有一个属性park.tachyonStore.baseDir可以不加的,如果不加使用的是默认位置,其实是我加上之后就会报错,所以我才不加。

加好之后重启,然后运行spark-shell
执行下述步骤:

1
2
3
4
scala> import org.apache.spark.storage.StorageLevel
scala> val rdd = sc.textFile("tachyon://master:19998/hhu/input/f2")
scala> rdd.persist(StorageLevel.OFF_HEAP)
scala> rdd.count()

执行成功即可表示配置完成,可以在master:19999文件浏览查看到临时创建的目录,但是已经没有东西了,因为运行结束之后就会删除。

2 在java中使用

scala中使用的好方便,但是java中总是出错。而且又没几个人用java去写。所以之后自己接着查api,最后还是搞定了。

开始要在hadoop配置里面加上"fs.tachyon.impl", "tachyon.hadoop.TFS"

然后就是获取到RDD后,要先执行rdd.unpersist();然后才能重新设置。否则会报错。例子贴一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package edu.hhu.innerac.sparktest;  

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class Test {

static final String USER = "innerac";

public static void main(String[] args) throws Exception {

SparkConf conf = new SparkConf();

JavaSparkContext sc = new JavaSparkContext("spark://master:7077", "Spark App 0", conf);

sc.hadoopConfiguration().set("fs.tachyon.impl", "tachyon.hadoop.TFS");
// String file = "hdfs://master:9000/hhu/input/f2";
String file = "tachyon://master:19998/hhu/input/f2";

JavaRDD<String> rdd = null;

rdd = sc.textFile(file, 4).cache();

rdd.unpersist();
rdd.persist(StorageLevel.OFF_HEAP());
System.out.println(rdd.count());

}

}