PySpark streaming from and to a csv-file in Zeppelin: basic code example

Assumptions: Zeppelin 0.10.0 and Spark 3.1.1. I assume Spark runs in one thread on a single machine (local mode) and Zeppelin runs on the same machine. The SPARK_HOME variable has been set accordingly, i.e. in Zeppelin you specified it, for example, like this in a paragraph: %spark.conf SPARK_HOME $PATH_TO_WHERE_YOU_INSTALLED_SPARK

This example demonstrates streaming from and to a csv-file. The important point here is that the paths of these files require a “file://” scheme, like this: “file:///some/path/input_file.csv”. This is something you don’t need to do when writing a plain Python script. Here is a code example:


from pyspark.sql.functions import *
from pyspark.sql.types import StructType

# File-based streaming sources require an explicit schema
# (adapt the column names and types to your own csv input)
schema = StructType().add("value", "string")

inputPath = 'file:///some/Path/zeppelin-0.10.0-bin-netinst/bin/tmp/'
streamingDF = (
    spark.readStream.schema(schema)
    .option("maxFilesPerTrigger", 1)  # read at most one new file per micro-batch
    .csv(inputPath)
)

query = (streamingDF.writeStream.format("csv")
    .outputMode("append")
    .option("checkpointLocation", "file:///some/Path/zeppelin-0.10.0-bin-netinst/bin/checkpoint_zep/")
    .option("path", "file:///some/Path/zeppelin-0.10.0-bin-netinst/bin/csv/")
    .start())


Note: I manually created the specified folders “csv”, “tmp” and “checkpoint_zep” under the Zeppelin bin folder. The example reads csv-files from the “tmp” folder and writes output to “csv”. When writing to files with Spark Structured Streaming you must also define a checkpoint folder.
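Instead of creating the folders by hand, you can create them from Python; the location below is a placeholder for wherever you unpacked Zeppelin, so adjust it to your setup:

```python
from pathlib import Path

# Placeholder location; point this at your actual Zeppelin bin folder
zeppelin_bin = Path("zeppelin-0.10.0-bin-netinst/bin")

for name in ("tmp", "csv", "checkpoint_zep"):
    # parents=True creates intermediate folders; exist_ok=True makes re-runs harmless
    (zeppelin_bin / name).mkdir(parents=True, exist_ok=True)
```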

If you forget to use the “file://” specifier you might end up with a cryptic error message saying that the connection was refused:


Py4JJavaError: An error occurred while calling o225.start.
: Call From VirtualBox/ to localhost:8998 failed on connection exception: connection refused ; For more details see:
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
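One way to avoid forgetting the scheme is to build the URI from a pathlib.Path instead of concatenating strings: as_uri() prepends the “file://” form for you. The path below is only an illustration of the folder used in the example above:

```python
from pathlib import Path

# Hypothetical input folder; as_uri() adds the file:// scheme automatically
input_dir = Path("/some/Path/zeppelin-0.10.0-bin-netinst/bin/tmp")
inputPath = input_dir.as_uri()
print(inputPath)  # file:///some/Path/zeppelin-0.10.0-bin-netinst/bin/tmp
```

Note that as_uri() only works on absolute paths and drops any trailing slash.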
