How to use PySpark Streaming with Google Colaboratory
Structured Streaming is an extension of the Spark SQL API that allows us to process data from streaming or static data sources. Whilst there are myriad articles online about how to use streaming in PySpark with Databricks and other configurations, I found that setting this up with Google Colab was rather difficult, as I could not find a simple demonstration to follow. The objective of this article, therefore, is to walk you through a simple example that processes static CSV files using Google Colab and the PySpark Streaming API.
By following these simple steps, you will be able to drag and drop CSV files into a pre-defined “streaming” directory, and the Streaming API will automatically pick them up and process them. So let’s begin.
As per usual, install PySpark in a new notebook using Colab’s bash command helper “!”:
!pip install pyspark
and then instantiate the spark session like this, for example:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .master("local")
    .appName("Streaming")
    .config("spark.ui.port", "4050")
    .getOrCreate())
With Streaming, the structure of the input data needs to be known beforehand. Therefore, I import StructType and StructField so that I can pre-define my schema, based on the CSV data I am using. In this example, I have used some people data from https://raw.githubusercontent.com/lawlesst/vivo-sample-data/master/data/csv/people.csv, but you can use any data for your example.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Schema matching the columns of the sample people.csv
schema = StructType([
    StructField("person_ID", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("middle", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("fax", StringType(), True),
    StructField("title", StringType(), True),
])
What I would suggest doing is breaking the data file up into a number of smaller files (e.g. people1.csv, people2.csv and people3.csv), so that you can better simulate PySpark’s ability to pick up files as soon as they are placed in a streaming directory; one way to do this programmatically is sketched below.
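If you would rather not split the file by hand, here is a minimal sketch using pandas. The URL and file names are simply the ones mentioned above, and the three-way split is an arbitrary choice.

import pandas as pd

# Download the sample people.csv and split it into three smaller files
# (three is arbitrary; any number of pieces works just as well).
url = "https://raw.githubusercontent.com/lawlesst/vivo-sample-data/master/data/csv/people.csv"
people = pd.read_csv(url)

n_files = 3
chunk_size = -(-len(people) // n_files)  # ceiling division so no rows are lost
for i in range(n_files):
    chunk = people.iloc[i * chunk_size:(i + 1) * chunk_size]
    chunk.to_csv(f"people{i + 1}.csv", index=False)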
Now let’s create a new directory within your session that will act as the “streaming” directory. Any CSV file that is dropped into this directory will then be automatically imported into a data frame via the Streaming API.
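In Colab you can create the directory with a shell command (or with os.makedirs from Python):

!mkdir -p streaming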
Now we should be able to execute the following code which identifies the just-created “streaming” directory and the type of files that we will be processing (i.e. CSV).
people_df = (spark.readStream
    .format("csv")
    .schema(schema)
    .option("header", True)
    .load("streaming"))
The above code has made people_df a streaming data frame. You can check this by viewing the isStreaming property. If you try to .show() the contents of this data frame, you will receive an error, because a streaming query (an output stream via writeStream) is required to materialise its results.
print(people_df.isStreaming)
The next step is to define a transformation on the data frame. In this case, I simply select all fields and rows from the CSV files that will soon be placed into the “streaming” directory. This is where you can later get creative by using filters or more advanced functionality such as windowing operations; a simple filter variation is sketched after the query code below.
Once started, the result of the select transformation on each new file will be written (writeStream) to a path which I have named “results”, in JSON format (you may also use CSV). This is effectively doing our .show() action. Note that the file sink only supports the “append” output mode for queries without aggregation, which is why the code below uses it. Finally, .awaitTermination() keeps the query running, waiting patiently for the next CSV file to be placed into the streaming directory.
results_df = people_df.select("*")

# Write each micro-batch of results as JSON files under "results"
query = (results_df.writeStream
    .format("json")
    .queryName("selectTable")
    .option("checkpointLocation", "checkpoint")
    .option("path", "results")
    .outputMode("append")  # file sinks only support append mode for non-aggregated queries
    .start())

query.awaitTermination()
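As a taste of the “get creative” part, here is a minimal sketch of a filtered variation of the same query. The title column comes from the schema above, but the literal value "Professor" is only an assumption about the sample data, so adjust it to whatever your CSV actually contains. It also writes to separate output and checkpoint locations so it does not clash with the query above.

# A filtered variation: keep only rows whose title matches a given value
professors_df = people_df.filter(people_df.title == "Professor")

filtered_query = (professors_df.writeStream
    .format("json")
    .queryName("professorsTable")
    .option("checkpointLocation", "checkpoint_filtered")
    .option("path", "results_filtered")
    .outputMode("append")
    .start())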
As previously mentioned, I have three different people CSV files that I will be moving, one at a time, into the streaming directory. As I move them (dragging and dropping them in Colab’s file browser), you will notice that the results directory is populated with a new file containing the result of the select transformation defined previously.
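If you would rather move the files from code instead of dragging and dropping, one option is to omit the .awaitTermination() call (so the cell returns as soon as the query starts) and then copy the files in a following cell. A rough sketch, assuming the three file names suggested earlier:

import shutil
import time

# Copy the split CSVs into the streaming directory one at a time,
# pausing so the streaming query has time to pick each one up.
for name in ["people1.csv", "people2.csv", "people3.csv"]:
    shutil.copy(name, "streaming/")
    time.sleep(10)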
You will find your results in the “results” folder, created at run-time.
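To inspect the output, you can read the JSON files back into an ordinary (static) DataFrame, for example:

# Read the streaming query's JSON output back as a static DataFrame
results = spark.read.schema(schema).json("results")
results.show()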
Although I haven’t gone into all the details of the PySpark Streaming API, this article has presented a simple example of how to get streaming working with Google Colab through the use of static CSV files. Now it’s up to you to further investigate its power and potential uses.