E05 – Spark Under the Hood: Jobs, Stages & Tasks

πŸ”₯ E05 – Hands-On Exercises: Spark Under the Hood

Exercise 1: Count Lines Containing "Spark" (Python)

Objective: Demonstrate lazy evaluation and Spark actions.

>>> strings = spark.read.text("../README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
Expected Outcome: A count of lines containing "Spark" (e.g. 20).

Exercise 2: Count Lines Containing "Spark" (Scala)

Objective: Same logic in Scala, showing API consistency.

import org.apache.spark.sql.functions._

val strings = spark.read.text("../README.md")
val filtered = strings.filter(col("value").contains("Spark"))
filtered.count()
Expected Outcome: A Long result with the count of lines containing "Spark".

Exercise 3: Python Standalone App – M&M Count

Objective: Write a full Spark application that aggregates counts of M&M colors per state.

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: mnmcount ", file=sys.stderr)
        sys.exit(-1)

    spark = (SparkSession.builder
        .appName("PythonMnMCount")
        .getOrCreate())

    mnm_file = sys.argv[1]

    mnm_df = (spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(mnm_file))

    count_mnm_df = (mnm_df
        .select("State", "Color", "Count")
        .groupBy("State", "Color")
        .agg(count("Count").alias("Total"))
        .orderBy("Total", ascending=False))

    count_mnm_df.show(n=60, truncate=False)
    print("Total Rows = %d" % (count_mnm_df.count()))

    ca_count_mnm_df = (mnm_df
        .select("State", "Color", "Count")
        .where(mnm_df.State == "CA")
        .groupBy("State", "Color")
        .agg(count("Count").alias("Total"))
        .orderBy("Total", ascending=False))

    ca_count_mnm_df.show(n=10, truncate=False)
    spark.stop()
Expected Outcome:
A table of counts for all states/colors and then a filtered table for California.
Example:
|CA|Yellow|1807|
|CA|Green |1723|
|CA|Blue  |1603|
      

Exercise 4: Scala Standalone App – M&M Count

Objective: Equivalent Spark application written in Scala.

package main.scala.chapter2
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object MnMcount {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("MnMCount")
      .getOrCreate()

    if (args.length < 1) {
      print("Usage: MnMcount ")
      sys.exit(1)
    }

    val mnmFile = args(0)

    val mnmDF = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(mnmFile)

    val countMnMDF = mnmDF
      .select("State", "Color", "Count")
      .groupBy("State", "Color")
      .agg(count("Count").alias("Total"))
      .orderBy(desc("Total"))

    countMnMDF.show(60)
    println(s"Total Rows = ${countMnMDF.count()}")

    val caCountMnMDF = mnmDF
      .select("State", "Color", "Count")
      .where(col("State") === "CA")
      .groupBy("State", "Color")
      .agg(count("Count").alias("Total"))
      .orderBy(desc("Total"))

    caCountMnMDF.show(10)
    spark.stop()
  }
}
Expected Outcome: Same result tables as the Python version, but running as a Scala app.

Exercise 5: Scala Build File (sbt)

Objective: Build and package the Scala Spark app.

name := "main/scala/chapter2"
version := "1.0"
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0-preview2",
  "org.apache.spark" %% "spark-sql" % "3.0.0-preview2"
)
Build with:
$ sbt clean package
Run with:
$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv

Comments

No comments yet. Be the first!

You must log in to comment.