Assumptions: Zeppelin 0.10.0 and Spark 3.1.1. I assume Spark runs in one thread on a single machine (local) and Zeppelin runs on the same machine. The SPARK_HOME variable has been set accordingly (for example, in Zeppelin you can specify it in a cell like this: %spark.conf SPARK_HOME $PATH_TO_WHERE_YOU_INSTALLED_SPARK )
This example demonstrates streaming from and to CSV files. The important point is that the paths of these files need a “file://” specifier, like this: “file:///some/path/input_file.csv”. This is something you don’t need to do when writing a standalone Python script. Here is a code example:
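%spark.ipyspark
from pyspark.sql.functions import *

# Local directories need the file:// specifier when run from Zeppelin
inputPath = 'file:///some/Path/zeppelin-0.10.0-bin-netinst/bin/tmp/'

# Read the CSV files in inputPath as a stream, one file per micro-batch.
# Note: file-based streaming sources normally need an explicit schema
# (.schema(...)) unless spark.sql.streaming.schemaInference is enabled.
streamingDF = (
    spark
    .readStream
    .option("maxFilesPerTrigger", 1)
    .csv(inputPath)
)

# Append each micro-batch as CSV files to the output folder
query = (
    streamingDF.writeStream
    .format("csv")
    .outputMode("append")
    .option("checkpointLocation", "file:///some/Path/zeppelin-0.10.0-bin-netinst/bin/checkpoint_zep/")
    .option("path", "file:///some/Path/zeppelin-0.10.0-bin-netinst/bin/csv/")
    .start()
)

# Block for up to 60 seconds, then return control to the notebook
query.awaitTermination(timeout=60)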
Note: I manually created the folders “csv”, “tmp” and “checkpoint_zep” under the Zeppelin bin folder. The stream reads CSV files from the “tmp” folder and writes its output to “csv”. When writing a stream to files in Spark, you must define a checkpoint folder.
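Instead of creating the folders by hand, something like the following could do it and build the matching “file://” paths at the same time. This is only a sketch using the Python standard library: the helper name make_stream_dirs and the temporary demo directory are my own placeholders, not part of the setup above, so point base_dir at your own Zeppelin bin folder.

```python
from pathlib import Path
import tempfile


def make_stream_dirs(base_dir, names=("tmp", "csv", "checkpoint_zep")):
    """Create the given folders under base_dir and return {name: file:// URI}.

    Hypothetical helper for illustration; the folder names mirror the ones
    used in the example above.
    """
    base = Path(base_dir).resolve()
    uris = {}
    for name in names:
        folder = base / name
        folder.mkdir(parents=True, exist_ok=True)
        # as_uri() yields e.g. file:///some/path/tmp; add a trailing slash
        # so the result looks like the directory paths used above.
        uris[name] = folder.as_uri() + "/"
    return uris


# Demo with a throwaway directory (an assumption; use your real base folder)
demo_base = tempfile.mkdtemp()
uris = make_stream_dirs(demo_base)
print(uris["tmp"])
print(uris["checkpoint_zep"])
```

The returned strings can then be passed to .csv(...), option("path", ...) and option("checkpointLocation", ...) in place of the hard-coded paths.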
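If you forget the “file://” specifier, you might end up with a cryptic error message saying that the connection was refused (judging by the Hadoop link in the message, the path is then presumably resolved against a Hadoop file system that is not running):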
Py4JJavaError: An error occurred while calling o225.start.
: java.net.ConnectException: Call From VirtualBox/127.0.1.1 to localhost:8998 failed on connection exception: java.net.ConnectException: connection refused ; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
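To catch a missing specifier before Spark produces the error above, a small check could be run on each path first. This is a minimal sketch, assuming you only ever want local “file://” paths; require_file_uri is a hypothetical helper name, not a Spark or Zeppelin function.

```python
from urllib.parse import urlparse


def require_file_uri(path):
    """Return path unchanged if it starts with the file:// scheme, else raise.

    Hypothetical guard for illustration only.
    """
    scheme = urlparse(path).scheme
    if scheme != "file":
        raise ValueError(
            "Expected a 'file://' path but got %r (scheme: %r)" % (path, scheme)
        )
    return path


# A path with the specifier passes through unchanged
checked = require_file_uri("file:///some/path/input_file.csv")
print(checked)

# A bare path is rejected with a readable message
try:
    require_file_uri("/some/path/input_file.csv")
except ValueError as err:
    print(err)
```

Wrapping the paths like this, for example .csv(require_file_uri(inputPath)), turns the connection error into an immediate and readable ValueError.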