Data Engineering Transit Data with Scala + Spark
- Published on
- Authors
- Name
- Julien Marc Brown
- @GanzeKarte
Part 2 of Series: Understanding and Predicting Transit Metrics
Continuing our series on building a data engineering pipeline with Scala and Spark, this article dives into the essential pre-processing steps. Our previous post laid out the overarching goals of the series; in this installment, we focus on preparing the input data so it integrates cleanly with the rest of the pipeline and can be made available to end users.
Why Not Pandas?
The obvious question: why not use Pandas or something like it? You absolutely can, and we will explore that in another series.
Python's Pandas library is a fantastic tool for data transformation and analysis, and one could argue that it would suffice for a simple preprocessing task, especially since this dataset should fit into memory on a single machine. However, looking at the broader picture, in particular how the outputs of this file are reused alongside other datasets in our pipeline, Spark emerges as the better choice for this scenario.
Introducing Spark
Spark, a highly esteemed framework in the data engineering space, offers robust capabilities for distributed data processing. Its integration with Scala allows for efficient parallel processing, which is crucial when dealing with vast datasets or complex transformations. Moreover, Spark's versatility extends from batch processing to streaming and machine learning, making it a comprehensive tool for our series' objectives.
Why Choose Spark?
- Scalability: With its distributed data processing capabilities, Spark can efficiently handle massive datasets.
- Integration with Scala: When combined with Scala, Spark offers a type-safe way to manipulate data and ensure code quality.
- Versatility: From analytics to machine learning, Spark provides a plethora of libraries and tools to cater to a wide range of requirements.
Goals/Expected Outputs
- Deep Dive into Data: Gain insights into the data structure, types, and distribution.
- Pipeline Extensibility: We will later cover how to bring other datasets into the picture, including GIS data.
- XLS Processing: Refine and transform the XLS file for subsequent stages.
- ML Readiness: Produce outputs compatible with machine learning algorithms.
- Database Compatibility: Ensure the processed data aligns with database insertion requirements.
Input
- Data Source: NTD XLS file detailing ridership metrics.
Larger Dependencies
- Spark: Version 3.X, our primary processing engine.
- Scala: Version 2.12.X, the programming language driving our pipeline.
Smaller Dependencies
- Libraries: Various smaller libraries (spray-json, spark-excel, ScalaTest, and the PostgreSQL JDBC driver) that aid in specific functions within the pipeline.
Step One: Setting the Stage
- Objective: Initiate a runnable program that invokes a Spark session.
To kick things off, we'll craft a rudimentary runnable program. This foundation will serve as the scaffold upon which we'll build our data processing functionalities in subsequent steps.
Start by creating an SBT project, then add a minimal entry point:
object Pipeline {
  def main(args: Array[String]): Unit = {
    ???
  }
}
Add the Spark (and related) dependencies to your build.sbt:
libraryDependencies ++= Seq(
  "io.spray" %% "spray-json" % "1.3.6",
  "org.scalatest" %% "scalatest" % "3.2.9" % "test",
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "com.crealytics" %% "spark-excel" % "3.4.1_0.19.0",
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.postgresql" % "postgresql" % "42.6.0"
)
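Note that sparkVersion isn't provided by sbt; it's a value you define yourself in build.sbt. A minimal sketch, assuming Spark 3.4.1 (the spark-excel artifact above, 3.4.1_0.19.0, is built against that Spark version, so the two should line up):

// Assumption: pin Spark to the version the spark-excel artifact was built against.
val sparkVersion = "3.4.1"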
Let's enhance our file by incorporating a new method named run(). In this method, you'll initialize a fresh Spark configuration and session. Because we use getOrCreate(), an existing session is reused when one is available (for example, when running non-locally with a provided configuration); if none is found, a new one is created.
def run(): Unit = {
  // Define a Spark configuration
  val conf = new SparkConf()
    .setMaster("local[*]")
    .set("spark.executor.memory", "4g")
    .set("spark.driver.memory", "4g")

  // Initialize a Spark session
  implicit val spark: SparkSession =
    SparkSession.builder
      .config(conf = conf)
      .appName("entrypoint")
      .getOrCreate()
}
By integrating this run() method, you're setting the foundation to leverage Spark's capabilities effectively throughout the pipeline. We now have a single-file, runnable program that creates a Spark session:
Pipeline.scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Pipeline {
  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
    // Create a Spark configuration
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.executor.memory", "4g")
      .set("spark.driver.memory", "4g")

    // Create a Spark session
    implicit val spark: SparkSession =
      SparkSession.builder
        .config(conf = conf)
        .appName("entrypoint")
        .getOrCreate()
  }
}
Step Two: Data Loading
Having familiarized ourselves with the data formats earlier in the tutorial, our next step involves data ingestion.
We'll use the com.crealytics.spark.excel library, a Spark wrapper for Apache POI. This makes loading an Excel file into a dataframe or RDD straightforward.
We're introducing a new object, RidershipMasterTabDF, with a method named xlsFromPath. This method reads the XLS file and returns its content as a dataframe.
package com.ganzekarte.examples.steps.step_2

import com.crealytics.spark.excel.ExcelDataFrameReader
import org.apache.spark.sql.{DataFrame, SparkSession}

object RidershipMasterTabDF {
  def xlsFromPath(path: String)(implicit spark: SparkSession): DataFrame = {
    // The .excel(...) syntax comes from the ExcelDataFrameReader implicit.
    val df = spark.read.excel().load(path)
    df
  }
}
Now, integrate this newly created object into Pipeline.scala. We also enhance the run method by adding an argument so we can specify the path to the XLS file. Once the dataframe loads, the program will display the first five rows.
import com.ganzekarte.examples.steps.step_2.RidershipMasterTabDF
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Pipeline {
  def main(args: Array[String]): Unit = {
    run(args(0))
  }

  def run(path: String): Unit = {
    // Create a Spark configuration
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.executor.memory", "4g")
      .set("spark.driver.memory", "4g")

    // Create a Spark session
    implicit val spark: SparkSession =
      SparkSession.builder
        .config(conf = conf)
        .appName("entrypoint")
        .getOrCreate()

    // Load the XLS file into a dataframe and preview it
    val df = RidershipMasterTabDF.xlsFromPath(path = path)
    df.show(5)
  }
}
Running the program at this point will display something like
+--------------------+----+----+----+
| Read Me| _c1| _c2| _c3|
+--------------------+----+----+----+
| null|null|null|null|
|Users are encoura...|null|null|null|
| null|null|null|null|
|1. General...|null|null|null|
|This Excel workbo...|null|null|null|
+--------------------+----+----+----+
only showing top 5 rows
When you run this program, you might notice that it loads the "Read Me" tab from the XLS file. This tab provides valuable insights for manual review but becomes an obstacle for automated data processing.
In the first installment of our series, we explored various data types spread across the Excel tabs. Our analysis identified the "Master" tab as a valuable source. This tab not only offers a consistent set of columns but also provides summary metrics. In subsequent steps, we'll focus on extracting and processing data from this specific tab.
Step Three: Loading Data From the Master Tab
Objective: Extract data exclusively from the "Master" tab of the Excel file.
Given that the "Master" tab has been recognized as the primary area of interest, we can leverage the dataAddress parameter of spark.read.excel to pinpoint our desired data. By providing the name of our target tab, "Master", as the value for this parameter, Spark will be directed to fetch data specifically from that sheet.
Here's how you can adjust RidershipMasterTabDF.scala:
package com.ganzekarte.examples.steps.step_3

import org.apache.spark.sql.{DataFrame, SparkSession}

object RidershipMasterTabDF {
  def xlsFromPath(path: String)(implicit spark: SparkSession): DataFrame = {
    val df = spark.read
      .format("com.crealytics.spark.excel")
      .option("header", "true")
      .option("dataAddress", "Master!") // Directs Spark to the "Master" tab.
      .load(path)
    df
  }
}
Executing this adjusted code and calling df.show() will display the initial rows of the "Master" tab. These rows contain the structured, relevant data for further analysis and operations, letting you steer clear of extraneous information and focus solely on what's crucial for your use case.
|NTD ID|Legacy NTD ID| Agency|Mode|TOS|3 Mode|Status| Reporter Type| Organization Type|HQ City|HQ State|UZA|UACE CD| UZA Name|UZA SQ Miles|UZA Population|Service Area Population|Service Area SQ Miles|Most Recent Report Year|FY End Month|FY End Year|Passenger Miles FY|Unlinked Passenger Trips FY|Avg Trip Length FY| Fares FY|Operating Expenses FY|Avg Cost Per Trip FY|Avg Fares Per Trip FY|
+------+-------------+--------------------+----+---+------+------+--------------------+--------------------+-------+--------+---+-------+-------------------+------------+--------------+-----------------------+---------------------+-----------------------+------------+-----------+------------------+---------------------------+------------------+-------------+---------------------+--------------------+---------------------+
| 00001| 0001|King County Depar...| DR| PT| Bus|Active|Full Reporter: Op...|City, County or L...|SEATTLE| WA| 14| 80389|Seattle--Tacoma, WA| 983| 3,544,011| 2,317,700| 2,134| 2022| 12| 2022| 5,546,871| 555,210| 10| $740,726.00| $54,748,982.00| $98.61| $1.33|
| 00001| 0001|King County Depar...| DR| TN| Bus|Active|Full Reporter: Op...|City, County or L...|SEATTLE| WA| 14| 80389|Seattle--Tacoma, WA| 983| 3,544,011| 2,317,700| 2,134| 2022| 12| 2022| 0| 0| 0| $0.00| $0.00| $0.00| $0.00|
| 00001| 0001|King County Depar...| DR| TX| Bus|Active|Full Reporter: Op...|City, County or L...|SEATTLE| WA| 14| 80389|Seattle--Tacoma, WA| 983| 3,544,011| 2,317,700| 2,134| 2022| 12| 2022| 1,612,286| 110,794| 15| $91,601.00| $5,791,694.00| $52.27| $0.83|
| 00001| 0001|King County Depar...| FB| DO| Ferry|Active|Full Reporter: Op...|City, County or L...|SEATTLE| WA| 14| 80389|Seattle--Tacoma, WA| 983| 3,544,011| 2,317,700| 2,134| 2022| 12| 2022| 1,361,870| 400,407| 3|$1,715,265.00| $8,899,659.00| $22.23| $4.28|
| 00001| 0001|King County Depar...| LR| DO| Rail|Active|Full Reporter: Op...|City, County or L...|SEATTLE| WA| 14| 80389|Seattle--Tacoma, WA| 983| 3,544,011| 2,317,700| 2,134| 2022| 12| 2022| 0| 0| 0| $0.00| $0.00| $0.00| $0.00|
+------+-------------+--------------------+----+---+------+------+--------------------+--------------------+-------+--------+---+-------+-------------------+------------+--------------+-----------------------+---------------------+-----------------------+------------+-----------+------------------+---------------------------+------------------+-------------+---------------------+--------------------+---------------------+
only showing top 5 rows
and the schema
root
|-- NTD ID: string (nullable = true)
|-- Legacy NTD ID: string (nullable = true)
|-- Agency: string (nullable = true)
|-- Mode: string (nullable = true)
|-- TOS: string (nullable = true)
|-- 3 Mode: string (nullable = true)
|-- Status: string (nullable = true)
|-- Reporter Type: string (nullable = true)
|-- Organization Type: string (nullable = true)
|-- HQ City: string (nullable = true)
|-- HQ State: string (nullable = true)
|-- UZA: string (nullable = true)
|-- UACE CD: string (nullable = true)
|-- UZA Name: string (nullable = true)
|-- UZA SQ Miles: string (nullable = true)
|-- UZA Population: string (nullable = true)
|-- Service Area Population: string (nullable = true)
|-- Service Area SQ Miles: string (nullable = true)
|-- Most Recent Report Year: string (nullable = true)
|-- FY End Month: string (nullable = true)
|-- FY End Year: string (nullable = true)
|-- Passenger Miles FY: string (nullable = true)
|-- Unlinked Passenger Trips FY: string (nullable = true)
|-- Avg Trip Length FY: string (nullable = true)
|-- Fares FY: string (nullable = true)
|-- Operating Expenses FY: string (nullable = true)
|-- Avg Cost Per Trip FY: string (nullable = true)
|-- Avg Fares Per Trip FY: string (nullable = true)
We can see that the schema is entirely string values, even for numeric columns.
Let's add inferSchema = true and see if that fixes it:
val df = spark.read.excel(
  header = true,
  inferSchema = true,
  dataAddress = "Master!",
).load(path)
Yes, it does for the most part, but some types are still mis-attributed, such as double for year fields and IDs:
root
|-- NTD ID: string (nullable = true)
|-- Legacy NTD ID: string (nullable = true)
|-- Agency: string (nullable = true)
|-- Mode: string (nullable = true)
|-- TOS: string (nullable = true)
|-- 3 Mode: string (nullable = true)
|-- Status: string (nullable = true)
|-- Reporter Type: string (nullable = true)
|-- Organization Type: string (nullable = true)
|-- HQ City: string (nullable = true)
|-- HQ State: string (nullable = true)
|-- UZA: double (nullable = true)
|-- UACE CD: string (nullable = true)
|-- UZA Name: string (nullable = true)
|-- UZA SQ Miles: double (nullable = true)
|-- UZA Population: double (nullable = true)
|-- Service Area Population: double (nullable = true)
|-- Service Area SQ Miles: double (nullable = true)
|-- Most Recent Report Year: double (nullable = true)
|-- FY End Month: double (nullable = true)
|-- FY End Year: double (nullable = true)
|-- Passenger Miles FY: double (nullable = true)
|-- Unlinked Passenger Trips FY: double (nullable = true)
|-- Avg Trip Length FY: double (nullable = true)
|-- Fares FY: double (nullable = true)
|-- Operating Expenses FY: double (nullable = true)
|-- Avg Cost Per Trip FY: double (nullable = true)
|-- Avg Fares Per Trip FY: double (nullable = true)
We will also need to track schemas here and in other files, so let's externalize our schema model, with a definition for each field. We'll reuse this for other files later in the tutorial, so while it may seem like overkill now, it will pay off as we continue!
Link to step_three.json
Next, we're going to load this using spray-json, but you can use Jackson, Circe, or another library if you'd like.
package com.ganzekarte.examples.steps.step_3

import spray.json._

object FieldTransformationDefinitions extends DefaultJsonProtocol {

  implicit val fieldTransformationDefinitionFormat: RootJsonFormat[FieldTransformationDefinition] =
    jsonFormat4(FieldTransformationDefinition)

  // Load and parse the externalized field definitions once, at object initialization.
  val FieldDefinitions: List[FieldTransformationDefinition] = {
    val source = scala.io.Source.fromFile("your_path")
    try {
      source.mkString.parseJson.convertTo[List[FieldTransformationDefinition]]
    } finally {
      source.close()
    }
  }

  case class FieldTransformationDefinition(
    excelTabName: String,
    sanitizedName: String,
    dataType: String,
    isNullable: Boolean
  )
}
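The exact contents of step_three.json aren't reproduced here, but each entry has to line up with the four fields of FieldTransformationDefinition. A minimal sketch of parsing one hypothetical entry (the values below are illustrative, not taken from the real file):

import spray.json._
import com.ganzekarte.examples.steps.step_3.FieldTransformationDefinitions._

// Hypothetical entry mirroring the case class fields.
val sampleJson =
  """[{"excelTabName": "UZA Population", "sanitizedName": "uza_population", "dataType": "FloatType", "isNullable": true}]"""

val sampleDefinitions = sampleJson.parseJson.convertTo[List[FieldTransformationDefinition]]
// sampleDefinitions.head.dataType == "FloatType"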
Next, let's define a function schemaFromDF that uses the loaded dataframe's column headers to build a StructType matching our explicit definitions:
def schemaFromDF(df: DataFrame): StructType = {
  val fields = df.columns.map { header =>
    FieldDefinitions.find(_.excelTabName == header) match {
      case Some(field) =>
        val dataType = field.dataType match {
          case "FloatType" => FloatType
          case "IntegerType" | "LongType" => LongType
          case _ => StringType
        }
        StructField(header, dataType, nullable = true)
      case None =>
        StructField(header, StringType, nullable = true)
    }
  }
  StructType(fields)
}
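A minimal usage sketch, assuming peekDF is the dataframe loaded earlier with inferSchema = true:

val schema = schemaFromDF(peekDF)
println(schema)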
Printing the generated schema gives:
StructType(StructField(NTD ID,StringType,true),StructField(Legacy NTD ID,StringType,true),StructField(Agency,StringType,true),StructField(Mode,StringType,true),StructField(TOS,StringType,true),StructField(3 Mode,StringType,true),StructField(Status,StringType,true),StructField(Reporter Type,StringType,true),StructField(Organization Type,StringType,true),StructField(HQ City,StringType,true),StructField(HQ State,StringType,true),StructField(UZA,LongType,true),StructField(UACE CD,StringType,true),StructField(UZA Name,StringType,true),StructField(UZA SQ Miles,FloatType,true),StructField(UZA Population,FloatType,true),StructField(Service Area Population,FloatType,true),StructField(Service Area SQ Miles,FloatType,true),StructField(Most Recent Report Year,LongType,true),StructField(FY End Month,LongType,true),StructField(FY End Year,LongType,true),StructField(Passenger Miles FY,FloatType,true),StructField(Unlinked Passenger Trips FY,FloatType,true),StructField(Avg Trip Length FY,FloatType,true),StructField(Fares FY,FloatType,true),StructField(Operating Expenses FY,FloatType,true),StructField(Avg Cost Per Trip FY,FloatType,true),StructField(Avg Fares Per Trip FY,FloatType,true))
We're going to update RidershipMasterTabDF to build a dataframe with the new schema. We'll call the first read peekDF, use it to derive the schema via the struct-generation function, and then return a dataframe read with that explicit schema.
RidershipMasterTabDF.scala
package com.ganzekarte.examples.steps.step_3

import com.crealytics.spark.excel.ExcelDataFrameReader
import com.ganzekarte.examples.steps.step_3.FieldTransformationDefinitions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

object RidershipMasterTabDF {
  def xlsFromPath(path: String)(implicit spark: SparkSession): DataFrame = {
    // First pass: infer a schema so we know which columns are present.
    val peekDF = spark.read.excel(
      header = true,
      inferSchema = true,
      dataAddress = "Master!",
    ).load(path)

    val schema = schemaFromDF(peekDF)

    // Second pass: re-read the tab with our explicit schema.
    val df = spark.read.excel(
      header = true,
      inferSchema = true,
      dataAddress = "Master!",
    ).schema(schema).load(path)
    df
  }

  def schemaFromDF(df: DataFrame): StructType = {
    val fields = df.columns.map { header =>
      FieldDefinitions.find(_.excelTabName == header) match {
        case Some(field) =>
          val dataType = field.dataType match {
            case "FloatType" => FloatType
            case "IntegerType" | "LongType" => LongType
            case _ => StringType
          }
          StructField(header, dataType, nullable = true)
        case None =>
          StructField(header, StringType, nullable = true)
      }
    }
    StructType(fields)
  }
}
And a peek into the schema
root
|-- NTD ID: string (nullable = true)
|-- Legacy NTD ID: string (nullable = true)
|-- Agency: string (nullable = true)
|-- Mode: string (nullable = true)
|-- TOS: string (nullable = true)
|-- 3 Mode: string (nullable = true)
|-- Status: string (nullable = true)
|-- Reporter Type: string (nullable = true)
|-- Organization Type: string (nullable = true)
|-- HQ City: string (nullable = true)
|-- HQ State: string (nullable = true)
|-- UZA: long (nullable = true)
|-- UACE CD: string (nullable = true)
|-- UZA Name: string (nullable = true)
|-- UZA SQ Miles: float (nullable = true)
|-- UZA Population: float (nullable = true)
|-- Service Area Population: float (nullable = true)
|-- Service Area SQ Miles: float (nullable = true)
|-- Most Recent Report Year: long (nullable = true)
|-- FY End Month: long (nullable = true)
|-- FY End Year: long (nullable = true)
|-- Passenger Miles FY: float (nullable = true)
|-- Unlinked Passenger Trips FY: float (nullable = true)
|-- Avg Trip Length FY: float (nullable = true)
|-- Fares FY: float (nullable = true)
|-- Operating Expenses FY: float (nullable = true)
|-- Avg Cost Per Trip FY: float (nullable = true)
|-- Avg Fares Per Trip FY: float (nullable = true)
As you can see, the fields are now typed as defined in our schema configuration file.
Step Four: Time Series Metrics Tab Loading
Building on what we did for the master tab, we can create TimeSeriesDataTabDF. We will also modify Pipeline.run to rename the ridership master dataframe and show the new dataframe.
package com.ganzekarte.examples.steps.step_4

import com.crealytics.spark.excel.ExcelDataFrameReader
import com.ganzekarte.examples.steps.step_4.FieldTransformationDefinitions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

object TimeSeriesDataTabDF {

  /**
   * Processes time series data from given Excel tabs and merges them into a single dataframe.
   *
   * @param path       The path to the Excel file.
   * @param targetTabs The list of tab names to process.
   * @param spark      Implicit Spark session instance.
   * @return A dataframe with processed time series data.
   */
  def buildDF(path: String, targetTabs: Seq[String])(implicit spark: SparkSession) = {
    // Process each target tab and return a sequence of dataframes.
    val dfTabs = targetTabs.map { tabName =>
      val dfForTab = xlsFromPath(path, tabName)
      dfForTab.show(1)
      dfForTab.printSchema()
      dfForTab
    }
  }

  def xlsFromPath(path: String, tabName: String)(implicit spark: SparkSession): DataFrame = {
    val peekDF = spark.read
      .option("locale", "en-US")
      .excel(
        header = true,
        dataAddress = tabName + "!",
        inferSchema = false,
        usePlainNumberFormat = false,
        maxRowsInMemory = 100000000,
        maxByteArraySize = 100000000,
      ).load(path)

    val schema = schemaFromDF(peekDF)

    val df = spark.read
      .option("locale", "en-US")
      .excel(
        header = true,
        dataAddress = tabName + "!",
        inferSchema = false,
        usePlainNumberFormat = false,
        maxRowsInMemory = 100000000,
        maxByteArraySize = 100000000,
      ).schema(schema).load(path)
    df
  }

  def schemaFromDF(df: DataFrame): StructType = {
    val fields = df.columns.map { header =>
      FieldDefinitions.find(_.excelTabName == header) match {
        case Some(field) =>
          val dataType = field.dataType match {
            case "FloatType" => FloatType
            case "IntegerType" | "LongType" => LongType
            case _ => StringType
          }
          StructField(header, dataType, nullable = true)
        case None =>
          StructField(header, StringType, nullable = true)
      }
    }
    StructType(fields)
  }
}
package com.ganzekarte.examples.steps.step_4

import com.ganzekarte.examples.steps.step_4.RidershipMasterTabDF
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Pipeline {
  def main(args: Array[String]): Unit = {
    run(args(0), args.tail)
  }

  def run(path: String, inputTabs: Seq[String]): Unit = {
    // Create a Spark configuration
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.executor.memory", "4g")
      .set("spark.driver.memory", "4g")

    // Create a Spark session
    implicit val spark: SparkSession =
      SparkSession.builder
        .config(conf = conf)
        .appName("entrypoint")
        .getOrCreate()

    // Define the target tabs for processing
    val targetTabs = if (inputTabs.nonEmpty) inputTabs else Seq("UPT", "VRM", "VRH", "VOMS")

    val ridershipMasterDF = RidershipMasterTabDF.xlsFromPath(path = path)
    val timeSeriesTabDF = TimeSeriesDataTabDF.buildDF(path = path, targetTabs = targetTabs)
  }
}
A peek into the dataframe data:
|NTD ID|Legacy NTD ID| Agency|Status| Reporter Type|UZA|UACE CD| UZA Name|Mode|TOS|3 Mode|1/2002|2/2002|3/2002|4/2002|5/2002|6/2002|7/2002|8/2002|9/2002|10/2002|11/2002|12/2002|1/2003|2/2003|3/2003|4/2003|5/2003|6/2003|7/2003|8/2003|9/2003|10/2003|11/2003|12/2003|1/2004|2/2004|3/2004|4/2004|5/2004|6/2004|7/2004|8/2004|9/2004|10/2004|11/2004|12/2004|1/2005|2/2005|3/2005|4/2005|5/2005|6/2005|7/2005|8/2005|9/2005|10/2005|11/2005|12/2005|1/2006|2/2006|3/2006|4/2006|5/2006|6/2006|7/2006|8/2006|9/2006|10/2006|11/2006|12/2006|1/2007|2/2007|3/2007|4/2007|5/2007|6/2007|7/2007|8/2007|9/2007|10/2007|11/2007|12/2007|1/2008|2/2008|3/2008|4/2008|5/2008|6/2008|7/2008|8/2008|9/2008|10/2008|11/2008|12/2008|1/2009|2/2009|3/2009|4/2009|5/2009|6/2009|7/2009|8/2009|9/2009|10/2009|11/2009|12/2009|1/2010|2/2010|3/2010|4/2010|5/2010|6/2010|7/2010|8/2010|9/2010|10/2010|11/2010|12/2010|1/2011|2/2011|3/2011|4/2011|5/2011|6/2011|7/2011|8/2011|9/2011|10/2011|11/2011|12/2011|1/2012|2/2012|3/2012|4/2012|5/2012|6/2012|7/2012|8/2012|9/2012|10/2012|11/2012|12/2012|1/2013|2/2013|3/2013|4/2013|5/2013|6/2013|7/2013|8/2013|9/2013|10/2013|11/2013|12/2013|1/2014|2/2014|3/2014|4/2014|5/2014|6/2014|7/2014|8/2014|9/2014|10/2014|11/2014|12/2014|1/2015|2/2015|3/2015|4/2015|5/2015|6/2015|7/2015|8/2015|9/2015|10/2015|11/2015|12/2015|1/2016|2/2016|3/2016|4/2016|5/2016|6/2016|7/2016|8/2016|9/2016|10/2016|11/2016|12/2016|1/2017|2/2017|3/2017|4/2017|5/2017|6/2017|7/2017|8/2017|9/2017|10/2017|11/2017|12/2017|1/2018|2/2018|3/2018|4/2018|5/2018|6/2018|7/2018|8/2018|9/2018|10/2018|11/2018|12/2018|1/2019|2/2019|3/2019|4/2019|5/2019|6/2019|7/2019|8/2019|9/2019|10/2019|11/2019|12/2019|1/2020|2/2020|3/2020|4/2020|5/2020|6/2020|7/2020|8/2020|9/2020|10/2020|11/2020|12/2020|1/2021|2/2021|3/2021|4/2021|5/2021|6/2021|7/2021|8/2021|9/2021|10/2021|11/2021|12/2021|1/2022|2/2022|3/2022|4/2022|5/2022|6/2022|7/2022|8/2022|9/2022|10/2022|11/2022|12/2022|1/2023|2/2023|3/2023|4/2023|5/2023|6/2023|
+------+-------------+--------------------+------+--------------------+---+-------+-------------------+----+---+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+
| 00001| 0001|King County Depar...|Active|Full Reporter: Op...| 14| 80389|Seattle--Tacoma, WA| DR| PT| Bus| 574| 574| 574| 574| 574| 574| 574| 237| 574| 574| 574| 574| 434| 434| 434| 434| 434| 434| 434| 434| 434| 434| 434| 434| 379| 379| 379| 379| 379| 379| 379| 379| 379| 379| 379| 379| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 355| 355| 355| 355| 355| 355| 355| 355| 355| 355| 355| 355| 347| 347| 347| 347| 347| 347| 347| 347| 347| 347| 347| 347| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 313| 313| 313| 313| 313| 313| 313| 313| 313| 313| 313| 313| 323| 323| 323| 323| 323| 323| 323| 323| 323| 323| 323| 323| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 321| 321| 321| 321| 321| 321| 321| 321| 321| 321| 321| 321| 174| 174| 174| 174| 174| 174| 174| 174| 174| 174| 174| 174| 320| 320| 320| 320| 320| 320| 320| 320| 320| 320| 320| 320| 304| 304| 304| 304| 304| 304| 304| 304| 304| 304| 304| 304| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 215| 217| 219| 212| 227| 230| 227| 225| 220| 220| 218| 225| 199| 211| 208| 211| 212| 211| 208| 202| 205| 217| 209| 216| 217| 217| 217| 217| 217| 217|
+------+-------------+--------------------+------+--------------------+---+-------+-------------------+----+---+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+
only showing top 1 row
and another peek into the schema
root
|-- NTD ID: string (nullable = true)
|-- Legacy NTD ID: string (nullable = true)
|-- Agency: string (nullable = true)
|-- Status: string (nullable = true)
|-- Reporter Type: string (nullable = true)
|-- UZA: long (nullable = true)
|-- UACE CD: string (nullable = true)
|-- UZA Name: string (nullable = true)
|-- Mode: string (nullable = true)
|-- TOS: string (nullable = true)
|-- 3 Mode: string (nullable = true)
|-- 1/2002: string (nullable = true)
....
|-- 6/2023: string (nullable = true)
I'll spare you the full output, but each tab has many columns and rows, with a set of common columns between them (the default target tabs are UPT, VRM, VRH, and VOMS: unlinked passenger trips, vehicle revenue miles, vehicle revenue hours, and vehicles operated in maximum service).
As you can see, there are many string columns representing time series data, with one row for each permutation of agency, mode of travel, and type of service. For database insertion and future analysis, this information is better represented as row-based data: one row per agency, mode, and date. That layout is more intuitive and manageable, and it aligns better with the nature of most algorithms and tools, so we will need to transpose the date columns into rows; each monthly column (1/2002 through 6/2023) on a single agency/mode row becomes its own row of date and value. Next, we will employ a common technique to pivot (or "melt") this data.
Step Five: Transforming Time Series Metrics Tab
Building upon the current TimeSeriesDataTabDF file, let's enhance it by introducing a companion class.
Inside the object, add the following type alias:
type DF = DataFrame
This approach mirrors a pattern I frequently use when crafting dataframes in Scala with Spark; it's akin to the .withX pattern you might recognize from Spark itself. Continue by incorporating this modification into the TimeSeriesDataTabDF file.
class TimeSeriesDataTabDF(df: TimeSeriesDataTabDF.DF) {
  /**
   * Accessor method for the internal dataframe.
   *
   * @return Internal dataframe.
   */
  def dataframe(): TimeSeriesDataTabDF.DF = {
    df
  }
}
and in RidershipMasterTabDF
class RidershipMasterTabDF(df: RidershipMasterTabDF.DF) {
  /**
   * Accessor method for the internal dataframe.
   *
   * @return Internal dataframe.
   */
  def dataframe(): RidershipMasterTabDF.DF = {
    df
  }
}
In both files, we know there are common identifying fields that, on a per-row basis, describe the stats we're interested in. We will create a checksum over those fields so that matching rows can be joined later.
Let's add a method withChecksumColumn to both classes. It reads the columns we've flagged as checksummable, concatenates them, and hashes the result (ChecksumFields is the set of flagged field definitions; see the sketch after the code below).
/**
 * Adds a checksum column to the dataframe for later cross-referencing.
 *
 * @return Instance of TimeSeriesDataTabDF with the added checksum column.
 */
def withChecksumColumn: TimeSeriesDataTabDF = {
  // Requires: import org.apache.spark.sql.functions.{col, concat_ws, md5}
  val filteredColumns = df.columns.filter(ChecksumFields.map(_.excelTabName).contains(_)).sorted
  val concatenatedColumns = concat_ws("", filteredColumns.map(col): _*)
  val checksum = md5(concatenatedColumns)
  new TimeSeriesDataTabDF(df.withColumn("checksum", checksum))
}
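ChecksumFields itself isn't shown above. A minimal sketch of one way to derive it, assuming the field definitions carry an isShared flag marking the common identifier columns (the same flag used for column filtering in the next step):

// Assumption: checksum over the shared identifier columns.
val ChecksumFields: List[FieldTransformationDefinition] =
  FieldTransformationDefinitions.FieldDefinitions.filter(_.isShared)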
Let's use this in buildDF, in the TimeSeriesDataTabDF companion object:
def buildDF(path: String, targetTabs: Seq[String])(implicit spark: SparkSession) = {
  // Process each target tab and return a sequence of dataframes.
  val dfTabs = targetTabs.map { tabName =>
    val dfForTab = xlsFromPath(path, tabName)
    val builder = new TimeSeriesDataTabDF(dfForTab)
      .withChecksumColumn
    builder.dataframe().show(1)
  }
}
|NTD ID|Legacy NTD ID| Agency|Status| Reporter Type|UZA|UACE CD| UZA Name|Mode|TOS|3 Mode|1/2002|2/2002|3/2002|4/2002|5/2002|6/2002|7/2002|8/2002|9/2002|10/2002|11/2002|12/2002|1/2003|2/2003|3/2003|4/2003|5/2003|6/2003|7/2003|8/2003|9/2003|10/2003|11/2003|12/2003|1/2004|2/2004|3/2004|4/2004|5/2004|6/2004|7/2004|8/2004|9/2004|10/2004|11/2004|12/2004|1/2005|2/2005|3/2005|4/2005|5/2005|6/2005|7/2005|8/2005|9/2005|10/2005|11/2005|12/2005|1/2006|2/2006|3/2006|4/2006|5/2006|6/2006|7/2006|8/2006|9/2006|10/2006|11/2006|12/2006|1/2007|2/2007|3/2007|4/2007|5/2007|6/2007|7/2007|8/2007|9/2007|10/2007|11/2007|12/2007|1/2008|2/2008|3/2008|4/2008|5/2008|6/2008|7/2008|8/2008|9/2008|10/2008|11/2008|12/2008|1/2009|2/2009|3/2009|4/2009|5/2009|6/2009|7/2009|8/2009|9/2009|10/2009|11/2009|12/2009|1/2010|2/2010|3/2010|4/2010|5/2010|6/2010|7/2010|8/2010|9/2010|10/2010|11/2010|12/2010|1/2011|2/2011|3/2011|4/2011|5/2011|6/2011|7/2011|8/2011|9/2011|10/2011|11/2011|12/2011|1/2012|2/2012|3/2012|4/2012|5/2012|6/2012|7/2012|8/2012|9/2012|10/2012|11/2012|12/2012|1/2013|2/2013|3/2013|4/2013|5/2013|6/2013|7/2013|8/2013|9/2013|10/2013|11/2013|12/2013|1/2014|2/2014|3/2014|4/2014|5/2014|6/2014|7/2014|8/2014|9/2014|10/2014|11/2014|12/2014|1/2015|2/2015|3/2015|4/2015|5/2015|6/2015|7/2015|8/2015|9/2015|10/2015|11/2015|12/2015|1/2016|2/2016|3/2016|4/2016|5/2016|6/2016|7/2016|8/2016|9/2016|10/2016|11/2016|12/2016|1/2017|2/2017|3/2017|4/2017|5/2017|6/2017|7/2017|8/2017|9/2017|10/2017|11/2017|12/2017|1/2018|2/2018|3/2018|4/2018|5/2018|6/2018|7/2018|8/2018|9/2018|10/2018|11/2018|12/2018|1/2019|2/2019|3/2019|4/2019|5/2019|6/2019|7/2019|8/2019|9/2019|10/2019|11/2019|12/2019|1/2020|2/2020|3/2020|4/2020|5/2020|6/2020|7/2020|8/2020|9/2020|10/2020|11/2020|12/2020|1/2021|2/2021|3/2021|4/2021|5/2021|6/2021|7/2021|8/2021|9/2021|10/2021|11/2021|12/2021|1/2022|2/2022|3/2022|4/2022|5/2022|6/2022|7/2022|8/2022|9/2022|10/2022|11/2022|12/2022|1/2023|2/2023|3/2023|4/2023|5/2023|6/2023| checksum|
+------+-------------+--------------------+------+--------------------+---+-------+-------------------+----+---+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+--------------------+
| 00001| 0001|King County Depar...|Active|Full Reporter: Op...| 14| 80389|Seattle--Tacoma, WA| DR| PT| Bus| 574| 574| 574| 574| 574| 574| 574| 237| 574| 574| 574| 574| 434| 434| 434| 434| 434| 434| 434| 434| 434| 434| 434| 434| 379| 379| 379| 379| 379| 379| 379| 379| 379| 379| 379| 379| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 445| 355| 355| 355| 355| 355| 355| 355| 355| 355| 355| 355| 355| 347| 347| 347| 347| 347| 347| 347| 347| 347| 347| 347| 347| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 336| 313| 313| 313| 313| 313| 313| 313| 313| 313| 313| 313| 313| 323| 323| 323| 323| 323| 323| 323| 323| 323| 323| 323| 323| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 321| 321| 321| 321| 321| 321| 321| 321| 321| 321| 321| 321| 174| 174| 174| 174| 174| 174| 174| 174| 174| 174| 174| 174| 320| 320| 320| 320| 320| 320| 320| 320| 320| 320| 320| 320| 304| 304| 304| 304| 304| 304| 304| 304| 304| 304| 304| 304| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 327| 215| 217| 219| 212| 227| 230| 227| 225| 220| 220| 218| 225| 199| 211| 208| 211| 212| 211| 208| 202| 205| 217| 209| 216| 217| 217| 217| 217| 217| 217|66ade520fcab6a774...|
+------+-------------+--------------------+------+--------------------+---+-------+-------------------+----+---+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+------+------+------+-------+-------+-------+------+------+------+------+------+------+--------------------+
only showing top 1 row
in RidershipMasterTabDF
/**
 * Builds the RidershipMasterTabDF from an Excel file located at the provided path.
 * It also adds a checksum column to the resulting data frame.
 *
 * @param path  The path to the Excel file.
 * @param spark Implicit SparkSession instance for data processing.
 * @return An instance of RidershipMasterTabDF with the added checksum column.
 */
def buildDF(path: String)(implicit spark: SparkSession): RidershipMasterTabDF = {
  // Wrap the raw dataframe read from the given path in a RidershipMasterTabDF
  val ridershipMasterTabDF = new RidershipMasterTabDF(xlsFromPath(path))

  // Add the checksum column and return the updated RidershipMasterTabDF
  ridershipMasterTabDF
    .withChecksumColumn
}
Now we have checksum columns for both the master tab and all the other tabs we're interested in.
The identifying data is duplicated across both, so let's use the master tab as the single source for it.
To do that, we need to drop the non-temporal columns from the time series tabs, which we can do with a new method in the class TimeSeriesDataTabDF:
/**
 * Removes columns that aren't relevant to the time series analysis.
 *
 * @return Instance of TimeSeriesDataTabDF with irrelevant columns dropped.
 */
def withOnlyTemporalColumns: TimeSeriesDataTabDF = {
  val tabsToDrop = FieldTransformationDefinitions.FieldDefinitions
    .filter(_.isShared)
    .filter(_.isFeature)
    .map(_.excelTabName)
  val columnsToDrop = df.columns.toSeq.filter(tabsToDrop.contains(_))
  new TimeSeriesDataTabDF(
    df.drop(columnsToDrop: _*)
  )
}
and updating buildDF
def buildDF(path: String, targetTabs: Seq[String])(implicit spark: SparkSession) = {
  // Process each target tab and return a sequence of dataframes.
  val dfTabs = targetTabs.map { tabName =>
    val dfForTab = xlsFromPath(path, tabName)
    val builder = new TimeSeriesDataTabDF(dfForTab)
      .withChecksumColumn
      .withOnlyTemporalColumns
    builder.dataframe().show(1)
  }
}
| 1/2002| 2/2002| 3/2002| 4/2002| 5/2002| 6/2002| 7/2002| 8/2002| 9/2002|10/2002|11/2002|12/2002| 1/2003| 2/2003| 3/2003| 4/2003| 5/2003| 6/2003| 7/2003| 8/2003| 9/2003|10/2003|11/2003|12/2003| 1/2004| 2/2004| 3/2004| 4/2004| 5/2004| 6/2004| 7/2004| 8/2004| 9/2004|10/2004|11/2004|12/2004| 1/2005| 2/2005| 3/2005| 4/2005| 5/2005| 6/2005| 7/2005| 8/2005| 9/2005|10/2005|11/2005|12/2005| 1/2006| 2/2006| 3/2006| 4/2006| 5/2006| 6/2006| 7/2006| 8/2006| 9/2006|10/2006|11/2006|12/2006| 1/2007| 2/2007| 3/2007| 4/2007| 5/2007| 6/2007| 7/2007| 8/2007| 9/2007|10/2007|11/2007|12/2007| 1/2008| 2/2008| 3/2008| 4/2008| 5/2008| 6/2008| 7/2008| 8/2008| 9/2008|10/2008|11/2008|12/2008| 1/2009| 2/2009| 3/2009| 4/2009| 5/2009| 6/2009| 7/2009| 8/2009| 9/2009|10/2009|11/2009|12/2009| 1/2010| 2/2010| 3/2010| 4/2010| 5/2010| 6/2010| 7/2010| 8/2010| 9/2010|10/2010|11/2010|12/2010| 1/2011| 2/2011| 3/2011| 4/2011| 5/2011| 6/2011| 7/2011| 8/2011| 9/2011|10/2011|11/2011|12/2011| 1/2012| 2/2012| 3/2012| 4/2012| 5/2012| 6/2012| 7/2012| 8/2012| 9/2012|10/2012|11/2012|12/2012| 1/2013| 2/2013| 3/2013| 4/2013| 5/2013| 6/2013| 7/2013| 8/2013| 9/2013|10/2013|11/2013|12/2013| 1/2014| 2/2014| 3/2014| 4/2014| 5/2014| 6/2014| 7/2014| 8/2014| 9/2014|10/2014|11/2014|12/2014| 1/2015| 2/2015| 3/2015| 4/2015| 5/2015| 6/2015| 7/2015| 8/2015| 9/2015|10/2015|11/2015|12/2015| 1/2016| 2/2016| 3/2016| 4/2016| 5/2016| 6/2016| 7/2016| 8/2016| 9/2016|10/2016|11/2016|12/2016| 1/2017| 2/2017| 3/2017| 4/2017| 5/2017| 6/2017| 7/2017| 8/2017| 9/2017|10/2017|11/2017|12/2017| 1/2018| 2/2018| 3/2018| 4/2018| 5/2018| 6/2018| 7/2018| 8/2018| 9/2018|10/2018|11/2018|12/2018| 1/2019| 2/2019| 3/2019| 4/2019| 5/2019| 6/2019| 7/2019| 8/2019| 9/2019|10/2019|11/2019|12/2019| 1/2020| 2/2020| 3/2020| 4/2020| 5/2020| 6/2020| 7/2020| 8/2020| 9/2020|10/2020|11/2020|12/2020| 1/2021| 2/2021| 3/2021| 4/2021| 5/2021| 6/2021| 7/2021| 8/2021| 9/2021|10/2021|11/2021|12/2021| 1/2022| 2/2022| 3/2022| 4/2022| 5/2022| 6/2022| 7/2022| 8/2022| 9/2022|10/2022|11/2022|12/2022| 1/2023| 2/2023| 3/2023| 4/2023| 5/2023| 6/2023| checksum|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
|746,158|656,324|726,578|736,975|746,158|696,633|746,158|747,339|695,451|766,920|696,633|725,396|761,210|697,454|777,948|795,216|787,912|748,855|793,074|754,939|752,394|843,825|701,931|728,026|747,628|771,845|883,913|857,232|841,169|847,193|825,119|820,359|806,457|829,771|895,134|905,623|774,351|753,949|848,372|821,603|886,438|862,034|843,133|905,209|845,207|865,101|812,467|797,799|798,647|754,978|870,726|811,068|869,933|862,697|864,720|940,351|882,333|895,883|765,935|728,139|789,831|782,401|872,267|864,049|963,309|846,949|848,425|891,059|799,670|921,386|829,419|820,341|746,766|735,504|760,462|797,155|794,368|749,376|793,643|750,790|745,579|848,123|714,143|737,948|841,743|774,171|853,798|851,673|816,168|828,250|855,102|832,296|832,413|897,303|782,787|809,271|778,833|753,302|867,834|836,731|808,656|810,494|812,026|824,703|816,410|878,178|763,045|816,721|835,425|780,800|922,064|863,560|877,355|858,632|807,759|866,152|821,033|849,680|800,887|752,367|708,973|814,776|867,424|833,241|882,908|809,746|812,980|851,610|767,559|873,579|794,750|741,422|823,920|778,181|832,695|854,336|872,258|788,483|825,955|819,781|760,267|862,747|745,981|705,354|768,910|688,770|748,916|781,409|761,544|714,233|726,415|702,592|693,406|776,224|654,371|658,414|585,118|558,914|599,948|587,702|574,770|569,771|562,844|547,513|528,987|571,210|502,916|515,948|628,731|657,129|706,423|686,973|681,681|668,495|633,077|695,930|655,110|681,981|641,698|596,968|635,089|578,776|701,185|657,496|693,832|662,008|632,230|682,834|613,870|689,045|644,004|611,734|695,625|634,643|707,393|699,070|735,695|686,858|696,654|734,015|649,497|753,757|686,471|656,197|722,368|503,211|718,918|741,892|760,626|702,880|749,261|751,070|702,267|789,753|691,308|678,059|597,807|592,710|330,325|259,298|181,764|183,951|336,716|263,311|278,534|364,814|259,950|264,006|257,809|238,318|294,657|298,143|304,492|365,419|376,481|376,779|373,316|390,184|394,087|353,114|361,061|373,994|422,895|419,149|460,406|450,482|435,097|466,737|430,989|427,958|393,024|326,840|427,579|395,828|465,783|444,099|494,563|384,355|66ade520fcab6a774...|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
Great! But we still need to transpose this.
Step Six: Transposing the Time Series Data
We can do this with the Spark SQL stack function. We will filter down to the date columns and then create a new row for each one, with its value stored in a column named after the tab.
In TimeSeriesDataTabDF.scala:
/**
 * Transforms columns of date data into rows with the format: date, value.
 *
 * @return Instance of TimeSeriesDataTabDF with reshaped data.
 */
def withColumnsMelted(valueColumnName: String): TimeSeriesDataTabDF = {
  val dateColumns = df.columns.filterNot(colName =>
    FieldDefinitions.filter(_.isShared).map(_.excelTabName).contains(colName) || colName == "checksum"
  )
  val stackExpr = dateColumns.map(col => s"'$col', `$col`").mkString(", ")
  new TimeSeriesDataTabDF(
    df.selectExpr(
      "checksum",
      s"stack(${dateColumns.length}, $stackExpr) as (date, ${valueColumnName.toLowerCase})"
    )
  )
}
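To make the generated expression concrete, here's a minimal, self-contained sketch of the same melt on a toy dataframe (the column names and values are illustrative, not the real NTD data):

import org.apache.spark.sql.SparkSession

val sparkDemo = SparkSession.builder.master("local[*]").appName("melt-sketch").getOrCreate()
import sparkDemo.implicits._

val wide = Seq(("abc123", 574, 560)).toDF("checksum", "1/2002", "2/2002")

// stack(n, label1, value1, label2, value2, ...) emits n rows of (label, value) pairs.
val narrow = wide.selectExpr(
  "checksum",
  "stack(2, '1/2002', `1/2002`, '2/2002', `2/2002`) as (date, voms)"
)
narrow.show()
// Produces one row per (checksum, date) with that month's value in the voms column.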
and in the companion object
def buildDF(path: String, targetTabs: Seq[String])(implicit spark: SparkSession) = {
  // Process each target tab and return a sequence of dataframes.
  val dfTabs = targetTabs.map { tabName =>
    val dfForTab = xlsFromPath(path, tabName)
    val builder = new TimeSeriesDataTabDF(dfForTab)
      .withChecksumColumn
      .withOnlyTemporalColumns
      .withColumnsMelted(tabName)
    builder.dataframe().show(1)
  }
}
+--------------------+------+-------+
| checksum| date| upt|
+--------------------+------+-------+
|66ade520fcab6a774...|1/2002|135,144|
+--------------------+------+-------+
only showing top 1 row
+--------------------+------+-------+
| checksum| date| vrm|
+--------------------+------+-------+
|66ade520fcab6a774...|1/2002|746,158|
+--------------------+------+-------+
only showing top 1 row
+--------------------+------+------+
| checksum| date| vrh|
+--------------------+------+------+
|66ade520fcab6a774...|1/2002|53,306|
+--------------------+------+------+
only showing top 1 row
+--------------------+------+----+
| checksum| date|voms|
+--------------------+------+----+
|66ade520fcab6a774...|1/2002| 574|
+--------------------+------+----+
only showing top 1 row
OK, now we need to get these multiple dataframes into one. We can update the buildDF method:
def buildDF(path: String, targetTabs: Seq[String])(implicit spark: SparkSession) = {
  // Process each target tab and return a sequence of dataframes.
  val dfTabs = targetTabs.map { tabName =>
    val dfForTab = xlsFromPath(path, tabName)
    val builder = new TimeSeriesDataTabDF(dfForTab)
      .withChecksumColumn
      .withOnlyTemporalColumns
      .withColumnsMelted(tabName)

    // Return the dataframe representation of the processed data.
    builder.dataframe()
  }

  // Merge all processed dataframes into one, using "checksum" and "date" as joining keys.
  val reduced = dfTabs.reduce((l, r) => l.join(r, Seq("checksum", "date")))
  reduced.show(5)
}
+--------------------+-------+-----+------+---+----+
| checksum| date| upt| vrm|vrh|voms|
+--------------------+-------+-----+------+---+----+
|00241f6dd9406058a...| 1/2012| 0| 0| 0| 0|
|00241f6dd9406058a...| 1/2016| 0| 0| 0| 0|
|00241f6dd9406058a...| 1/2017| 0| 0| 0| 0|
|00241f6dd9406058a...|10/2002|2,766|12,472|878| 9|
|00241f6dd9406058a...|10/2011| 0| 0| 0| 0|
+--------------------+-------+-----+------+---+----+
only showing top 5 rows
You will see that, while reduced to one dataframe, we still have duplicate checksum and date combinations, each carrying only one value per metric column, which we will need to aggregate:
def buildDF(path: String, targetTabs: Seq[String])(implicit spark: SparkSession) = {
  // Process each target tab and return a sequence of dataframes.
  val dfTabs = targetTabs.map { tabName =>
    val dfForTab = xlsFromPath(path, tabName)
    val builder = new TimeSeriesDataTabDF(dfForTab)
      .withChecksumColumn
      .withOnlyTemporalColumns
      .withColumnsMelted(tabName)

    // Return the dataframe representation of the processed data.
    builder.dataframe()
  }

  // Merge all processed dataframes into one, using "checksum" and "date" as joining keys.
  val reduced = dfTabs.reduce((l, r) => l.join(r, Seq("checksum", "date")))

  // Generate dynamic aggregation based on the provided column list.
  // Requires: import org.apache.spark.sql.functions.sum
  val aggregations = targetTabs.map(colName => sum(colName).as(colName))
  val grouped = reduced.groupBy("date", "checksum").agg(aggregations.head, aggregations.tail: _*)
  grouped
}
after aggregating
+------+--------------------+---+---+---+----+
| date| checksum|UPT|VRM|VRH|VOMS|
+------+--------------------+---+---+---+----+
|1/2012|00241f6dd9406058a...|0.0|0.0|0.0| 0.0|
+------+--------------------+---+---+---+----+
only showing top 1 row
Step Seven: Joining the Two Dataframes
JoinedTabsDF
package com.ganzekarte.examples.steps.step_6

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
 * The object `JoinedTabsDF` provides utility functions to join the dataframes
 * from `RidershipMasterTabDF` and `TimeSeriesDataTabDF`.
 */
object JoinedTabsDF {

  /**
   * Joins the dataframes from RidershipMasterTabDF and TimeSeriesDataTabDF based on the 'checksum' column.
   *
   * @param masterTabDF         Instance of RidershipMasterTabDF containing the master data.
   * @param timeSeriesDataTabDF Instance of TimeSeriesDataTabDF containing time series data.
   * @return A new instance of JoinedTabsDF containing the joined dataframe.
   */
  def fromMasterTimeSeriesDFPair(masterTabDF: RidershipMasterTabDF,
                                 timeSeriesDataTabDF: TimeSeriesDataTabDF): JoinedTabsDF = {
    // Join the dataframes on the checksum column
    val joinedTabsDF = timeSeriesDataTabDF
      .dataframe()
      .join(masterTabDF.dataframe(), Seq("checksum"))

    // Wrap the joined dataframe and sanitize its column names
    val joined = new JoinedTabsDF(joinedTabsDF)
      .withSanitizedColumns
    joined
  }

  /**
   * Helper function to sanitize column names:
   * Replaces spaces with underscores and converts to lowercase.
   *
   * @param name Original column name.
   * @return Sanitized column name.
   */
  def sanitizeColumnNames(name: String): String = {
    name.replaceAll("\\s+", "_").toLowerCase()
  }
}

/**
 * Represents a joined dataframe from RidershipMasterTabDF and TimeSeriesDataTabDF.
 *
 * @param df Underlying dataframe that this class wraps around.
 */
class JoinedTabsDF(df: DataFrame) {

  /**
   * Provides access to the underlying data frame.
   *
   * @return The underlying DataFrame.
   */
  def dataframe(): DataFrame = df

  def withSanitizedColumns: JoinedTabsDF = {
    val sanitized = df.columns.foldLeft(df) { (updatedDF, columnName) =>
      updatedDF.withColumnRenamed(columnName, JoinedTabsDF.sanitizeColumnNames(columnName))
    }
    new JoinedTabsDF(sanitized)
  }
}
You may need to tweak the Spark Excel reader's settings with:
maxRowsInMemory = 100000000,
maxByteArraySize = 100000000,
Here we are joining the dataframes together.
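For completeness, here's a minimal sketch of how this join might be wired into Pipeline.run, assuming TimeSeriesDataTabDF.buildDF returns the grouped dataframe from the previous step (so we wrap it back into the class before joining):

// Sketch only: build both sides and join them on the checksum column.
val masterTab = RidershipMasterTabDF.buildDF(path = path)
val timeSeriesTab = new TimeSeriesDataTabDF(
  TimeSeriesDataTabDF.buildDF(path = path, targetTabs = targetTabs)
)
val joinedTabs = JoinedTabsDF.fromMasterTimeSeriesDFPair(masterTab, timeSeriesTab)
joinedTabs.dataframe().show(10)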
+--------------------+-------+----+----+-----+----+------+-------------+--------------------+----+---+------+--------+--------------------+-----------------+---------+--------+---+-------+-------------+------------+--------------+-----------------------+---------------------+-----------------------+------------+-----------+------------------+---------------------------+------------------+--------+---------------------+--------------------+---------------------+
| checksum| date| upt| vrm| vrh|voms|ntd_id|legacy_ntd_id| agency|mode|tos|3_mode| status| reporter_type|organization_type| hq_city|hq_state|uza|uace_cd| uza_name|uza_sq_miles|uza_population|service_area_population|service_area_sq_miles|most_recent_report_year|fy_end_month|fy_end_year|passenger_miles_fy|unlinked_passenger_trips_fy|avg_trip_length_fy|fares_fy|operating_expenses_fy|avg_cost_per_trip_fy|avg_fares_per_trip_fy|
+--------------------+-------+----+----+-----+----+------+-------------+--------------------+----+---+------+--------+--------------------+-----------------+---------+--------+---+-------+-------------+------------+--------------+-----------------------+---------------------+-----------------------+------------+-----------+------------------+---------------------------+------------------+--------+---------------------+--------------------+---------------------+
|00241f6dd9406058a...| 1/2012| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...| 1/2016| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...| 1/2017| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...|10/2002|null|null|878.0| 9.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...|10/2011| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...|10/2020| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...|11/2002|null|null|800.0| 9.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...|11/2009| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...|12/2010| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
|00241f6dd9406058a...|12/2020| 0.0| 0.0| 0.0| 0.0| 90180| 9180|Coconino County T...| DR| DO| Bus|Inactive|Full Reporter: Op...| ?|Flagstaff| AZ|403| 29818|Flagstaff, AZ| 32.0| 57050.0| 0.0| 0.0| 2008| 6| 2008| 128989.0| 22349.0| 5.7715783| 31234.0| 746762.0| 33.413666| 1.3975569|
+--------------------+-------+----+----+-----+----+------+-------------+--------------------+----+---+------+--------+--------------------+-----------------+---------+--------+---+-------+-------------+------------+--------------+-----------------------+---------------------+-----------------------+------------+-----------+------------------+---------------------------+------------------+--------+---------------------+--------------------+---------------------+
only showing top 10 rows
Summary
And that wraps up the second installment of our series! In this part:
- We extracted data from the National Transit Database's XLS file.
- We transformed this data into three distinct dataframes.
- We further refined the data to align with conventional data processing methods, priming it for subsequent steps.
Stay connected for the next segment in our exciting journey.