Integrating Tachyon, Hadoop, and Spark

Following on from the three posts above, it is finally time to put everything together. During the integration I once again ran into a version mismatch between Spark and Tachyon. The official compatibility table is:
| Spark Version   | Tachyon Version |
| --------------- | --------------- |
| 1.0.x and below | v0.4.1          |
| 1.1.x           | v0.5.0          |
| 1.2.x           | v0.5.0          |
| 1.3.x           | v0.5.0          |
| 1.4.x           | v0.6.4          |
| 1.5.x           | v0.7.1          |
Since I am using Spark 1.3.1, I recompiled Tachyon 0.5.0. The build process is the same as before; from here on, the Tachyon version referred to is 0.5.0. Also, with this version there is no need to create the core-site.xml file under tachyon/conf.
1 Tachyon and HDFS

The official documentation actually explains this quite clearly; it is just that the property names are hard to remember, so I am recording my own configuration steps here. Since my Hadoop version is 2.x.x, to stay on the safe side add the following to hadoop/etc/hadoop/core-site.xml:

```xml
<property>
  <name>fs.tachyon.impl</name>
  <value>tachyon.hadoop.TFS</value>
</property>
<property>
  <name>fs.tachyon-ft.impl</name>
  <value>tachyon.hadoop.TFSFT</value>
</property>
```
Then copy the Tachyon jar into hadoop/lib. The jar is tachyon-0.5.0-jar-with-dependencies.jar, located under tachyon-0.5.0/core/target.
That completes the Hadoop side of the configuration, and it can be tested with a small example. The Tachyon package has to be imported first; following the installation tutorial, it is pulled in through Maven:
```xml
<dependency>
  <groupId>org.tachyonproject</groupId>
  <artifactId>tachyon-client</artifactId>
  <version>0.5.0</version>
</dependency>
```
```java
package edu.hhu.innerac.tachyontest;

import java.io.IOException;

import tachyon.TachyonURI;
import tachyon.client.InStream;
import tachyon.client.OutStream;
import tachyon.client.ReadType;
import tachyon.client.TachyonFS;
import tachyon.client.TachyonFile;
import tachyon.client.WriteType;

public class Demo {

    public void test() throws IOException {
        // Connect to the Tachyon master.
        String masterurl = "tachyon://master:19998";
        TachyonFS tachyonFS = TachyonFS.get(masterurl);

        // Recreate the test file from scratch.
        TachyonURI filepaths = new TachyonURI("/tmp/test/newFile2");
        if (tachyonFS.exist(filepaths)) {
            tachyonFS.delete(filepaths, true);
        }
        tachyonFS.createFile(filepaths, 10240);
        TachyonFile tachyonFile = tachyonFS.getFile(filepaths);

        // Write the numbers 0..29 into the file.
        OutStream o = tachyonFile.getOutStream(WriteType.TRY_CACHE);
        for (int i = 0; i < 30; i++) {
            o.write(Integer.toString(i).getBytes());
        }
        o.close();

        // Read the contents back and print them.
        InStream in = tachyonFile.getInStream(ReadType.CACHE);
        byte[] bytes = new byte[100];
        in.read(bytes);
        System.out.println(new String(bytes));
        in.close();

        tachyonFS.close();
    }

    public static void main(String[] args) throws IOException {
        Demo demo = new Demo();
        demo.test();
    }
}
```
If it runs successfully, everything is working.
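As an extra check of the Hadoop-side wiring, the same file can also be read back through Hadoop's generic FileSystem API, which resolves the tachyon:// scheme via the fs.tachyon.impl entry from the core-site.xml above. The sketch below is only an illustration under the same assumptions as the Demo (Tachyon master at master:19998, and /tmp/test/newFile2 already written); the class name HadoopReadDemo is made up for the example.

```java
package edu.hhu.innerac.tachyontest;

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopReadDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // If core-site.xml is not on the classpath, register the scheme by hand.
        conf.set("fs.tachyon.impl", "tachyon.hadoop.TFS");

        // Hadoop resolves the tachyon:// scheme to tachyon.hadoop.TFS.
        FileSystem fs = FileSystem.get(URI.create("tachyon://master:19998"), conf);

        // Read back the file written by the Demo above.
        InputStream in = fs.open(new Path("/tmp/test/newFile2"));
        byte[] buf = new byte[100];
        int n = in.read(buf);
        System.out.println(new String(buf, 0, n));

        in.close();
        fs.close();
    }
}
```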
2 Integrating Tachyon with Spark

This is where the trouble started: the mismatched Spark version caused all sorts of problems.
The same jar also has to go into a directory that is on $SPARK_CLASSPATH. I put it there the first time but it did not take effect, so this is what I ended up writing in spark-env.sh:

```bash
export SPARK_CLASSPATH=$SPARK_HOME/lib
export SPARK_CLASSPATH=$SPARK_HOME/lib/tachyon-0.5.0-jar-with-dependencies.jar:$SPARK_CLASSPATH
```
I had already copied the jar into the lib directory beforehand.
Once that is in place, start spark, hadoop, and tachyon, then open spark-shell.

Assume hdfs://master:9000/hhu/input/f2 exists; it is actually the file uploaded last time. Then run the following commands:

```scala
val rdd = sc.textFile("tachyon://master:19998/hhu/input/f2")
val double = rdd.map(line => line + line)
double.saveAsTextFile("tachyon://master:19998/output3")
```
Below is the tail end of the output log, which shows that the write succeeded:

```
15/12/31 14:13:20 INFO : FileDoesNotExistException(message:Failed to getClientFileInfo: /output3/part-00001 does not exist)/output3/part-00001
15/12/31 14:13:20 INFO : File does not exist: tachyon://master:19998/output3/part-00001
15/12/31 14:13:20 INFO : rename(tachyon://master:19998/output3/_temporary/0/task_201512311413_0000_m_000001/part-00001, tachyon://master:19998/output3/part-00001)
15/12/31 14:13:20 INFO : delete(tachyon://master:19998/output3/_temporary, true)
15/12/31 14:13:20 INFO : create(tachyon://master:19998/output3/_SUCCESS, rw-r--r--, true, 65536, 1, 33554432, null)
```
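The same round trip can also be run as a standalone job instead of from spark-shell. The following is only a minimal Java sketch of the identical logic (read from Tachyon, double each line, write back to Tachyon); the class name TachyonRoundTrip and the output path /output4 are made up for the example, and it assumes the classpath setup described above.

```java
package edu.hhu.innerac.sparktest;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TachyonRoundTrip {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
                .setAppName("Tachyon round trip")
                .setMaster("spark://master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the same input file used in the spark-shell test, via tachyon://.
        JavaRDD<String> rdd = sc.textFile("tachyon://master:19998/hhu/input/f2");

        // Same transformation as in the shell: duplicate every line.
        JavaRDD<String> doubled = rdd.map(new Function<String, String>() {
            public String call(String line) {
                return line + line;
            }
        });

        // Write the result back to Tachyon (the output path must not exist yet).
        doubled.saveAsTextFile("tachyon://master:19998/output4");

        sc.stop();
    }
}
```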
3 A few Hello World examples

3.1 Hadoop WordCount

```java
package edu.hhu.innerac.hadooptest;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
```
3.2 Spark

```java
package edu.hhu.innerac.sparktest;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class Test {

    static final String USER = "innerac";

    public static void main(String[] args) throws Exception {
        // Run as the Hadoop user that owns the HDFS paths.
        System.setProperty("user.name", USER);
        System.setProperty("HADOOP_USER_NAME", USER);

        Map<String, String> envs = new HashMap<String, String>();
        envs.put("HADOOP_USER_NAME", USER);

        System.setProperty("spark.executor.memory", "512m");

        JavaSparkContext sc = new JavaSparkContext("spark://master:7077", "Spark App 0",
                "/usr/local/bigdata/spark", new String[0], envs);

        String file = "hdfs://master:9000/hhu/input/f2";
        // OFF_HEAP keeps the cached blocks in Tachyon instead of on the JVM heap.
        JavaRDD<String> data = sc.textFile(file, 4).persist(StorageLevel.OFF_HEAP());

        System.out.println(data.count());
    }
}
```
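One related setting worth knowing: in Spark 1.3 the OFF_HEAP storage level keeps cached blocks in Tachyon, and Spark locates the Tachyon master through the spark.tachyonStore.url property, which defaults to a master on localhost. If the master is elsewhere, as in the cluster used in this post, the property can be set the same way the code above sets spark.executor.memory; a minimal sketch, assuming the same master address:

```java
// Point Spark's Tachyon-backed OFF_HEAP store at the cluster's Tachyon master
// (spark.tachyonStore.url otherwise defaults to a Tachyon master on localhost).
System.setProperty("spark.tachyonStore.url", "tachyon://master:19998");
```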
3.3 Tachyon

The Tachyon example is just the Demo shown above.