---
description: 'Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns.'
applyTo: '**/*.scala, **/build.sbt, **/build.sc'
---

# Scala + Apache Spark Best Practices

Guidelines for writing efficient, maintainable, and production-ready Apache Spark applications in Scala.

## Dependencies

### SBT

```scala
val sparkVersion = "3.5.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"       % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib"     % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)
```

### Maven

```xml
<properties>
  <spark.version>3.5.1</spark.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
```

Mark Spark dependencies as `"provided"` since the cluster supplies them at runtime. Only bundle application-specific libraries in the fat JAR.

## SparkSession Setup

Always use `SparkSession` as the single entry point:

```scala
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .appName("MyApplication")
  .config("spark.sql.shuffle.partitions", "200")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._
```

- Do **not** create multiple `SparkSession` instances in the same JVM.
- Avoid hardcoding `master` in application code; set it at submit time via `--master`.

## DataFrames vs Datasets vs RDDs

Prefer the **DataFrame API** (untyped `Dataset[Row]`) for most workloads. Use **Datasets** (typed) when compile-time type safety justifies the serialization overhead.
Avoid raw **RDDs** unless you need low-level control.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.count

// Preferred: DataFrame API
val df: DataFrame = spark.read.parquet("data/events")
val result = df
  .filter($"status" === "active")
  .groupBy($"region")
  .agg(count("*").as("total"))

// Typed Dataset: use when schema safety matters
case class Event(id: Long, status: String, region: String)
val ds: Dataset[Event] = df.as[Event]
val active = ds.filter(_.status == "active")
```

## Schema Management

Always define schemas explicitly when reading semi-structured data instead of relying on schema inference:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("amount", DecimalType(18, 2), nullable = true),
  StructField("tags", ArrayType(StringType), nullable = true)
))

val df = spark.read
  .schema(schema)
  .json("data/events/*.json")
```

- Schema inference (the default for JSON, and for CSV with `inferSchema=true`) needs an extra pass over the data, reading the whole source unless `samplingRatio` is lowered, and is expensive for large files.
- For Parquet and Delta, the schema is embedded, so an explicit definition is unnecessary.

## Column Expressions

Prefer `col()` or `$""` over plain string column names in transformations. `Column` objects compose into expressions, which plain strings cannot. Both forms are resolved when the plan is analyzed, not at compile time:

```scala
import org.apache.spark.sql.functions._

// Good: Column references that can be combined into expressions
df.select(col("name"), ($"amount" * 1.1).as("adjusted_amount"))

// Avoid: string-only references cannot carry expressions
df.select("name", "amount")
```

## Joins

### Broadcast Joins

Broadcast the smaller side of a join when it fits in executor memory (typically < 100 MB):

```scala
import org.apache.spark.sql.functions.broadcast

val enriched = largeDF.join(
  broadcast(smallLookupDF),
  Seq("key"),
  "left"
)
```

### Avoiding Cartesian Products

Never use cross joins unless intentional.
Enable the safeguard:

```scala
// Implicit cartesian products now fail; an explicit crossJoin() still works
spark.conf.set("spark.sql.crossJoin.enabled", "false")
```

### Skew Handling

For joins on skewed keys, salt the key to distribute load:

```scala
import org.apache.spark.sql.functions._

val saltBuckets = 10

// Assign each left-side row a random salt in [0, saltBuckets)
val saltedLeft = leftDF.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Replicate each right-side row once per salt value
val saltedRight = rightDF
  .crossJoin((0 until saltBuckets).toDF("salt"))

val result = saltedLeft
  .join(saltedRight, Seq("join_key", "salt"))
  .drop("salt")
```

The tradeoff is that the right side grows by 10× (one copy per salt bucket), so this only works when the right side is reasonably small or the skew is severe enough to justify it. On Spark 3.0+, AQE's built-in skew join handling (`spark.sql.adaptive.skewJoin.enabled = true`) can do this automatically without manual salting.

## Partitioning and Bucketing

### Write Partitioning

Partition output by low-cardinality columns that queries commonly filter on (e.g., date parts):

```scala
df.write
  .partitionBy("year", "month")
  .mode("overwrite")
  .parquet("output/events")
```

- Avoid partitioning on high-cardinality columns (e.g., user ID), which creates millions of small files.

### Shuffle Partitions

Tune `spark.sql.shuffle.partitions` based on data volume:

```scala
// Default is 200; adjust based on data size
// Rule of thumb: target 128 MB per partition
spark.conf.set("spark.sql.shuffle.partitions", "400")
```

### Repartition vs Coalesce

```scala
// Repartition: full shuffle, use to increase or evenly distribute partitions
df.repartition(100, $"key")

// Coalesce: no shuffle, use only to reduce partition count
df.coalesce(10)
```

Never use `coalesce(1)` on large datasets: it forces all data through a single task.
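The 128 MB rule of thumb from the Shuffle Partitions section can be turned into a small sizing helper. The sketch below is plain Scala with no Spark dependency. `ShufflePartitionSizing`, `recommendedPartitions`, and the `minPartitions` floor are hypothetical names introduced for illustration, and the input is assumed to be an estimate of the shuffled bytes that you supply yourself.

```scala
object ShufflePartitionSizing {
  // Target size per shuffle partition, taken from the 128 MB rule of thumb.
  val TargetBytesPerPartition: Long = 128L * 1024 * 1024

  /** Hypothetical helper: number of partitions so each holds about 128 MB.
    * `shuffleBytes` is a caller-supplied estimate; `minPartitions` is an
    * assumed floor so that small jobs still get a sensible partition count.
    */
  def recommendedPartitions(shuffleBytes: Long, minPartitions: Int = 1): Int = {
    require(shuffleBytes >= 0, "shuffleBytes must be non-negative")
    require(minPartitions >= 1, "minPartitions must be at least 1")
    // Ceiling division without floating point.
    val needed = (shuffleBytes + TargetBytesPerPartition - 1) / TargetBytesPerPartition
    // Apply the floor and clamp to the Int range.
    math.max(needed, minPartitions.toLong).min(Int.MaxValue.toLong).toInt
  }
}
```

Under these assumptions, an estimated 50 GiB shuffle yields 400 partitions, which could then be applied with `spark.conf.set("spark.sql.shuffle.partitions", n.toString)`.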
## Caching and Persistence

Cache only when a DataFrame is reused multiple times:

```scala
import org.apache.spark.storage.StorageLevel

val cached = expensiveDF.persist(StorageLevel.MEMORY_AND_DISK)
cached.count() // materialize the cache

// Use cached DF multiple times
val summary = cached.groupBy("region").count()
val filtered = cached.filter($"amount" > 1000)

// Always unpersist when done
cached.unpersist()
```

- Prefer `MEMORY_AND_DISK` over `MEMORY_ONLY` to avoid recomputation on eviction.
- Never cache DataFrames that are only used once.

## UDFs — Use Sparingly

Prefer built-in Spark SQL functions over UDFs. UDFs are opaque to the Catalyst optimizer and add serialization overhead:

```scala
import org.apache.spark.sql.functions._

// Good: use built-in functions
df.withColumn("upper_name", upper($"name"))
  .withColumn("name_length", length($"name"))

// Avoid: UDF for something built-in functions handle
val upperUdf = udf((s: String) => s.toUpperCase)
df.withColumn("upper_name", upperUdf($"name"))
```

When a UDF is unavoidable, prefer `spark.udf.register` for SparkSQL compatibility, and handle nulls explicitly:

```scala
// Keep the logic in a plain function so it can be unit tested without Spark
def parseStatus(raw: String): String =
  Option(raw).map(_.trim.toLowerCase) match {
    case Some("active") | Some("enabled") => "ACTIVE"
    case Some("inactive") | Some("disabled") => "INACTIVE"
    case _ => "UNKNOWN"
  }

// Register for SQL; the returned UDF also works in the DataFrame API
val parseStatusUdf = spark.udf.register("parse_status", parseStatus _)
```

## Window Functions

Use window functions for ranking, running totals, and lag/lead calculations:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window
  .partitionBy("department")
  .orderBy($"salary".desc)

val ranked = df
  .withColumn("rank", rank().over(windowSpec))
  .withColumn("dense_rank", dense_rank().over(windowSpec))
  .withColumn("row_number", row_number().over(windowSpec))
  .withColumn("running_total", sum($"salary").over(
    Window.partitionBy("department").orderBy("hire_date")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)
  ))
```

## Error Handling

### Corrupt Record Handling

```scala
import org.apache.spark.sql.types.StringType

val df = spark.read
  .option("mode", "PERMISSIVE") // default: keeps corrupt rows
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  // The corrupt-record column is only populated if the schema declares it
  .schema(schema.add("_corrupt_record", StringType))
  .json("data/events")

val clean = df.filter($"_corrupt_record".isNull).drop("_corrupt_record")
val bad = df.filter($"_corrupt_record".isNotNull)

bad.write.json("data/quarantine")
```

### Accumulator-Based Error Counting

```scala
import scala.util.control.NonFatal

val parseErrors = spark.sparkContext.longAccumulator("parseErrors")

val parsed = df.flatMap { row =>
  // Emit zero rows for a failed record instead of returning null
  try Seq(parseRow(row))
  catch {
    case NonFatal(_) =>
      parseErrors.add(1)
      Seq.empty
  }
}

// Transformations are lazy: run an action before reading the accumulator
parsed.count()
println(s"Parse errors: ${parseErrors.value}")
```

> **Caveat:** Spark guarantees exactly-once accumulator updates only for updates made inside actions (e.g. `foreach`). Updates made inside transformations such as `map` or `flatMap` are not applied until an action runs, and can be applied more than once if tasks or stages are re-executed. For exact error tracking, prefer the quarantine pattern above; use accumulators for operational monitoring only.

## Streaming (Structured Streaming)

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "latest")
  .load()

// Kafka delivers the payload as bytes; cast to string and parse with the schema
val parsed = stream
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")

val query = parsed.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/events")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start("output/events")

query.awaitTermination()
```

- Always set a checkpoint location for fault tolerance.
- Use `Trigger.ProcessingTime` or `Trigger.AvailableNow`; avoid `Trigger.Once` in production (it is deprecated since Spark 3.4 in favor of `AvailableNow`).
## Delta Lake Integration

```scala
import io.delta.tables.DeltaTable

// Upsert / merge
val target = DeltaTable.forPath(spark, "data/customers")

target.as("t")
  .merge(updatesDF.as("s"), "t.id = s.id")
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()

// Time travel
val yesterday = spark.read
  .format("delta")
  .option("timestampAsOf", "2025-01-15")
  .load("data/customers")

// Optimize and vacuum
target.optimize().executeCompaction()
target.vacuum(168) // retention in hours: keep 7 days
```

## Performance Tuning Checklist

1. **Minimize shuffles**: use `broadcast` joins, pre-partition data, avoid unnecessary `groupBy`.
2. **Avoid `collect()` on large DataFrames**: it pulls all data to the driver.
3. **Use `explain(true)`** to inspect physical plans before running expensive jobs.
4. **Enable Adaptive Query Execution (AQE)**:

   ```scala
   spark.conf.set("spark.sql.adaptive.enabled", "true")
   spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
   spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
   ```

5. **Use columnar formats** (Parquet, Delta, ORC) over CSV/JSON for analytical workloads.
6. **Predicate pushdown**: filter early in the query plan; place filters before joins.
7. **Column pruning**: `select` only needed columns instead of `select("*")`.
8. **Avoid a redundant `distinct()` before `groupBy`**: when the aggregation ignores duplicates (e.g. `max`, `min`), the extra pass only adds a shuffle.
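Item 8 applies when the aggregation ignores duplicates (for example `max` or `min`); for `count` or `sum`, removing `distinct()` changes the result. The sketch below uses plain Scala collections as a stand-in for a DataFrame, so it illustrates the semantics rather than Spark's execution. The `Sale` type and the sample rows are invented for illustration.

```scala
object DistinctBeforeGroupBy {
  // Hypothetical record type and sample data, including one exact duplicate row.
  final case class Sale(region: String, amount: Int)

  val rows: Seq[Sale] = Seq(
    Sale("EU", 10), Sale("EU", 10), Sale("EU", 30), Sale("US", 5)
  )

  // Per-region maximum amount.
  def maxByRegion(data: Seq[Sale]): Map[String, Int] =
    data.groupBy(_.region).map { case (r, xs) => r -> xs.map(_.amount).max }

  // Per-region total amount.
  def sumByRegion(data: Seq[Sale]): Map[String, Int] =
    data.groupBy(_.region).map { case (r, xs) => r -> xs.map(_.amount).sum }

  // max ignores duplicates, so de-duplicating first is redundant.
  val maxSame: Boolean = maxByRegion(rows) == maxByRegion(rows.distinct)

  // sum does not: the duplicate row is counted twice unless removed first.
  val sumSame: Boolean = sumByRegion(rows) == sumByRegion(rows.distinct)
}
```

Before dropping a `distinct()`, it is worth checking which of these two cases the aggregation falls into.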
## Testing

### Unit Testing Transformations

Test pure transformation functions without a SparkSession when possible:

```scala
import org.scalatest.funsuite.AnyFunSuite

class TransformationsTest extends AnyFunSuite {
  // parseStatus is the plain String => String function, not a udf(...) wrapper
  test("parseStatus maps known values correctly") {
    assert(parseStatus("active") == "ACTIVE")
    assert(parseStatus("DISABLED") == "INACTIVE")
    assert(parseStatus(null) == "UNKNOWN")
  }
}
```

### Integration Testing with SparkSession

Use a shared `SparkSession` for DataFrame-level tests:

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

trait SparkTestBase extends AnyFunSuite with BeforeAndAfterAll {
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("test")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()

  override def afterAll(): Unit = {
    spark.stop()
    super.afterAll()
  }
}

class EventPipelineTest extends SparkTestBase {
  import spark.implicits._

  test("pipeline filters inactive events") {
    val input = Seq(
      Event(1L, "active", "US"),
      Event(2L, "inactive", "EU")
    ).toDS()

    val result = filterActive(input)

    assert(result.count() == 1)
    assert(result.collect().head.status == "active")
  }
}
```

## Application Packaging

### Fat JAR with sbt-assembly

```scala
// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")

// build.sbt
assembly / assemblyMergeStrategy := {
  // Keep service-loader registrations; bundled data sources rely on them
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
```

### Spark Submit

```bash
spark-submit \
  --class com.example.MainApp \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-memory 8g \
  --executor-cores 4 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  target/scala-2.13/my-app-assembly-1.0.jar \
  --input s3://bucket/input \
  --output s3://bucket/output
```

## Common Anti-Patterns

| Anti-Pattern | Why It's Bad | Fix |
|---|---|---|
| `collect()` on large data | OOM on driver | Use `take(n)`, `show()`, or write to storage |
| `count()` inside loops | Triggers full DAG evaluation each time | Cache and count once |
| UDF for built-in operations | Opaque to the Catalyst optimizer | Use `org.apache.spark.sql.functions._` |
| `var` for DataFrames | Mutable references cause confusion | Chain transformations or use `val` |
| Schema inference on CSV/JSON | Extra pass over the source, fragile | Define `StructType` explicitly |
| `coalesce(1)` on large data | Single-task bottleneck | Use `repartition` with reasonable count |
| Nested `map` on RDDs | Quadratic complexity | Use `join` or `broadcast` |
| Ignoring data skew | Straggler tasks, OOM | Salt keys or use AQE skew handling |

## Dynamic Allocation

Enable dynamic allocation to let Spark scale executors up and down based on workload demand. This is essential for shared clusters where fixed executor counts waste resources during idle stages. These settings are read when the application starts, so set them on the session builder or at submit time rather than with `spark.conf.set` on a running session:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "50")
  .config("spark.dynamicAllocation.initialExecutors", "5")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .getOrCreate()
```

Or via `spark-submit`:

```bash
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=50 \
  --conf spark.shuffle.service.enabled=true \
  ...
```

Key settings:

| Setting | Purpose |
|---|---|
| `minExecutors` | Floor: always keep at least this many executors running |
| `maxExecutors` | Ceiling: cap to prevent monopolizing the cluster |
| `initialExecutors` | Starting count before auto-scaling kicks in |
| `executorIdleTimeout` | Remove idle executors after this duration (default 60s) |
| `schedulerBacklogTimeout` | Request new executors when tasks have been pending this long (default 1s) |

- **Requires `spark.shuffle.service.enabled=true`** on YARN/Mesos. An external shuffle service preserves shuffle files after executors are removed. Without it, removed executors lose their shuffle data, forcing costly recomputation.
- On **Kubernetes**, use `spark.dynamicAllocation.shuffleTracking.enabled=true` instead (no external shuffle service needed).
- **Do not combine** `--num-executors` with dynamic allocation. A larger `--num-executors` value is then used only as the initial executor count rather than a fixed size, which obscures the intended sizing. Remove `--num-executors` when enabling dynamic allocation.
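The cluster-manager distinction in the notes above can be captured in a small helper that picks the right shuffle setting. This is a minimal sketch in plain Scala with no Spark dependency. `DynamicAllocationConf`, `settings`, and `toSubmitArgs` are hypothetical names introduced for illustration; only the configuration keys come from the section above.

```scala
object DynamicAllocationConf {
  // Cluster managers distinguished in the notes above.
  sealed trait ClusterManager
  case object Yarn extends ClusterManager
  case object Kubernetes extends ClusterManager

  /** Hypothetical helper: builds dynamic allocation settings as key/value
    * pairs, choosing the shuffle-data mechanism by cluster manager.
    */
  def settings(manager: ClusterManager, minExecutors: Int, maxExecutors: Int): Map[String, String] = {
    require(minExecutors >= 0 && minExecutors <= maxExecutors,
      "need 0 <= minExecutors <= maxExecutors")
    val base = Map(
      "spark.dynamicAllocation.enabled" -> "true",
      "spark.dynamicAllocation.minExecutors" -> minExecutors.toString,
      "spark.dynamicAllocation.maxExecutors" -> maxExecutors.toString
    )
    // YARN relies on the external shuffle service; Kubernetes on shuffle tracking.
    val shuffle = manager match {
      case Yarn       => "spark.shuffle.service.enabled" -> "true"
      case Kubernetes => "spark.dynamicAllocation.shuffleTracking.enabled" -> "true"
    }
    base + shuffle
  }

  /** Renders settings as `--conf key=value` arguments, sorted by key. */
  def toSubmitArgs(conf: Map[String, String]): Seq[String] =
    conf.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

The resulting map could be applied pair by pair with `SparkSession.builder().config(key, value)` before the session is created, or rendered for `spark-submit` with `toSubmitArgs`.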