E05 – Hands-On Exercises: Spark Under the Hood
Exercise 1: Count Lines Containing "Spark" (Python)
Objective: Demonstrate lazy evaluation and Spark actions.
>>> strings = spark.read.text("../README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
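Note what happens here: `filter` is a transformation and returns immediately without reading any data; only the `count` action triggers a Spark job. As a rough plain-Python analogy (illustrative only, not Spark code; the sample lines are made up), generators behave the same way:

```python
# Plain-Python analogy for lazy evaluation: building the pipeline
# does no work; consuming it does.
lines = ["Apache Spark is fast", "Hello world", "Spark uses lazy evaluation"]

# Like strings.filter(...): nothing is evaluated yet.
filtered = (line for line in lines if "Spark" in line)

# Like filtered.count(): iterating the generator forces evaluation.
print(sum(1 for _ in filtered))  # -> 2
```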
Expected Outcome: A count of lines containing "Spark" (e.g. 20).
Exercise 2: Count Lines Containing "Spark" (Scala)
Objective: Same logic in Scala, showing API consistency.
import org.apache.spark.sql.functions._
val strings = spark.read.text("../README.md")
val filtered = strings.filter(col("value").contains("Spark"))
filtered.count()
Expected Outcome: A Long result with the count of lines containing "Spark".
Exercise 3: Python Standalone App – M&M Count
Objective: Write a full Spark application that aggregates counts of M&M colors per state.
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: mnmcount ", file=sys.stderr)
        sys.exit(-1)

    spark = (SparkSession.builder
             .appName("PythonMnMCount")
             .getOrCreate())

    mnm_file = sys.argv[1]
    mnm_df = (spark.read.format("csv")
              .option("header", "true")
              .option("inferSchema", "true")
              .load(mnm_file))

    count_mnm_df = (mnm_df
                    .select("State", "Color", "Count")
                    .groupBy("State", "Color")
                    .agg(count("Count").alias("Total"))
                    .orderBy("Total", ascending=False))
    count_mnm_df.show(n=60, truncate=False)
    print("Total Rows = %d" % (count_mnm_df.count()))

    ca_count_mnm_df = (mnm_df
                       .select("State", "Color", "Count")
                       .where(mnm_df.State == "CA")
                       .groupBy("State", "Color")
                       .agg(count("Count").alias("Total"))
                       .orderBy("Total", ascending=False))
    ca_count_mnm_df.show(n=10, truncate=False)

    spark.stop()
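Assuming the script above is saved as mnmcount.py (the filename is an assumption; the Usage string leaves it unnamed) and the dataset lives at data/mnm_dataset.csv, the app can be launched with spark-submit, mirroring the Scala run command later in this section:

```shell
# Both paths below are assumptions for illustration.
$SPARK_HOME/bin/spark-submit mnmcount.py data/mnm_dataset.csv
```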
Expected Outcome:
A table of counts for all states/colors and then a filtered table for California.
Example:
|CA|Yellow|1807|
|CA|Green |1723|
|CA|Blue |1603|
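To make the groupBy/agg step concrete: `count("Count")` counts the number of rows in each (State, Color) group; it does not sum the Count column. A plain-Python sketch of that aggregation, using made-up sample rows:

```python
from collections import Counter

# Made-up sample rows in (State, Color, Count) form.
rows = [
    ("CA", "Yellow", 93), ("CA", "Yellow", 12),
    ("CA", "Green", 57),
    ("TX", "Blue", 25),
]

# Like .groupBy("State", "Color").agg(count("Count")): one tally per group.
totals = Counter((state, color) for state, color, _ in rows)

# Like .orderBy("Total", ascending=False): largest groups first.
for (state, color), total in totals.most_common():
    print(f"|{state}|{color}|{total}|")
```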
Exercise 4: Scala Standalone App – M&M Count
Objective: Equivalent Spark application written in Scala.
package main.scala.chapter2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object MnMcount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("MnMCount")
      .getOrCreate()

    if (args.length < 1) {
      print("Usage: MnMcount ")
      sys.exit(1)
    }

    val mnmFile = args(0)
    val mnmDF = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(mnmFile)

    val countMnMDF = mnmDF
      .select("State", "Color", "Count")
      .groupBy("State", "Color")
      .agg(count("Count").alias("Total"))
      .orderBy(desc("Total"))
    countMnMDF.show(60)
    println(s"Total Rows = ${countMnMDF.count()}")

    val caCountMnMDF = mnmDF
      .select("State", "Color", "Count")
      .where(col("State") === "CA")
      .groupBy("State", "Color")
      .agg(count("Count").alias("Total"))
      .orderBy(desc("Total"))
    caCountMnMDF.show(10)

    spark.stop()
  }
}
Expected Outcome: Same result tables as the Python version, but running as a Scala app.
Exercise 5: Scala Build File (sbt)
Objective: Build and package the Scala Spark app.
name := "main/scala/chapter2"
version := "1.0"
scalaVersion := "2.12.10"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0-preview2",
  "org.apache.spark" %% "spark-sql" % "3.0.0-preview2"
)
Build with:
$ sbt clean package
Run with:
$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv
