Integrating Tachyon, Hadoop, and Spark

Following on from the previous three articles, it is finally time to put everything together. During integration I again ran into Spark/Tachyon version mismatches. The official compatibility table is as follows:

Spark Version     Tachyon Version
1.0.x and below   v0.4.1
1.1.x             v0.5.0
1.2.x             v0.5.0
1.3.x             v0.5.0
1.4.x             v0.6.4
1.5.x             v0.7.1

Since I am using Spark 1.3.1, I recompiled Tachyon 0.5.0 (the process is the same as before). From here on, the Tachyon version is assumed to be 0.5.0. Note that with this version there is also no need to create a core-site.xml under tachyon/conf.

1 Tachyon and HDFS

The official documentation actually explains this clearly, but the property names are hard to remember, so I am recording my own configuration steps here.
Since my Hadoop version is 2.x.x, to stay on the safe side add the following to hadoop/etc/hadoop/core-site.xml:

<property>
  <name>fs.tachyon.impl</name>
  <value>tachyon.hadoop.TFS</value>
</property>
<property>
  <name>fs.tachyon-ft.impl</name>
  <value>tachyon.hadoop.TFSFT</value>
</property>

Then copy the Tachyon jar into hadoop/lib. The jar lives at tachyon-0.5.0/core/target/tachyon-0.5.0-jar-with-dependencies.jar.

With that, the Hadoop side is configured, and you can test it with a small example:

First import the Tachyon client. Following the tutorial, it is pulled in via Maven:

<dependency>
  <groupId>org.tachyonproject</groupId>
  <artifactId>tachyon-client</artifactId>
  <version>0.5.0</version>
</dependency>
package edu.hhu.innerac.tachyontest;

import java.io.IOException;

import tachyon.TachyonURI;
import tachyon.client.InStream;
import tachyon.client.OutStream;
import tachyon.client.ReadType;
import tachyon.client.TachyonFS;
import tachyon.client.TachyonFile;
import tachyon.client.WriteType;

public class Demo {
    public void test() throws IOException {
        String masterUrl = "tachyon://master:19998";
        TachyonFS tachyonFS = TachyonFS.get(masterUrl);

        // Recreate the test file from scratch on every run.
        TachyonURI filePath = new TachyonURI("/tmp/test/newFile2");
        if (tachyonFS.exist(filePath)) {
            tachyonFS.delete(filePath, true);
        }
        tachyonFS.createFile(filePath, 10240);

        // Write the numbers 0..29 into the file, trying to keep it in Tachyon memory.
        TachyonFile tachyonFile = tachyonFS.getFile(filePath);
        OutStream out = tachyonFile.getOutStream(WriteType.TRY_CACHE);
        for (int i = 0; i < 30; i++) {
            out.write(Integer.toString(i).getBytes());
        }
        out.close();

        // Read the data back, caching blocks in Tachyon as we go.
        // Only print the bytes actually read, not the whole 100-byte buffer.
        InStream in = tachyonFile.getInStream(ReadType.CACHE);
        byte[] bytes = new byte[100];
        int n = in.read(bytes);
        System.out.println(new String(bytes, 0, n));
        in.close();

        tachyonFS.close();
    }

    public static void main(String[] args) throws IOException {
        Demo demo = new Demo();
        demo.test();
    }
}
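A note on the flags used above: in the 0.5.0 client, WriteType.TRY_CACHE tries to keep the data in Tachyon memory only (giving up on caching if memory runs short), while CACHE_THROUGH would additionally persist it to the under-filesystem, and ReadType.CACHE caches blocks into Tachyon memory as they are read. If in doubt, check the javadoc of the client you built.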

If it runs successfully, everything is wired up correctly.
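Because fs.tachyon.impl is registered in core-site.xml, the same data can also be reached through Hadoop's generic FileSystem API, which is exactly what MapReduce and Spark rely on. Here is a minimal sketch (the class name TfsCheck is made up for illustration):

package edu.hhu.innerac.tachyontest;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TfsCheck {
    public static void main(String[] args) throws Exception {
        // Configuration picks up core-site.xml, where fs.tachyon.impl
        // maps the tachyon:// scheme onto tachyon.hadoop.TFS.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("tachyon://master:19998/"), conf);

        // List /tmp/test to verify that the scheme resolves and that the
        // file written by the demo above is visible.
        for (FileStatus status : fs.listStatus(new Path("/tmp/test"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}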

2 Integrating Tachyon with Spark

This is where the problems showed up: a mismatched Spark version caused all kinds of failures.

Likewise, the jar needs to be on $SPARK_CLASSPATH. I put it there the first time, but it did not take effect, so I ended up writing the following in spark-env.sh:

export SPARK_CLASSPATH=$SPARK_HOME/lib
export SPARK_CLASSPATH=$SPARK_HOME/lib/tachyon-0.5.0-jar-with-dependencies.jar:$SPARK_CLASSPATH

(I had already copied the jar into the lib directory beforehand.)
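Besides the classpath, Spark also needs to know where the Tachyon master is when RDDs are persisted OFF_HEAP. A minimal sketch of that configuration, assuming the same master address as above (spark.tachyon.store.url is the property used by Spark 1.x; verify it against the docs for your exact version):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class OffHeapConfDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("tachyon-offheap-demo")
                .setMaster("spark://master:7077")
                // Point Spark's OFF_HEAP (Tachyon) block store at our Tachyon master.
                .set("spark.tachyon.store.url", "tachyon://master:19998");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... build RDDs here and persist them with StorageLevel.OFF_HEAP() ...
        sc.stop();
    }
}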

With the configuration in place, start Spark, Hadoop, and Tachyon, then open spark-shell.
Assume hdfs://master:9000/hhu/input/f2 exists (I uploaded it there last time), and run the following commands:

val rdd = sc.textFile("tachyon://master:19998/hhu/input/f2")
val doubled = rdd.map(line => line + line)
doubled.saveAsTextFile("tachyon://master:19998/output3")

Here is the tail of the output log, which shows the write succeeded:

15/12/31 14:13:20 INFO : FileDoesNotExistException(message:Failed to getClientFileInfo: /output3/part-00001 does not exist)/output3/part-00001
15/12/31 14:13:20 INFO : File does not exist: tachyon://master:19998/output3/part-00001
15/12/31 14:13:20 INFO : rename(tachyon://master:19998/output3/_temporary/0/task_201512311413_0000_m_000001/part-00001, tachyon://master:19998/output3/part-00001)
15/12/31 14:13:20 INFO : delete(tachyon://master:19998/output3/_temporary, true)
15/12/31 14:13:20 INFO : create(tachyon://master:19998/output3/_SUCCESS, rw-r--r--, true, 65536, 1, 33554432, null)
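To double-check, you can read the result back from the same shell: sc.textFile("tachyon://master:19998/output3").count() should match the line count of the input file.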

3 A Few Hello World Examples

3.1 Hadoop WordCount

package edu.hhu.innerac.hadooptest;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCount {

    // Mapper: emit (word, 1) for every token in the input line.
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Reducer: sum the counts for each word.
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Input and output paths are taken from the command line.
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
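Since fs.tachyon.impl is registered in core-site.xml, this job should also be able to read from and write to tachyon:// paths directly by passing them as the input and output arguments; the TFS adapter makes Tachyon look like any other Hadoop filesystem to MapReduce.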

3.2 Spark

package edu.hhu.innerac.sparktest;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class Test {
    static final String USER = "innerac";

    public static void main(String[] args) throws Exception {
        System.setProperty("user.name", USER); // user name used to talk to Spark
        System.setProperty("HADOOP_USER_NAME", USER); // user name used to talk to Hadoop

        Map<String, String> envs = new HashMap<String, String>();
        envs.put("HADOOP_USER_NAME", USER); // Hadoop user name for this app's executors
        System.setProperty("spark.executor.memory", "512m"); // memory cap for each executor

        // The JavaSparkContext constructor arguments are, in order:
        // the Spark master address;
        // the application name;
        // the Spark installation directory on the workers;
        // a list of jars to ship to the executors (they end up on the executor classpath);
        // environment variables to pass to the executors.
        JavaSparkContext sc = new JavaSparkContext("spark://master:7077", "Spark App 0",
                "/usr/local/bigdata/spark", new String[0], envs);

        // String file = "hdfs://master:9000/output2/part-00000";
        String file = "hdfs://master:9000/hhu/input/f2";

        // Persist the RDD off-heap, i.e. in Tachyon. Note: calling cache() first
        // and then persist() with a different level would throw, since an RDD's
        // storage level can only be assigned once.
        JavaRDD<String> data = sc.textFile(file, 4).persist(StorageLevel.OFF_HEAP());
        System.out.println(data.count());
    }
}
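One caveat with StorageLevel.OFF_HEAP(): the blocks go to the Tachyon master configured by spark.tachyon.store.url (see the sketch in section 2). If that property is left at its default (tachyon://localhost:19998 in Spark 1.x, as far as I can tell), off-heap persistence will not work on a multi-node cluster.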

3.3 Tachyon

The Tachyon example is the one shown above.