Recently, I had to update a Spark application using Spark Streaming to Spark Structured Streaming. During this process, there were some parts I couldn’t find documentation or examples on. I decided to write about those parts, someone(probably future me) will find them useful.

Reading from Kafka

Spark has a good guide for integration with Kafka. However, some parts were not easy to grasp.

Deserializing records from Kafka was one of them. When using Spark Structured Streaming to read from Kafka, the developer has to handle deserialization of records. By default, records are deserialized as String or Array[Byte]. However, if your records are not in either of these formats, you have to perform deserialization in Dataframe operations.

Here is a code to stream data from Kafka:

val topic : String = ???
val bootStrapServers: String = ???
val sparkSession: SparkSession = ???
val dataFrame: DataFrame = sparkSession.readStream
  .format("kafka")
  .option("subscribe", topic)
  .option( "kafka.bootstrap.servers", kafkaConfig.bootstrapServers )
  .option("startingOffsets", "earliest")
  .options(extraParams)
  .load()

The schema of the DataFrame created from the streaming source is:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)

As you can see from the schema, the key and value are Array[Byte]. In order to modify them to our desired type, we have to deserialize them.

I created a generic method that can be used for deserializing key K and values V from a DataFrame.

import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

def kafkaReader[K: TypeTag: Encoder, V: TypeTag: Encoder](
    topic: String,
    bootStrapServers: String,
    sparkSession: SparkSession,
    keyDeserializer: Deserializer[K],
    valueDeserializer: Deserializer[V]
  ): Dataset[(K, V)] = {
    import sparkSession.implicits._
    
    sparkSession.readStream
      .format("kafka")
      .option("subscribe", topic)
      .option("kafka.bootstrap.servers", bootstrapServers)
      .load()
      .selectExpr("key", "value") // Selecting only key & value
      .as[(Array[Byte], Array[Byte])]
      .flatMap {
        case (key, value) =>
          for {
            deserializedKey <- Try {
              keyDeserializer.deserialize(topic, key)
            }.toOption
            deserializedValue <- Try {
              valueDeserializer.deserialize(topic, value)
            }.toOption
          } yield (deserializedKey, deserializedValue)
      }
}

Note that the deserializer has to be serializable, else the spark executor will fail with a org.apache.spark.SparkException: Task not serializable error. An easy way to achieve this is to extend deserializer with Serializable. For e.g

val keyDeserializer: StringDeserializer = new StringDeserializer with Serializable

Writing data to Kafka

Writing data to Kafka in Spark Structured Streaming is quite similar to reading from Kafka. The developer has to serialize the data to either Array[Byte] or String before writing.

Here is a generic function to stream a Dataset of Tuple2[K,V] to Kafka:

import org.apache.kafka.common.serialization.Serializer
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.DataStreamWriter

def writeToKafka[K, V](
    topic: String,
    bootStrapServers: String,
    stream: Dataset[(K, V)],
    keySerializer: Serializer[K],
    valueSerializer: Serializer[V],
    kafkaConfig: KafkaConfig
  ): StreamingQuery = {
    import stream.sparkSession.implicits._

    stream.map { case ((key, value)) =>
      (keySerializer.serialize(topic, key), valueSerializer.serialize(topic, value))
    }
    .selectExpr("_1 as key", "_2 as value")
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", topic)
    .start()
}

Also, a few things to note:

– The serialisers also have to be Serialisable. An easy option is also extending Serializable.
– The Dataset/DataFrame to be inserted to Kafka needs to have key and value columns which will be mapped as key and value for Kafka ProducerRecord respectively.

Writing to Cassandra

Writing a dataset stream to Cassandra is quite easy. However, a few things to note:

– org.apache.spark.sql.cassandra format does not support writing streams but batch writing.
– Spark Cassandra connector quotes column names. Therefore, if you are writing a Dataset with camel-case column names, you will need to rename those columns to lower-case. Else you will get errors like java.util.NoSuchElementException: Columns not found in table.

Here is an example function writing a dataset stream to Cassandra:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.streaming.DataStreamWriter

def cassandraSinkWriter[T](
    keyspace: String,
    table: String,
    stream: stream: Dataset[T]
  ): DataStreamWriter[T] =
    stream.writeStream
      .foreachBatch { (batchDF: Dataset[T], _: scala.Long) =>
        batchDF
          .toDF(batchDF.columns map (_.toLowerCase): _*) // transforming column names to lowercase
          .write
          .cassandraFormat(table, keyspace)
          .save()
      }

I hope you find these snippets useful. Please comment if you have questions, suggestions or need more clarification.

Ikenna Ogbajie

Software Developer