1 Download the Spark binary
Download spark-1.3.1-bin-hadoop2.6.tgz from the Apache website:
http://spark.apache.org/downloads.html
2 Configure the Spark cluster
Following on from the previous post, we put the Spark files under /usr/local/bigdata/spark and set Spark's environment variables in .bashrc.
#spark
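Only the #spark comment of that .bashrc snippet survives here, so the lines below are a guess at what such entries usually look like, not the original ones. SPARK_HOME is assumed to point at the directory mentioned above, and only bin is added to PATH because the start script is later called by its full path under sbin.

```shell
# Hypothetical ~/.bashrc entries (an assumption, not the original snippet).
# SPARK_HOME is assumed to be the install directory named in the text.
export SPARK_HOME=/usr/local/bigdata/spark
# Put spark-shell and spark-submit on the PATH.
export PATH="$PATH:$SPARK_HOME/bin"
```

After editing, run source ~/.bashrc (or open a new terminal) so the variables take effect.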
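Next, install Scala. Spark is written in Scala, so it is best to have it installed.
Download: http://www.scala-lang.org/download/2.11.7.html
Once it is installed, note the Scala install path. I installed the deb package on Ubuntu, so the path should be /usr/share/scala.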
Then, in Spark's conf/ directory, run:
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
Add the following to it:
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
export SCALA_HOME=/usr/share/scala
export SPARK_MASTER_IP=master
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/usr/local/bigdata/hadoop/etc/hadoop
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$HADOOP_HOME/lib/native"
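The HADOOP_OPTS line expands $HADOOP_HOME when spark-env.sh is sourced, so it only produces a useful path if that variable is already defined (presumably in .bashrc from the previous post). A small sketch for checking this before starting anything; the helper name unset_vars is my own and not part of Spark or Hadoop.

```shell
# Print the name of every listed variable that is unset or empty.
# Usage: unset_vars <NAME>...
unset_vars() {
    for name in "$@"; do
        # Indirect lookup: copy the value of the variable called $name.
        eval "value=\${$name:-}"
        [ -n "$value" ] || echo "$name"
    done
}
```

For example, unset_vars JAVA_HOME SCALA_HOME HADOOP_HOME prints nothing when all three are set in the current shell.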
Then create the slaves file:
cp slaves.template slaves
vim slaves
Remove the localhost entry and add the IPs of your own slave nodes. Mine is just slave: I mapped that name in hosts, so the alias can be written directly.
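If you would rather not edit the file by hand, the same change can be scripted. This is only a sketch: make_slaves is a made-up helper name, and it assumes the template lists localhost on a line of its own.

```shell
# Print a slaves file: the template minus its localhost entry,
# followed by one worker host per line.
# Usage: make_slaves <template-file> <host>...
make_slaves() {
    template=$1
    shift
    # Keep the template's comments but drop the default localhost line.
    grep -v '^localhost$' "$template" || true
    # Append the worker hosts, one per line.
    printf '%s\n' "$@"
}
```

Run from conf/, make_slaves slaves.template slave > slaves would give the single-worker setup used in this post.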
Then distribute the Spark files to the other nodes. Those nodes also need the same environment configuration and Scala.
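One way to do the distribution is rsync over SSH, once per host in the slaves file. The sketch below only prints the commands so you can review them first. It assumes passwordless SSH to each worker and the same install path on every node; print_sync_commands is a hypothetical helper name.

```shell
# Print (not run) one rsync command per worker listed in a slaves file.
# Usage: print_sync_commands <slaves-file> <spark-dir>
print_sync_commands() {
    slaves_file=$1
    spark_dir=$2
    while IFS= read -r host || [ -n "$host" ]; do
        # Skip blank lines and comment lines.
        case "$host" in
            ''|'#'*) continue ;;
        esac
        # -a preserves permissions and timestamps, -z compresses in transit.
        echo "rsync -az $spark_dir/ $host:$spark_dir/"
    done < "$slaves_file"
}
```

Piping the output to sh, as in print_sync_commands conf/slaves /usr/local/bigdata/spark | sh, would actually run the copies.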
Then start Spark:
$SPARK_HOME/sbin/start-all.sh
3 Test
After starting Hadoop and Spark, run jps on the master node:
~ [10:59:16]
bd$ jps
6047 Jps
5013 SecondaryNameNode
4803 NameNode
5472 Master
5157 ResourceManager
5771 SparkSubmit
~ [10:59:23]
On the slave node:
~ [10:43:23]
bd$ jps
5843 Jps
5527 Worker
5210 DataNode
5339 NodeManager
~ [11:01:22]
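Reading the jps listings by eye works for two machines, but the check is easy to script. A minimal sketch, where missing_daemons is a made-up helper name: it takes the jps output and a list of expected process names, and prints the ones that are absent.

```shell
# Print every expected daemon name that does not appear in the given jps output.
# Usage: missing_daemons "<jps output>" <name>...
missing_daemons() {
    jps_output=$1
    shift
    for name in "$@"; do
        # jps prints "<pid> <name>"; compare the whole second field so that
        # NameNode is not mistaken for SecondaryNameNode.
        if ! printf '%s\n' "$jps_output" |
            awk -v n="$name" '$2 == n { found = 1 } END { exit !found }'; then
            echo "$name"
        fi
    done
}
```

On the master this could be called as missing_daemons "$(jps)" NameNode ResourceManager Master, and on a slave as missing_daemons "$(jps)" DataNode NodeManager Worker. Empty output means everything expected is running.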
Then start bin/spark-shell to test:
scala> val a = sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val b = a.map(x => x*2)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23
scala> b.collect
15/12/14 22:57:02 INFO spark.SparkContext: Starting job: collect at <console>:26
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:26) with 1 output partitions (allowLocal=false)
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:26)
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Missing parents: List()
15/12/14 22:57:02 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[1] at map at <console>:23), which has no missing parents
15/12/14 22:57:05 INFO storage.MemoryStore: ensureFreeSpace(1792) called with curMem=0, maxMem=280248975
15/12/14 22:57:05 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1792.0 B, free 267.3 MB)
15/12/14 22:57:05 INFO storage.MemoryStore: ensureFreeSpace(1291) called with curMem=1792, maxMem=280248975
15/12/14 22:57:05 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1291.0 B, free 267.3 MB)
15/12/14 22:57:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60692 (size: 1291.0 B, free: 267.3 MB)
15/12/14 22:57:05 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/12/14 22:57:05 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/12/14 22:57:05 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[1] at map at <console>:23)
15/12/14 22:57:05 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/12/14 22:57:05 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1317 bytes)
15/12/14 22:57:05 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/12/14 22:57:06 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 643 bytes result sent to driver
15/12/14 22:57:06 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:26) finished in 0.459 s
15/12/14 22:57:06 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 364 ms on localhost (1/1)
15/12/14 22:57:06 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/12/14 22:57:06 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:26, took 3.878411 s
res0: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
scala> (1 to 10,3)
res1: (scala.collection.immutable.Range.Inclusive, Int) = (Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),3)
scala> exit