Data Engineering with Snowflake and Amazon Sales Data: Part 2
Continuing our exploration of an end-to-end ETL (Extract, Transform, Load) data pipeline for Amazon Sales data using Snowpark and Snowflake. In this part we will focus on data ingestion and staging.
Check out Part 1 of this series.
Where to find complete resources for the series?
The complete resources, including code and worksheets, are available on my GitHub repo.
1. Creating Database and Schema
Now that we have set up our working environment in Part 1, we will set up our database and schema for development.
You can find the complete worksheet for this step here.
1. Creating Database
Snowflake has a simple command for creating a new database:
create database if not exists amazon_sales_dwh;
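If you want to confirm that the database was created before moving on, a quick optional check is:
show databases like 'amazon_sales_dwh';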
2. Creating Schemas (Layers)
In this step we will create different schemas within our database to carry out different levels of operations on the data. This includes progressively cleaning and massaging the data, starting from the source layer and ending at the final consumption layer.
Snowflake Schema: A schema in Snowflake is basically a logical separation of storage within a database. This storage consists of tables, views, file formats, functions, etc. Schemas let us create different layers for a single project within the same database instead of creating multiple databases.
use database amazon_sales_dwh;
create schema if not exists source; -- raw source data and the data stage live here (source layer)
create schema if not exists curated; -- data curation and de-duplication (curation layer)
create schema if not exists consumption; -- fact & dimension tables (consumption layer)
create schema if not exists common; -- file formats, sequence objects, etc. (objects shared across layers)
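As a quick sanity check (optional), you can list the schemas that now exist in the database:
show schemas in database amazon_sales_dwh;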
2. Staging Files and Data Ingestion
To ingest the data from the different files into our tables, we first need to create a stage for the data. Snowflake supports both internal and external stages (S3, etc.). In this demo we will use an internal stage just to keep things simple and free.
Snowflake Stage: A stage is basically an area where data files are stored ("staged") so that the data in the files can be loaded into a table.
1. Creating Stage
-- Create Stage (Internal)
use schema source;
create or replace stage data_stg;
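For reference only, an external stage backed by S3 would look roughly like the sketch below. The bucket URL and credentials are placeholders, not part of this project, and we will stick with the internal stage for the rest of this series.
-- Illustrative sketch of an external stage (placeholders, not used in this series)
create or replace stage data_stg_ext
  url = 's3://<your-bucket>/amazon-sales-data/'
  credentials = (aws_key_id = '<aws_key_id>' aws_secret_key = '<aws_secret_key>');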
2. Grant Permissions
This step is important: we need to grant permissions on the newly created objects to the role we will be using, which in our case is sysadmin.
Note: Skipping this step can cause errors while pushing data to the stage.
-- Grant permissions on all objects to sysadmin (you can create and use a different role too)
-- "ALL ON ALL" is not strictly necessary; it is used here just for simplicity.
GRANT ALL ON ALL SCHEMAS IN DATABASE amazon_sales_dwh TO ROLE sysadmin;
GRANT ALL ON ALL TABLES IN SCHEMA amazon_sales_dwh.source TO ROLE sysadmin;
GRANT ALL ON ALL SEQUENCES IN SCHEMA amazon_sales_dwh.source TO ROLE sysadmin;
GRANT ALL PRIVILEGES ON STAGE amazon_sales_dwh.source.data_stg TO ROLE sysadmin;
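To double-check that the grants are in place, you can inspect them (optional):
show grants on stage data_stg;
show grants to role sysadmin;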
3. Uploading Data to the Stage (Data Ingestion)
Now that we have created the internal stage and granted all the necessary permissions, we can upload our data to the stage. For this we will use a Python script built on Snowflake's Snowpark library. The script walks the local data directory, collects the CSV, Parquet, and JSON files, and uploads them to the stage while preserving the partition folder structure.
"""
Snowflake data staging script
"""
import os
import sys
import logging
from snowflake.snowpark import Session
from dotenv import load_dotenv
load_dotenv()
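# Note: the .env file is expected to provide the variables read below:
# ACCOUNT_ID, SF_USER, PASSWORD, ROLE, DATABASE, SCHEMA and WAREHOUSE.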
# initiate logging at info level
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%I:%M:%S",
)
# Stage Location
STAGE_LOCATION = "@amazon_sales_dwh.source.data_stg"
# snowpark session
def get_snowpark_session() -> Session:
"""Create Snowpark Session"""
connection_parameters = {
"ACCOUNT": os.environ.get("ACCOUNT_ID"),
"USER": os.environ.get("SF_USER"),
"PASSWORD": os.environ.get("PASSWORD"),
"ROLE": os.environ.get("ROLE"),
"DATABASE": os.environ.get("DATABASE"),
"SCHEMA": os.environ.get("SCHEMA"),
"WAREHOUSE": os.environ.get("WAREHOUSE"),
}
# creating snowflake session object
return Session.builder.configs(connection_parameters).create()
def traverse_directory(
directory: str, file_extension: str
) -> tuple[list[str], list[str], list[str]]:
"""Traverse the directory to get filename, partition and local file path.
Args:
directory (str): The path of the directory to traverse.
file_extension (str): The file extension to filter files.
Returns:
Tuple[List[str], List[str], List[str]]: A tuple containing three lists:
- A list of filenames with the specified extension.
- A list of partition directories relative to the base directory.
- A list of local file paths corresponding to the filenames.
"""
    local_file_path = []  # local paths of matching files
    file_name = []  # file names only
    partition_dir = []  # partition sub-directories relative to the base directory
for root, _, files in os.walk(directory):
for file in files:
if file.endswith(file_extension):
file_path = os.path.join(root, file)
file_name.append(file)
partition_dir.append(root.replace(directory, ""))
local_file_path.append(file_path)
return file_name, partition_dir, local_file_path
def upload_files(
session: Session,
file_names: list[str],
partition_dirs: list[str],
local_file_paths: list[str],
stage_location: str,
) -> None:
"""Upload files to the Snowflake stage.
Args:
        session (Session): An active Snowpark session.
file_names (List[str]): A list of filenames to be uploaded.
partition_dirs (List[str]): A list of partition directories corresponding to the files.
local_file_paths (List[str]): A list of local file paths for the files to be uploaded.
stage_location (str): The target stage location in Snowflake.
Returns:
None
"""
for index, file_element in enumerate(file_names):
try:
put_result = session.file.put(
local_file_paths[index],
f"{stage_location}/{partition_dirs[index]}",
auto_compress=False,
overwrite=True,
parallel=10,
)
logging.info("Uploaded %s => %s", file_element, put_result[0].status)
except Exception as e:
logging.error("Error uploading %s: %s", file_element, e)
def main():
"""Entrypoint"""
session = get_snowpark_session()
try:
logging.info("Session created successfully")
# Specify the directory path to traverse
directory_path = "./amazon-sales-data/"
# Traverse directory for CSV, Parquet, and JSON files
csv_file_name, csv_partition_dir, csv_local_file_path = traverse_directory(
directory_path, ".csv"
)
parquet_file_name, parquet_partition_dir, parquet_local_file_path = (
traverse_directory(directory_path, ".parquet")
)
json_file_name, json_partition_dir, json_local_file_path = traverse_directory(
directory_path, ".json"
)
# Upload files to Snowflake stage
upload_files(
session,
csv_file_name,
csv_partition_dir,
csv_local_file_path,
STAGE_LOCATION,
)
upload_files(
session,
parquet_file_name,
parquet_partition_dir,
parquet_local_file_path,
STAGE_LOCATION,
)
upload_files(
session,
json_file_name,
json_partition_dir,
json_local_file_path,
STAGE_LOCATION,
)
except Exception as e:
logging.error("An error occurred: %s", e)
finally:
if session:
session.close()
logging.info("Session closed")
if __name__ == "__main__":
main()
If the script runs without errors, data staging is successful 🎉. Feel free to comment if you face any issues while running the script.
You can also verify the upload by listing the staged files.
-- List stage files
use schema source;
list @data_stg;
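LIST also accepts a regex pattern if you only want to check files of a particular type, for example:
list @data_stg pattern='.*[.]csv';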
Follow the next step here.