Kafka Integration with Apache Spark

Objective

  • Set up Kafka
  • Create a topic
  • Publish messages to the Kafka topic
  • Set up a Spark job to read from the Kafka topic
  • Execute the two operations:
  1. Read from Kafka
  2. Write to Kafka

Code

Segment 1: Read from Kafka

mkdir kafka
cd kafka
wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
# Start ZooKeeper and the Kafka broker, each in its own terminal
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-new-topic
vi read_from_kafka.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("StructuredSocketRead") \
    .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

# Subscribe to the topic on the Kafka broker
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","ec2-54-237-150-57.compute-1.amazonaws.com:9092") \
    .option("subscribe","kafka-new-topic") \
    .load()

# Kafka delivers key and value as binary; cast both to strings
kafkaDF = lines.selectExpr("cast(key as string)","cast(value as string)")

query = kafkaDF \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
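The selectExpr above exists because Kafka delivers each record's key and value as raw bytes; casting to string decodes them for display. A minimal pure-Python sketch of that decoding step (assuming UTF-8 payloads, which is common but not guaranteed; the helper name is illustrative):

```python
# Sketch of what "cast(key as string)" / "cast(value as string)" do:
# decode a Kafka record's binary key/value into strings.
# Assumes UTF-8 payloads; the console producer sends records with no key.

def decode_record(key, value):
    """Decode a Kafka record's binary key/value into strings (None stays None)."""
    k = key.decode("utf-8") if key is not None else None
    v = value.decode("utf-8") if value is not None else None
    return (k, v)

# A record as the console producer would send it: keyless, string payload.
print(decode_record(None, b"hello kafka"))
```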
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-new-topic
wget https://ds-spark-sql-kafka-jar.s3.amazonaws.com/spark-sql-kafka-0-10_2.11-2.3.0.jar
spark2-submit --jars spark-sql-kafka-0-10_2.11-2.3.0.jar read_from_kafka.py
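kafka-new-topic was created with a single partition, so all records arrive in order. With more partitions, Kafka's default producer assigns keyed records by hashing the key (murmur2) modulo the partition count. A simplified pure-Python sketch of that idea (a plain byte-sum stands in for murmur2, purely for illustration):

```python
# Simplified sketch of Kafka's keyed-record partitioning.
# The real producer hashes the key with murmur2; a byte sum is used
# here only to illustrate "hash(key) % num_partitions".

def choose_partition(key, num_partitions):
    """Pick a partition for a keyed record (illustrative hash, not murmur2)."""
    return sum(key) % num_partitions

# With the single-partition topic above, every record lands in partition 0.
print(choose_partition(b"alice", 1))
```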

Segment 2: Write to Kafka

vi write_to_kafka.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType

spark = SparkSession \
    .builder \
    .appName("StructuredSocketRead") \
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Schema for the incoming ";"-delimited CSV files
mySchema = StructType().add("name", "string").add("age","integer")

# Stream CSV files as they arrive in the testdir/ directory
lines = spark \
    .readStream \
    .option("delimiter",";") \
    .format("csv") \
    .schema(mySchema) \
    .load("testdir/")

# Map each row to a Kafka record: name as the key, age (as string) as the value
kafkaDF = lines.selectExpr("name as key","cast(age as string) as value")

query = kafkaDF \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers","ec2-3-236-151-70.compute-1.amazonaws.com:9092") \
    .option("topic","kafka-new-topic") \
    .option("checkpointLocation","checkpoint_dir") \
    .start()

query.awaitTermination()
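Per row, the job above does two things: parse a ";"-delimited line against mySchema (name as string, age as integer), then project it to a Kafka key/value pair via selectExpr. A minimal pure-Python sketch of that per-row transformation (helper names and the sample row are hypothetical):

```python
# Sketch of the per-row work in write_to_kafka.py:
# 1) parse a ";"-delimited line per mySchema (name: string, age: integer);
# 2) project it to a Kafka record as "name as key" and
#    "cast(age as string) as value". Helper names are illustrative.

def parse_line(line):
    """Parse one CSV line according to mySchema."""
    name, age = line.strip().split(";")
    return {"name": name, "age": int(age)}

def to_kafka_record(row):
    """Project a row to (key, value) the way the selectExpr does."""
    return (row["name"], str(row["age"]))

row = parse_line("alice;30")   # hypothetical input row
print(to_kafka_record(row))
```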
cat players.csv
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-new-topic
spark2-submit --jars spark-sql-kafka-0-10_2.11-2.3.0.jar write_to_kafka.py
hadoop fs -put players.csv testdir/players.csv
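The contents of players.csv are not shown above. Assuming it follows the ";"-delimited name;age layout implied by mySchema, a test file could be generated like this (the rows here are made-up samples, not the article's data):

```python
# Generate a hypothetical players.csv matching mySchema (name;age, ";"-delimited).
# The real players.csv used in the article is not shown; these rows are samples.
rows = [("alice", 30), ("bob", 25)]

with open("players.csv", "w") as f:
    for name, age in rows:
        f.write(f"{name};{age}\n")

print(open("players.csv").read())
```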

Conclusion

  • Downloaded the Kafka binaries and set them up on an AWS EC2 instance.
  • Used Kafka for two operations: reading from Kafka and writing to Kafka.
  • Created a topic called kafka-new-topic and published messages to it.
  • Integrated Spark with Kafka to read from the topic and apply transformations.
  • Used a file on HDFS as the streaming source and wrote the output to Kafka.
