🔥 E06 – Hands-On Exercises: Scala Spark Shell, RDDs & WordCount
Exercise 1: Basic Scala in Spark Shell
Objective: Warm up with simple Scala expressions.
scala> 2 + 2
res0: Int = 4
scala> val sum = 2 + 2
sum: Int = 4
Expected Outcome: Scala evaluates expressions directly in the shell, showing results or storing them in variables.
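The same expressions behave identically in compiled code; a minimal sketch (the object name is illustrative):

```scala
object ShellBasics extends App {
  // The shell infers types and auto-names unassigned results (res0, res1, ...);
  // in compiled code we bind values explicitly.
  val sum = 2 + 2   // inferred as Int, matching the shell's `sum: Int = 4`
  println(sum)      // prints 4
}
```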
Exercise 2: Creating and Parallelizing a Collection
Objective: Create a sequence and turn it into an RDD.
scala> val xs = (1 to 1000).toList
scala> val xsRdd = sc.parallelize(xs)
Expected Outcome: A list of numbers 1–1000, converted into an RDD of type RDD[Int].
Exercise 3: Filtering Even Numbers
Objective: Use a transformation (lazy) and then trigger it with an action.
scala> val evenRdd = xsRdd.filter{ _ % 2 == 0 }
scala> val count = evenRdd.count
scala> val first = evenRdd.first
scala> val first5 = evenRdd.take(5)
Expected Outcome:
count = 500
first = 2
first5 = Array(2, 4, 6, 8, 10)
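RDD transformations such as filter are lazy; only actions like count, first, and take trigger computation. Outside Spark, the same distinction can be sketched with a Scala collection view (names here are illustrative, not Spark API):

```scala
object LazyFilterDemo extends App {
  val xs = (1 to 1000).toList
  // Like filter on an RDD, filter on a view only records a recipe;
  // nothing is computed at this point.
  val evens = xs.view.filter(_ % 2 == 0)
  // Forcing results plays the role of Spark actions:
  println(evens.size)            // 500, analogous to evenRdd.count
  println(evens.head)            // 2, analogous to evenRdd.first
  println(evens.take(5).toList)  // List(2, 4, 6, 8, 10), analogous to take(5)
}
```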
Exercise 4: Loading Log Data
Objective: Load a text log file into an RDD and preprocess it.
scala> val rawLogs = sc.textFile("data/app.log")
scala> val logs = rawLogs.map { line => line.trim.toLowerCase() }
scala> logs.persist()
scala> val totalCount = logs.count()
Expected Outcome:
totalCount = 103 (for the sample dataset).
Exercise 5: Filtering Error Logs
Objective: Extract only logs with severity level error.
scala> val errorLogs = logs.filter{_.split("\\s+")(1) == "error"}
scala> val errorCount = errorLogs.count()
scala> val firstError = errorLogs.first()
scala> val first3Errors = errorLogs.take(3)
Expected Outcome:
errorCount = 26; the first few lines returned all contain the severity error.
Exercise 6: Finding Maximums in Logs
Objective: Compute longest line and line with most words.
scala> val lengths = logs.map(_.size)
scala> val maxLen = lengths.reduce((a, b) => if (a > b) a else b)
scala> val wordCounts = logs.map(_.split("\\s+").size)
scala> val maxWords = wordCounts.reduce((a, b) => if (a > b) a else b)
Expected Outcome:
maxLen = 117
maxWords = 20
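The reduce pattern above works on any collection, not just RDDs; a local sketch with made-up log lines (the real exercise reads data/app.log):

```scala
object MaxDemo extends App {
  // Hypothetical log lines standing in for data/app.log
  val logs = List(
    "info application started",
    "error disk almost full on worker seven",
    "warn low memory"
  )
  // reduce repeatedly combines pairs with a binary function, exactly
  // like RDD.reduce in the exercise above
  val maxLen   = logs.map(_.length).reduce((a, b) => if (a > b) a else b)
  val maxWords = logs.map(_.split("\\s+").length).reduce((a, b) => if (a > b) a else b)
  println(maxLen)    // 38: length of the longest sample line
  println(maxWords)  // 7: word count of the longest sample line
}
```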
Exercise 7: Writing a Function & Counting by Severity
Objective: Define a custom function and use it in transformations.
scala> :paste
def severity(log: String): String = {
  val columns = log.split("\\s+", 3)
  columns(1)
}
^D
scala> val pairs = logs.map(log => (severity(log), 1))
scala> val countBySeverityRdd = pairs.reduceByKey(_ + _)
scala> val countBySeverity = countBySeverityRdd.collect()
Expected Outcome:
Array((warn,25), (info,27), (error,26), (debug,25)).
Exercise 8: Saving RDDs
Objective: Persist results to disk.
scala> errorLogs.saveAsTextFile("data/error_logs")
scala> countBySeverityRdd.saveAsTextFile("data/log-counts-text")
scala> countBySeverityRdd.saveAsSequenceFile("data/log-counts-seq")
Expected Outcome: Output directories with partitioned result files.
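For comparison, the severity counting from Exercise 7 can be reproduced on a plain Scala collection, where groupBy locally plays the role of map + reduceByKey (the sample lines are made up, in the same "timestamp severity message" shape):

```scala
object SeverityCountsLocal extends App {
  // Made-up lines in the same shape as data/app.log
  val logs = List(
    "2015-10-01 error disk failure",
    "2015-10-01 info service started",
    "2015-10-02 error request timeout",
    "2015-10-02 debug cache miss"
  )
  // Same helper as Exercise 7: severity is the second whitespace-separated column
  def severity(log: String): String = log.split("\\s+", 3)(1)
  // groupBy + size on a local collection mirrors reduceByKey on a pair RDD
  val counts = logs.groupBy(severity).map { case (sev, lines) => (sev, lines.size) }
  println(counts.toList.sorted)  // List((debug,1), (error,2), (info,1))
}
```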
Exercise 9: WordCount Spark Application
Objective: Build the classic WordCount app in Scala.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object WordCount {
  def main(args: Array[String]): Unit = {
    val inputPath = args(0)
    val outputPath = args(1)
    val sc = new SparkContext()
    val lines = sc.textFile(inputPath)
    val wordCounts = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCounts.saveAsTextFile(outputPath)
  }
}
Expected Outcome: Output directory with files containing word → count pairs.
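Before packaging the application, the same pipeline can be checked on a local collection; here a foldLeft over a Map is a single-machine stand-in for reduceByKey (the input lines are illustrative):

```scala
object WordCountLocal extends App {
  // Made-up input standing in for the text files read by sc.textFile
  val lines = List("to be or not to be", "be quick")
  val wordCounts = lines
    .flatMap(_.split(" "))                      // same tokenization as the app
    .foldLeft(Map.empty[String, Int]) { (acc, word) =>
      // accumulate per-word counts, as reduceByKey(_ + _) does per key
      acc.updated(word, acc.getOrElse(word, 0) + 1)
    }
  println(wordCounts.toList.sorted)
  // List((be,3), (not,1), (or,1), (quick,1), (to,2))
}
```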
Exercise 10: sbt Build and Spark Submit
Objective: Compile and run the WordCount app with sbt and spark-submit.
name := "word-count"
version := "1.0.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"
$ cd WordCount
$ sbt package
$ ~/path/to/SPARK_HOME/bin/spark-submit \
--class "WordCount" --master local[*] \
target/scala-2.10/word-count_2.10-1.0.0.jar \
path/to/input-files path/to/output-directory
Expected Outcome: Compiled JAR packaged by sbt, then executed by
spark-submit, with word counts written to the output directory.
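saveAsTextFile writes one part-NNNNN file per partition, plus a _SUCCESS marker when the job completes. An illustrative session for inspecting such a directory (the directory and its contents below are simulated, not real job output):

```shell
# Simulate a Spark output directory to show its shape (contents are made up)
mkdir -p /tmp/wordcount-output
printf '(be,3)\n(to,2)\n' > /tmp/wordcount-output/part-00000
printf '(or,1)\n'         > /tmp/wordcount-output/part-00001
touch /tmp/wordcount-output/_SUCCESS   # marker Spark writes on success

# Concatenate all partitions, as you would for real job output
cat /tmp/wordcount-output/part-*
```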