Real-Time Data Processing Using Spark Streaming

Objective

  1. Streaming applications help capture real-time reactions, which can be used for various analytical purposes.
  2. Since processing is near-real-time, they can be used for fraud detection or for initiating prompt responses as the situation demands.
  3. They can be used for scaling hardware up or down as per the incoming load.

This article covers the following topics:

  • Creation of a Basic Spark Streaming Application
  • Different Output Modes
  • Triggers
  • Transformations
  • Joins with Streams
  • Window Functions

Prerequisites

  • AWS EC2 instance
  • NetCat Service
  • Basic understanding of AWS

Code Flow

1. Create a SparkSession:

spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()

2. Read from source:

lines = spark.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12345).load()

3. Write to the sink and start the query:

query = lines.writeStream \
.outputMode("append") \
.format("console") \
.start()

4. Wait for termination:

query.awaitTermination()
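Note that query.awaitTermination() blocks the driver until the query is stopped or fails. A minimal variation, assuming you want the job to stop itself after a fixed time instead of running indefinitely (the 60-second timeout is purely illustrative):

# Block for at most 60 seconds; awaitTermination(timeout) returns True
# if the query terminated within that time, otherwise False.
if not query.awaitTermination(60):
    query.stop()  # stop the streaming query gracefully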

Segment 1: A Spark Streaming Application

yum update -y
yum install -y nc
mkdir coding_labs
cd coding_labs
vi read_from_socket.py
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()

lines = spark \
.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12345) \
.load()

query = lines \
.writeStream \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()

Run the NetCat listener in one terminal and submit the application from another:

nc -l 12345
spark2-submit read_from_socket.py

Segment 2: Output Modes

  • Append : Only newly added records are written to the sink.
  • Update : Only the records modified since the last trigger are written to the sink.
  • Complete : All records are written to the sink after every trigger (see the word-count example at the end of this segment).
  • Append Mode:
vi stream_output_modes.py
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()

spark.sparkContext.setLogLevel('WARN')

lines = spark \
.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12345) \
.load()

query = lines \
.writeStream \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()
nc -l 12345
spark2-submit stream_output_modes.py
  • Complete Mode:
vi stream_output_modes.py
query = lines  \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
nc -l 12345
spark2-submit stream_output_modes.py
  • Update Mode:
vi stream_output_modes.py
query = lines  \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
nc -l 12345
spark2-submit stream_output_modes.py
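
Note that append and update behave the same on the simple pass-through query above, and complete mode is only accepted when the query contains a streaming aggregation. A minimal word-count sketch that makes the difference visible, assuming the same socket source: with complete mode the whole counts table is re-emitted after every micro-batch, with update mode only the changed counts are emitted, and append mode would additionally require a watermark on an event-time column.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StreamWordCount").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

lines = spark.readStream.format("socket") \
.option("host", "localhost").option("port", 12345).load()

# One row per word, then a running count per word.
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()

query = wordCounts.writeStream.outputMode("complete").format("console").start()

query.awaitTermination()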

Segment 3: Triggers

  • Trigger with a processing time of 2 seconds:
vi triggers.py
query = lines  \
.writeStream \
.outputMode("update") \
.format("console") \
.trigger(processingTime = '2 seconds') \
.start()
nc -l 12345
spark2-submit triggers.py
  • Trigger with ‘Once’:
query = lines  \
.writeStream \
.outputMode("update") \
.format("console") \
.trigger(once=True) \
.start()
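
The trigger snippets above show only the writeStream portion. A minimal end-to-end sketch of triggers.py, assuming the same socket source used in the earlier segments (when no trigger is specified, the next micro-batch starts as soon as the previous one finishes):

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')

lines = spark \
.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12345) \
.load()

# Micro-batches are planned every 2 seconds; replace the trigger with
# trigger(once=True) to process the available data once and then stop.
query = lines \
.writeStream \
.outputMode("update") \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()

query.awaitTermination()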

Segment 4: Transformations

vi basic_transform.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')

lines = spark \
.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12345) \
.load()

transformedDF = lines.filter(length(col("value"))>4)

query = transformedDF \
.writeStream \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()
nc -l 12345
spark2-submit basic_transform.py
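
Multiple transformations can be chained on the same streaming DataFrame. A small sketch, assuming the lines DataFrame from the script above, that splits each line into words and keeps only the words longer than four characters (the column name word is illustrative):

from pyspark.sql.functions import explode, split, length, col

# 'lines' is the streaming DataFrame read from the socket above.
# explode(split(...)) yields one row per word; the filter keeps the longer words.
wordsDF = lines.select(explode(split(col("value"), " ")).alias("word"))
transformedDF = wordsDF.filter(length(col("word")) > 4)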

Segment 5: Joins with Streams

  • Inner Join : Returns records that have matching values in both DataFrames.
  • Outer Join:
  • Left : Returns all records from the left DataFrame and the matching records from the right DataFrame.
  • Right : Returns all records from the right DataFrame and the matching records from the left DataFrame.
  • Full : Returns all records when there is a match in either the left or the right DataFrame.
  • Stream-Stream outer joins can be performed only when watermarks are used.
  • Stream-Static right outer or full outer joins are not permitted.
  • Static-Stream left outer or full outer joins are not permitted. These restrictions exist because emitting the unmatched rows of the static side would require knowing the complete contents of the stream, which is unbounded.
  • Stream-Stream Join:
vi join_stream_stream.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')

stream1 = spark \
.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12345) \
.load()

streamDF1 = stream1.selectExpr("value as player")

stream2 = spark \
.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12346) \
.load()

streamDF2 = stream2.selectExpr("value as person")

# Inner Join Example
joinedDF = streamDF1.join(streamDF2, expr("""player = person"""))

query = joinedDF \
.writeStream \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()
nc -l 12345
nc -l 12346
spark2-submit join_stream_stream.py
  • Static-Stream Join (only the join portion is shown here; a fuller sketch of join_static_stream.py follows at the end of this segment):
vi join_static_stream.py
cat players.csv
hadoop fs -mkdir testdir
hadoop fs -put players.csv testdir/
joinedDF = streamDF.join(staticDF, expr("""player = name"""))

query = joinedDF \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
spark2-submit join_static_stream.py
joinedDF = streamDF.join(staticDF, expr("""player = name"""), "left_outer")

query = joinedDF \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
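
The snippets above omit how streamDF and staticDF are created. A minimal sketch of a complete join_static_stream.py, assuming players.csv has two columns (name and role) and was uploaded to testdir/ as shown above; the column names and path are illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Static DataFrame read once from HDFS (assumed columns: name, role).
staticDF = spark.read.csv("testdir/players.csv").toDF("name", "role")

# Streaming DataFrame from the socket source.
stream = spark \
.readStream \
.format("socket") \
.option("host","localhost") \
.option("port",12345) \
.load()

streamDF = stream.selectExpr("value as player")

# Inner stream-static join; pass "left_outer" as a third argument
# to also keep the unmatched streaming rows.
joinedDF = streamDF.join(staticDF, expr("""player = name"""))

query = joinedDF \
.writeStream \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()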

Segment 6: Windows in Spark Streaming

  • Event Time : The time at which the record is generated at the source. It is generally represented as a column in the source data set.
  • Processing Time : The time at which the record arrives at the Spark processing layer. The gap between event time and processing time is caused by publishing failures, distributed-system lags, network delays and other such latencies.

A window is simply a collection of records over a specific time period. There are two types of windows in Spark Streaming:

  • Tumbling Window : Fixed-size windows that never overlap.
  • Sliding Window : Windows that can overlap when the slide duration is smaller than the window duration, as shown in the sketch below.
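
A minimal sketch of how the two window types are expressed with the window() function, assuming a streaming DataFrame df with a timestamp column named event_time (both names are illustrative):

from pyspark.sql.functions import window

# Tumbling window: fixed 1-hour buckets that never overlap.
tumblingDF = df.groupBy(window("event_time", "1 hour")).count()

# Sliding window: 1-hour windows starting every 15 minutes, so consecutive windows overlap.
slidingDF = df.groupBy(window("event_time", "1 hour", "15 minutes")).count()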
vi window_functions.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')

mySchema = StructType().add("activity_type", "string").add("activity_time","timestamp").add("activity_count","integer")

lines = spark \
.readStream \
.schema(mySchema) \
.json("data/")

windowDF = lines.groupBy(window("activity_time","1 day","1 hour")).sum("activity_count").alias("Events_Sum").orderBy(asc("window"))

query = windowDF \
.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "False") \
.option("numRows",200) \
.start()

query.awaitTermination()
vi log.json
{"activity_type":"CPU","activity_time":"2020-08-15T05:34:45","activity_count":5},
{"activity_type":"Memory","activity_time":"2020-08-15T05:48:05","activity_count":2},
{"activity_type":"CPU","activity_time":"2020-08-15T05:54:41","activity_count":8},
{"activity_type":"IO","activity_time":"2020-08-15T06:32:45","activity_count":3},
{"activity_type":"Network","activity_time":"2020-08-15T06:46:05","activity_count":12},
{"activity_type":"CPU","activity_time":"2020-08-15T06:53:41","activity_count":16},
{"activity_type":"CPU","activity_time":"2020-08-15T09:30:45","activity_count":4},
{"activity_type":"IO","activity_time":"2020-08-15T09:48:05","activity_count":1},
{"activity_type":"Network","activity_time":"2020-08-15T10:44:41","activity_count":9},
{"activity_type":"CPU","activity_time":"2020-08-15T11:34:45","activity_count":5},
{"activity_type":"Network","activity_time":"2020-08-15T11:48:05","activity_count":3},
{"activity_type":"Memory","activity_time":"2020-08-17T20:54:41","activity_count":6},
{"activity_type":"CPU","activity_time":"2020-08-17T21:34:45","activity_count":9},
{"activity_type":"IO","activity_time":"2020-08-17T21:48:05","activity_count":3},
{"activity_type":"IO","activity_time":"2020-08-18T05:54:41","activity_count":6},
{"activity_type":"Network","activity_time":"2020-08-19T15:34:45","activity_count":5},
{"activity_type":"Memory","activity_time":"2020-08-19T15:48:05","activity_count":7},
{"activity_type":"CPU","activity_time":"2020-08-19T17:04:41","activity_count":3},
{"activity_type":"Network","activity_time":"2020-08-19T17:04:45","activity_count":6},
{"activity_type":"Memory","activity_time":"2020-08-19T17:08:05","activity_count":2},
{"activity_type":"CPU","activity_time":"2020-08-19T18:44:41","activity_count":4},
{"activity_type":"Network","activity_time":"2020-08-20T07:14:15","activity_count":5},
{"activity_type":"IO","activity_time":"2020-08-20T07:18:15","activity_count":6},
{"activity_type":"CPU","activity_time":"2020-08-20T07:34:01","activity_count":1}
{"activity_type":"CPU","activity_time":"2020-08-15T05:34:45","activity_count":5},
{"activity_type":"Memory","activity_time":"2020-08-15T05:48:05","activity_count":2},
{"activity_type":"CPU","activity_time":"2020-08-15T05:54:41","activity_count":8}
hadoop fs -put log.json data/
spark2-submit window_functions.py

Conclusion

  • Used the Spark Structured Streaming API in our case study to build an application that consumes streaming data from a socket and writes the output to the console.
  • Learnt about Triggers and the various Output Modes and implemented them in the application.
  • Next, we implemented a Transformation in the code and observed the results.
  • Also learnt about Event Time and Processing Time, followed by the concept of Windows.
  • Understood the difference between Sliding and Tumbling Windows.
  • Implemented a Window function with a window duration of 1 day and a sliding duration of 1 hour.
