Configs to optimize Spark Structured Streaming

SekFook
3 min readAug 9, 2022

Introduction

Improving the performance of Spark Structured Streaming is always an important topic. If you have a large stream of data arriving every few seconds, it can be difficult to process, especially when you are doing stateful computation. A long processing time for each micro-batch might also not be ideal for your use case. In this article, I will share some Spark configs that might help you optimise performance.

State Store

A state store is basically a key-value store that helps us manage stateful operations across micro-batches; we store the current state in the state store for later use. There are 2 built-in state store providers: the HDFS-backed store and RocksDB.
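Conceptually, the per-batch lifecycle of a state store can be sketched in plain Python. This is only an illustration of the key-value idea described above, not Spark's actual implementation:

```python
# Conceptual sketch of a key-value state store (NOT Spark's actual
# implementation): each micro-batch reads the previously committed
# state, updates it, and commits a new version for the next batch.

class ToyStateStore:
    def __init__(self):
        self._committed = {}   # state visible to the next micro-batch
        self._pending = None   # in-flight updates for the current batch

    def begin_batch(self):
        # start from a copy of the last committed state
        self._pending = dict(self._committed)

    def put(self, key, value):
        self._pending[key] = value

    def get(self, key, default=None):
        return self._pending.get(key, default)

    def commit(self):
        # publish this batch's updates for the next micro-batch
        self._committed = self._pending
        self._pending = None

# e.g. a running count per key, maintained across two micro-batches
store = ToyStateStore()
for batch in (["a", "b", "a"], ["a", "c"]):
    store.begin_batch()
    for key in batch:
        store.put(key, store.get(key, 0) + 1)
    store.commit()
```

The point is that state survives between micro-batches, which is exactly what the provider below has to persist efficiently.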

We recommend using RocksDB instead of the HDFS-backed provider, as it saves us from JVM memory pressure: the RocksDB-based store works efficiently even with 100 million states and is not impacted by the GC process. Besides, it comes with other benefits such as a memtable (for storing recent writes in memory), a write-ahead log (failure tolerance), transaction support, checkpointing, and SSD optimisation.

spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

ref: https://docs.databricks.com/structured-streaming/rocksdb-state-store.html

Watermark Policy

Late-arriving data is a common problem in streaming. Keeping unbounded intermediate state in memory can be a load on your machine and hurt latency, so it is important to know when we can drop the in-memory state, i.e. when we will no longer accept late data for processing.

A watermark is the threshold that, combined with the maximum event time seen in the data, lets the engine clean up old state accordingly.

T = watermarking threshold
A record is rejected as too late if:
event time of the record < (max event time seen by the engine - T)
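The rule above can be sketched in plain Python. This is only an illustration of the dropping condition, not Spark's internal code:

```python
from datetime import datetime, timedelta

def is_too_late(event_time, max_event_time_seen, threshold):
    """A record is dropped when its event time falls behind the
    watermark, i.e. max event time seen minus the lateness threshold."""
    watermark = max_event_time_seen - threshold
    return event_time < watermark

max_seen = datetime(2022, 8, 9, 12, 30)
threshold = timedelta(minutes=5)

is_too_late(datetime(2022, 8, 9, 12, 27), max_seen, threshold)  # False: within 5 minutes
is_too_late(datetime(2022, 8, 9, 12, 24), max_seen, threshold)  # True: older than the watermark
```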

While running the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates watermarks based on the corresponding delay, and chooses a single global watermark from them to be used for stateful operations. By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stops receiving data due to upstream failures). In other words, the global watermark safely moves at the pace of the slowest stream, and the query output is delayed accordingly.

Setting up multiple watermarks can also help you get faster results.

df.withWatermark('event_time', '5 minutes')
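When a query joins or unions multiple streams that each have a watermark, Spark also lets you change the global watermark policy so it follows the fastest stream instead of the slowest one, at the risk of dropping data from slower streams as too late:

```python
# Use the maximum of the per-stream watermarks as the global watermark
# (the default policy is "min"). This gives faster results, but records
# from slower streams may be dropped as too late.
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")
```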

ref: https://docs.databricks.com/structured-streaming/multiple-watermarks.html

Asynchronous checkpoint

Asynchronous checkpointing can reduce latency for stateful streaming queries that are bottlenecked on state updates, without sacrificing fault tolerance, at the minor cost of higher restart delays.

Structured Streaming uses synchronous checkpointing by default. Every micro-batch ensures that all the state updates in that batch are backed up in cloud storage (called “checkpoint location”) before starting the next batch. If a stateful streaming query fails, all micro-batches except the last micro-batch are checkpointed. On restart, only the last batch needs to be re-run. Fast recovery with synchronous checkpointing comes at the cost of higher latency for each micro-batch.

With asynchronous state checkpointing, when there is a restart, the batch that failed and the previous micro-batch both need to be re-run (two batches, as compared to one with synchronous checkpointing).

How do you know you need to enable this setting:

  • if the ratio of state operator commit time to micro-batch processing time exceeds a certain threshold, you are spending a lot of time committing state, and you should enable this.
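As a rough illustration of this check (the 0.5 threshold here is an arbitrary assumption for the sketch, not an official value), you could compute the ratio from the timing metrics you observe for a micro-batch:

```python
def should_enable_async_checkpoint(commit_time_ms, batch_duration_ms,
                                   threshold=0.5):
    """Heuristic sketch: if state-store commit time dominates the
    micro-batch duration, asynchronous checkpointing may help.
    The 0.5 threshold is an assumption for illustration only."""
    return (commit_time_ms / batch_duration_ms) > threshold

should_enable_async_checkpoint(commit_time_ms=3200, batch_duration_ms=5000)  # True: 64% spent committing
should_enable_async_checkpoint(commit_time_ms=400, batch_duration_ms=5000)   # False: commits are cheap
```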

However, a failure of an asynchronous checkpoint at any one or more state stores fails the whole query, whereas in synchronous checkpointing mode the checkpoint is executed as part of the task, and Spark retries the task multiple times before failing the query.

Asynchronous state checkpointing is supported only in the RocksDB state store provider.

spark.conf.set("spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled", "true")

ref: https://docs.databricks.com/structured-streaming/async-checkpointing.html

