Spark 1.3.1 Distributed Cluster Setup

1 Download the Spark binary package

Download spark-1.3.1-bin-hadoop2.6.tgz from the Apache website; the download page is:

http://spark.apache.org/downloads.html
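One possible way to fetch the archive and unpack it into the directory used below (a sketch; the archive.apache.org URL is an assumption, adjust to whatever mirror the download page offers):

wget http://archive.apache.org/dist/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz
tar -xzf spark-1.3.1-bin-hadoop2.6.tgz
sudo mv spark-1.3.1-bin-hadoop2.6 /usr/local/bigdata/spark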

2 Configure the Spark cluster

Following the previous post, the Spark files go under /usr/local/bigdata/spark.
Then add the Spark environment variables to .bashrc:

#spark
export SPARK_HOME=/usr/local/bigdata/spark
export PATH=$SPARK_HOME/bin:$PATH
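
After saving .bashrc, reload it and check that the variables took effect (a quick sanity check, assuming bash is the login shell):

source ~/.bashrc
echo $SPARK_HOME        # should print /usr/local/bigdata/spark
which spark-submit      # should resolve under $SPARK_HOME/bin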

Next, install Scala. Spark is written in Scala, so it is best to have it installed as well.

Download page: http://www.scala-lang.org/download/2.11.7.html

After the installation finishes, note where Scala was installed. I installed the deb package on Ubuntu, so the path is /usr/share/scala.
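
To confirm the installation and locate its files (assuming the Ubuntu deb registers the package under the name scala):

scala -version        # should report version 2.11.7
dpkg -L scala | head  # shows where the deb placed its files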

Then work in Spark's conf/ directory:

cp spark-env.sh.template spark-env.sh
vim spark-env.sh

Add the following to it:

export JAVA_HOME=/usr/lib/jvm/java-7-oracle                  # JDK used by Spark
export SCALA_HOME=/usr/share/scala                           # Scala install path from above
export SPARK_MASTER_IP=master                                # hostname of the master node (mapped in /etc/hosts)
export SPARK_WORKER_MEMORY=1g                                # memory each worker may hand out to executors
export HADOOP_CONF_DIR=/usr/local/bigdata/hadoop/etc/hadoop  # lets Spark find the HDFS/YARN configuration
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$HADOOP_HOME/lib/native"  # Hadoop native libs; assumes HADOOP_HOME is already set in .bashrc

Then create the slaves file:

cp slaves.template slaves
vim slaves

Remove the localhost entry and add your own slave nodes, one per line (see the example below). My slave hostnames are already mapped in /etc/hosts, so I can write the aliases directly instead of IP addresses.
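
For instance, with workers reachable as slave1 and slave2 (hypothetical hostnames; use your own), conf/slaves would simply contain:

slave1
slave2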

Then distribute the Spark directory to the other nodes. Those nodes also need the same environment variables configured in .bashrc and Scala installed.
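
One way to copy the directory over (a sketch, assuming the worker is reachable as slave1 and the parent directory already exists there):

rsync -av /usr/local/bigdata/spark/ slave1:/usr/local/bigdata/spark/
# or, without rsync:
scp -r /usr/local/bigdata/spark slave1:/usr/local/bigdata/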

Then start Spark:

$SPARK_HOME/sbin/start-all.sh
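
If startup succeeds, the standalone master also serves a web UI, by default on port 8080, where the registered workers show up. A quick check from any node that can reach the master (assuming curl is installed):

curl -s http://master:8080 | grep -ci worker   # a non-zero count means workers have registered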

3 Testing

After starting Hadoop and Spark, run jps on the master node:

~ [10:59:16]
bd$ jps
6047 Jps
5013 SecondaryNameNode
4803 NameNode
5472 Master
5157 ResourceManager
5771 SparkSubmit
 ~ [10:59:23]

On a slave node:

 ~ [10:43:23]
bd$ jps                                   
5843 Jps
5527 Worker
5210 DataNode
5339 NodeManager
 ~ [11:01:22]

Then start bin/spark-shell and run a quick test:

scala> val a = sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val b = a.map(x => x*2)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

scala> b.collect
15/12/14 22:57:02 INFO spark.SparkContext: Starting job: collect at <console>:26
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:26) with 1 output partitions (allowLocal=false)
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:26)
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Missing parents: List()
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[1] at map at <console>:23), which has no missing parents
15/12/14 22:57:05 INFO storage.MemoryStore: ensureFreeSpace(1792) called with curMem=0, maxMem=280248975
15/12/14 22:57:05 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1792.0 B, free 267.3 MB)
15/12/14 22:57:05 INFO storage.MemoryStore: ensureFreeSpace(1291) called with curMem=1792, maxMem=280248975
15/12/14 22:57:05 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1291.0 B, free 267.3 MB)
15/12/14 22:57:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60692 (size: 1291.0 B, free: 267.3 MB)
15/12/14 22:57:05 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/12/14 22:57:05 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/12/14 22:57:05 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[1] at map at <console>:23)
15/12/14 22:57:05 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/12/14 22:57:05 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1317 bytes)
15/12/14 22:57:05 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/12/14 22:57:06 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 643 bytes result sent to driver
15/12/14 22:57:06 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:26) finished in 0.459 s
15/12/14 22:57:06 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 364 ms on localhost (1/1)
15/12/14 22:57:06 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/12/14 22:57:06 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:26, took 3.878411 s
res0: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

scala> (1 to 10,3)
res1: (scala.collection.immutable.Range.Inclusive, Int) = (Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),3)

scala> exit
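
Note that the log lines above mention localhost, so this session ran against the default local master. To run the same test on the standalone cluster instead, the shell can be pointed at the master explicitly (assuming the default standalone master port 7077):

$SPARK_HOME/bin/spark-shell --master spark://master:7077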