Spark on Supercomputers: A Few Notes

I've been working with Apache Spark quite a bit lately in an effort to bring it into the fold as a viable tool for solving some of the data-intensive problems encountered in supercomputing.  I've already added support for provisioning Spark clusters to a branch of the myHadoop framework I maintain so that Slurm, Torque, and SGE users can begin playing with it, and as a result of these efforts, I've discovered a number of interesting issues with Spark running on traditional supercomputers.

At this point in time, Spark is very rough around the edges.  The core implementation of resilient distributed datasets is all there and works wonderfully, but I've found that it doesn't take long to start discovering bugs and half-implemented features that can get very confusing very quickly.  Perhaps half of the problems I've faced are the result of trying to run Spark in non-traditional ways (for example, over the hosts' TCP-over-InfiniBand interfaces and with non-default config directories), and although the documentation claims to support all of the features necessary to make this possible, the reality is a bit different.

What follows are just some incoherent notes I've taken while porting Spark to the myHadoop framework.  Spark is rapidly developing and it is constantly improving, so I hope this post becomes outdated as the Spark developers make the framework more robust.

Control Script Problems

Hadoop and Spark both ship with "control scripts" or "cluster launch scripts" that facilitate starting and stopping the entire cluster of daemons.  At the highest level, these include start-all.sh and stop-all.sh, which make calls to start-dfs.sh and start-yarn.sh (in Hadoop) and start-master.sh and start-slaves.sh (in Spark).  In Hadoop, these scripts work wonderfully, but Spark's implementation of these control scripts is still quite immature because they carry implicit assumptions about users' Spark configurations.

Like Hadoop, Spark supports a spark-env.sh file (located in $SPARK_CONF_DIR) which defines environment variables for all of the remote Spark workers that are spawned across the cluster.  This file is an ideal place to put the following environment variable definitions (a sample spark-env.sh follows this list):
  • SPARK_MASTER_IP - the default value for this is the output of `hostname`, which is generally not a great default on most clusters.  On Rocks, we append ".ibnet" to the hostname to get Spark to operate over the InfiniBand fabric.
  • SPARK_LOCAL_IP - again, ensure that this is set up to use the correct interface on the cluster.  We append ".ibnet" on Rocks.
  • SPARK_HOME, SPARK_PREFIX, and SPARK_CONF_DIR should also be defined here, since spark-env.sh will usually override the variables defined by spark-config.sh (see below).
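
Putting those together, a spark-env.sh might look something like this (the hostnames and config path below are just placeholders, not the values myHadoop actually generates):

# spark-env.sh - sourced by every Spark daemon when it starts
# (hostnames and the config path below are illustrative placeholders)
export SPARK_HOME=/home/glock/apps/spark-0.9.0
export SPARK_PREFIX=$SPARK_HOME
export SPARK_CONF_DIR=$HOME/spark-conf            # non-default config dir
export SPARK_MASTER_IP=compute-0-1.ibnet          # master's InfiniBand hostname
export SPARK_LOCAL_IP=$(hostname).ibnet           # this host's InfiniBand interface
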
$SPARK_HOME/sbin/spark-config.sh is where much of the Spark control scripts' "intelligence" lives when it comes to defining the environment variables that Spark needs to launch.  In particular, spark-config.sh defines the following variables before reading spark-env.sh:
  • SPARK_PREFIX
  • SPARK_HOME
  • SPARK_CONF_DIR
The problem is that spark-config.sh will stomp all over anything the user defines for the above variables, and since spark-config.sh is called from within all of the Spark control scripts (both those invoked by the user and those invoked by sub-processes on remote hosts during the daemon spawning process), trying to get Spark to use non-default values for SPARK_CONF_DIR (e.g., exactly what myHadoop does) gets to be tedious.

The Spark developers tried to work around this by having the control scripts source spark-env.sh after spark-config.sh, meaning you should be able to define your own SPARK_CONF_DIR in spark-env.sh.  Unfortunately, this mechanism of calling spark-env.sh after spark-config.sh appears as follows:

. "$sbin/spark-config.sh"

if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
  . "${SPARK_CONF_DIR}/spark-env.sh"
fi

That is, spark-config.sh will stomp all over any user-specified SPARK_CONF_DIR, and then use the SPARK_CONF_DIR from spark-config.sh to look for spark-env.sh.  Thus, there is no actual way to get the Spark control scripts (as of version 0.9) to honor the user-specified SPARK_CONF_DIR.  It looks like the latest commits to Spark have started to address this, but a cursory glance over the newest control scripts suggests that this remains broken.

Anyway, as a result of this, myHadoop's Spark integration eschews the Spark control scripts and spawns the daemons directly using the manual method of launching slaves (sketched after the list below).  Doing this averts the following issues:
  1. start-slaves.sh can't find any slaves because it always looks for $SPARK_HOME/conf/slaves.  This can be worked around by passing SPARK_SLAVES=$SPARK_CONF_DIR/slaves to start-slaves.sh for a non-default SPARK_CONF_DIR.
  2. stop-master.sh doesn't do anything useful; you still need to kill -9 the master process by hand.  I'm not sure why this is the case.
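
For reference, the manual method amounts to launching the daemon classes yourself with spark-class instead of going through the sbin scripts.  A rough sketch (not the actual myHadoop code; the hostnames, ports, and slaves-file handling are simplified placeholders) looks like this:

# Start the master on this node, bound to its InfiniBand hostname
MASTER_HOST=$(hostname).ibnet
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master \
    --ip $MASTER_HOST --port 7077 --webui-port 8080 &

# Start one worker per host in our own slaves file, pointed at that master
for slave in $(cat $SPARK_CONF_DIR/slaves); do
    ssh $slave "SPARK_CONF_DIR=$SPARK_CONF_DIR $SPARK_HOME/bin/spark-class \
        org.apache.spark.deploy.worker.Worker spark://$MASTER_HOST:7077" &
done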

Deciphering Spark Errors

Here are various cryptic stack traces I've encountered while working on Spark.  I kept these mostly for myself, but I've started meeting people that hit the same problems and thought it might be worthwhile to share the diagnoses I've found.

In general, Spark seems to work best when used conservatively, but when you start doing things that do not strictly fall within the anticipated use case, it breaks in strange ways.  For example, if you try to write an RDD with an empty element (e.g., a text file containing empty lines), you get this really crazy error that does not actually say anything meaningful:

14/04/30 16:23:07 ERROR Executor: Exception in task ID 19
scala.MatchError: 0 (of class java.lang.Integer)
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:153)
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
     at org.apache.spark.scheduler.Task.run(Task.scala:53)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:722)

I filed a bug report about this particular problem and the issue has been fixed, but it's just one of those edge cases where Spark will fail catastrophically (I had to look at the source code to figure out what "scala.MatchError" meant).  Usually you wouldn't be operating on empty data sets, but I discovered this error when I was trying to quickly determine if my Spark slaves were communicating with my master correctly by issuing:

file = sc.textFile('hdfs://master.ibnet0/user/glock/input.txt')
file.saveAsTextFile('hdfs://master.ibnet0/user/glock/output')

That is, simply reading in a file and writing it back out with pyspark would cause catastrophic failure.  This is what I meant when I said Spark is still rough around the edges.

Here are a few more errors I've encountered.  They're not problems with Spark itself, but the stack traces and exceptions thrown can be a little mysterious.  I'm pasting them all here for the sake of googlers who may run into these same problems.

If the version of Hadoop that Spark was built against doesn't match the version of HDFS it's talking to (here, a Spark build using the Hadoop 1 client hitting a Hadoop 2 HDFS), you'll get this IPC error:

>>> file.saveAsTextFile('hdfs://s12ib:54310/user/glock/gutenberg.out')
Traceback (most recent call last):
  File "", line 1, in 
  File "/home/glock/apps/spark-0.9.0/python/pyspark/rdd.py", line 682, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/home/glock/apps/spark-0.9.0/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home/glock/apps/spark-0.9.0/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.saveAsTextFile.
: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
     at org.apache.hadoop.ipc.Client.call(Client.java:1070)
     at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
     at $Proxy7.getProtocolVersion(Unknown Source)
     at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
     at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
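
The fix is to use a Spark build that matches the Hadoop version actually serving your HDFS.  If you build Spark yourself, the 0.9 sbt build picks the Hadoop client version up from SPARK_HADOOP_VERSION; the version number below is just an example and should be whatever your HDFS is running:

# Rebuild the Spark assembly against the HDFS version it has to talk to
cd $SPARK_HOME
SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly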


If your Pythons aren't all the same version across the nodes when Spark workers are instantiated, you might get a cryptic error like this when trying to call the count() method on an RDD:

14/04/30 16:15:11 ERROR Executor: Exception in task ID 12
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/serializers.py", line 182, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/serializers.py", line 171, in _batched
    for item in iterator:
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/rdd.py", line 493, in func
    if acc is None:
TypeError: an integer is required

     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:131)
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:153)
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
     at org.apache.spark.scheduler.Task.run(Task.scala:53)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:722)
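
The first thing I'd check when this shows up is whether every node really launches the same Python.  A quick sanity check over the slaves file (assuming passwordless ssh; adjust the python name to whatever your workers actually invoke) might be:

# Print the Python version each node would use; they should all match
for host in $(cat $SPARK_CONF_DIR/slaves); do
    echo -n "$host: "
    ssh $host "python -V" 2>&1    # Python 2 prints its version to stderr
done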


If you try to write an RDD to a file with mismatched Python versions, or if you use anything earlier than Python 2.7 (e.g., 2.6) with any Spark version earlier than 1.0.0, you'll see this:

14/04/30 17:53:20 WARN scheduler.TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 677, in func
    if not isinstance(x, basestring):
SystemError: unknown opcode

     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:131)
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:153)
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
     at org.apache.spark.scheduler.Task.run(Task.scala:53)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:722)
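
Both of these Python-related failures come back to the workers picking up a different interpreter than the one driving pyspark.  One way to guard against that is to pin the interpreter explicitly in spark-env.sh via PYSPARK_PYTHON (the path below is a placeholder for whatever interpreter is installed identically on every node):

# Force every pyspark worker to use one specific interpreter
export PYSPARK_PYTHON=/opt/python/2.7.6/bin/python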


If your HDFS URI is wrong, the error message actually makes sense, although it is buried quite deep in the stack trace:

Traceback (most recent call last):
  File "", line 1, in 
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 682, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.saveAsTextFile.
: java.lang.IllegalArgumentException: java.net.UnknownHostException: s12ib.ibnet0
     at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:418)
     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:231)
     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:139)
     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:510)
     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
     at org.apache.hadoop.mapred.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:193)
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:685)
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:572)
     at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:894)
     at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:355)
     at org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:27)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
     at java.lang.reflect.Method.invoke(Method.java:597)
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
     at py4j.Gateway.invoke(Gateway.java:259)
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
     at py4j.commands.CallCommand.execute(CallCommand.java:79)
     at py4j.GatewayConnection.run(GatewayConnection.java:207)
     at java.lang.Thread.run(Thread.java:619)
Caused by: java.net.UnknownHostException: s12ib.ibnet0
     ... 29 more
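
Since the root cause here is simply a hostname that doesn't resolve, the quickest way to confirm it is to test the same URI outside of Spark (the hostname below is the bad one from the trace above, and the port comes from the earlier Hadoop 1 example; substitute your own):

# Does the host in the URI resolve at all?
getent hosts s12ib.ibnet0

# Can the plain Hadoop client reach the same namenode URI?
hadoop fs -ls hdfs://s12ib.ibnet0:54310/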