Suppose we need to report on the orders that have been placed and their status.
To keep things simple, we will work with a very small static dataset: the source is a CSV file with only a few rows.
The process (10,000 ft overview):
Tech stack:
PySpark (for loading data from one source to another).
DBT (for ETL and scheduling jobs).
Looker (for reporting/visualisation).
STEP 1: LOADING DATA
Let's say we have the data below in a CSV file called data.csv:
Commands used:
# Read the CSV file, taking column names from the header row
# (no explicit schema is passed, so every column is loaded as a string)
csv_file_path = "/Users/jatin/spark/data.csv"
df = spark.read.csv(csv_file_path, header=True)
# Save the DataFrame as Parquet
parquet_file_path = "/Users/jatin/spark/data.parquet"
df.write.parquet(parquet_file_path)
Before running these commands, we need to import the required packages and create a Spark session:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Test App") \
    .config("spark.jars.packages", "net.snowflake:snowflake-jdbc:3.13.5,net.snowflake:spark-snowflake_2.12:2.9.0-spark_3.1") \
    .getOrCreate()
We convert the data from CSV to Parquet because Parquet compresses well and stores data in a columnar format, which is more efficient when reports read only a few columns directly.
STEP 2: LOADING DATA (TRANSFORM) TO SNOWFLAKE
The data is now in Parquet format, but it can still contain duplicate rows, or rows for the same order that differ only in load_timestamp or modified_ts, as in our data.
We could either run a few functions in PySpark and load the data into Snowflake directly, or use DBT. Here we take the PySpark route because the data is small and static. In production, where new data arrives every 5-10 minutes, DBT would be the better choice for ETL and scheduling.
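To make the deduplication rule concrete before the PySpark version, here is a minimal plain-Python sketch of the same idea: for each order_id, keep only the row with the latest modified_ts. The sample rows, the status column and the timestamp format are assumptions made up for illustration; only order_id and modified_ts come from the data described here.

```python
from datetime import datetime

# Hypothetical sample rows: the status column and all values are invented.
rows = [
    {"order_id": 1, "status": "PLACED", "modified_ts": "2023-06-01 10:00:00"},
    {"order_id": 1, "status": "SHIPPED", "modified_ts": "2023-06-02 09:30:00"},
    {"order_id": 2, "status": "PLACED", "modified_ts": "2023-06-01 11:15:00"},
    {"order_id": 2, "status": "PLACED", "modified_ts": "2023-06-01 11:15:00"},
]


def latest_per_order(rows):
    """Keep, for each order_id, the row with the greatest modified_ts."""
    latest = {}
    for row in rows:
        # Assumed timestamp format; adjust to match the real data.
        ts = datetime.strptime(row["modified_ts"], "%Y-%m-%d %H:%M:%S")
        current = latest.get(row["order_id"])
        # Strict comparison: an exact duplicate does not replace the kept row.
        if current is None or ts > current[0]:
            latest[row["order_id"]] = (ts, row)
    return [row for _, row in latest.values()]


deduplicated = latest_per_order(rows)
for row in deduplicated:
    print(row["order_id"], row["status"], row["modified_ts"])
```

The PySpark code expresses the same rule with a window partitioned by order_id and ordered by modified_ts descending, keeping the row numbered 1 in each partition.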
Code to save to Snowflake:
# Read the Parquet file into df
parquet_file = "/Users/jatin/spark/data.parquet/part-00000-449db15a-2cb7-443f-b81a-6aeed1c889dc-c000.snappy.parquet"
df = spark.read.parquet(parquet_file)
# Using window function to partition by order_id and order by modified ts
window_fn = Window.partitionBy("order_id").orderBy(df.modified_ts.desc())
# Adding row number
df_with_row_number = df.withColumn("row_number", row_number().over(window_fn))
# Filter rows with row_number = 1 to get the latest row for each order_id
latest_rows = df_with_row_number.filter(df_with_row_number.row_number == 1)
# Drop the helper row_number column (the last column), which is not needed in Snowflake
column_names = latest_rows.columns
new_latest = latest_rows.select(column_names[:-1])
# Write the latest rows to Snowflake table
new_latest.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**sf_options) \
    .option("dbtable", "order_details_test") \
    .mode("append") \
    .save()
The Snowflake connection parameters must be defined before running the command above:
# Define Snowflake connection parameters
sf_options = {
    "sfURL": "https://****.snowflakecomputing.com",
    "sfDatabase": "****",
    "sfSchema": "**",
    "sfWarehouse": "****",
    "sfRole": "****",
    "sfUser": "****",
    "sfPassword": "****"
}
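Hardcoding credentials in a script is easy to leak, so one possible alternative is to build the same dictionary from environment variables. The sketch below is only an illustration: the environment variable names and the load_sf_options helper are hypothetical, while the option keys are the ones used in sf_options above.

```python
import os

# Hypothetical environment variable names mapped to the connector option keys.
ENV_TO_OPTION = {
    "SNOWFLAKE_URL": "sfURL",
    "SNOWFLAKE_DATABASE": "sfDatabase",
    "SNOWFLAKE_SCHEMA": "sfSchema",
    "SNOWFLAKE_WAREHOUSE": "sfWarehouse",
    "SNOWFLAKE_ROLE": "sfRole",
    "SNOWFLAKE_USER": "sfUser",
    "SNOWFLAKE_PASSWORD": "sfPassword",
}


def load_sf_options(env=None):
    """Build the Snowflake options dict from a mapping (default: os.environ)."""
    env = os.environ if env is None else env
    missing = sorted(name for name in ENV_TO_OPTION if name not in env)
    if missing:
        # Fail early with a clear message instead of a connection error later.
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {option: env[name] for name, option in ENV_TO_OPTION.items()}
```

With the variables exported in the shell, sf_options = load_sf_options() gives a dictionary that can be passed to .options(**sf_options) exactly as before.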
STEP 3: VISUALIZATION USING LOOKER
We can create an Explore in Looker on top of the data above to show reports to users, adding custom images and details:
This is a very basic flow for getting from a CSV file to a visualisation.