Data Engineering with Snowflake and Amazon Sales Data
In this series we'll explore an end-to-end ETL (Extract, Transform, Load) data pipeline for Amazon sales data using Snowpark and Snowflake. The focus is on handling Amazon's sales data efficiently, starting from loading (staging) the data all the way to building a dashboard. Along the way we'll go from data curation, where we transform and clean the data, to dimensional modelling. We will go through each phase in detail, so stay tuned!
Come along as we delve into the key concepts for constructing a robust ETL pipeline with Snowflake.
Where to find the complete resources for the series?
The complete resources, including code and worksheets, are available on my GitHub repo: here
1. Setting up the environment
First of all, let's set up our environment. This includes creating a virtual warehouse and a user, and setting up roles and permissions.
You can find the complete worksheet for this step here.
1. Creating a Virtual Warehouse
Here we will create a standard, medium-sized warehouse for our sample project, which is covered by the credits of a free trial account.
use role sysadmin;
create warehouse amazon_sales
with
warehouse_size = 'medium'     -- size of the compute cluster
warehouse_type = 'standard'   -- standard (not Snowpark-optimized)
auto_suspend = 60             -- suspend after 60 seconds of inactivity
auto_resume = true            -- wake up automatically on the next query
min_cluster_count = 1
max_cluster_count = 1;        -- single cluster, no multi-cluster scaling
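To confirm that the warehouse was created and to check its current state, you can run a quick look-up:
show warehouses like 'amazon_sales';   -- the pattern matches the name used above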
2. Creating a User
use role accountadmin;
create user amazon_sales_user
password = 'Test@123#'              -- sample password; use your own secret
default_role = sysadmin
default_secondary_roles = ('ALL')   -- activate all granted roles at login
must_change_password = false;
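If you want to verify that the user exists and its defaults took effect, a quick check while still on accountadmin:
show users like 'amazon_sales_user';
desc user amazon_sales_user;   -- lists the user's properties, including the default role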
3. Granting permissions
Note: you don’t have to use role
sysadmin
, you can create a separate role and use it. But here I have usedsysadmin
for simplicity.
-- let the new user assume sysadmin, and let sysadmin use the warehouse
grant role sysadmin to user amazon_sales_user;
grant usage on warehouse amazon_sales to role sysadmin;
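One more thing worth noting: the connection settings below reference a database AMAZON_SALES_DWH with a SOURCE schema, which the statements above don't create. If your copy of the worksheet doesn't include them either, a minimal sketch (the names are simply the ones used in the .env file, not anything Snowflake provides by default):
use role sysadmin;
create database if not exists amazon_sales_dwh;        -- name assumed from the .env file below
create schema if not exists amazon_sales_dwh.source;   -- schema the Snowpark session will default to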
4. Connecting to Snowflake with Python using Snowpark
Let's write a simple Python script to connect to our Snowflake warehouse and database.
You can find the script (step_1_check_connection.py) and its dependencies here, as a Python project with Poetry dependency management.
First, create a .env file in the project root with your connection details (the angle-bracket placeholders are yours to fill in):
ACCOUNT_ID="<orgname>-<account_name>"
SF_USER="<username>"
PASSWORD="<password>"
ROLE=SYSADMIN
DATABASE=AMAZON_SALES_DWH
SCHEMA=SOURCE
WAREHOUSE=AMAZON_SALES
"""
Snowflake connection script
"""
import os
import sys
import logging
from snowflake.snowpark import Session
from dotenv import load_dotenv
load_dotenv()
# initiate logging at info level
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%I:%M:%S",
)
# snowpark session
def get_snowpark_session() -> Session:
"""Create Snowpark Session"""
connection_parameters = {
"ACCOUNT": os.environ.get("ACCOUNT_ID"),
"USER": os.environ.get("SF_USER"),
"PASSWORD": os.environ.get("PASSWORD"),
"ROLE": os.environ.get("ROLE"),
"DATABASE": os.environ.get("DATABASE"),
"SCHEMA": os.environ.get("SCHEMA"),
"WAREHOUSE": os.environ.get("WAREHOUSE"),
}
# creating snowflake session object
return Session.builder.configs(connection_parameters).create()
def main():
"""Entrypoint"""
session = get_snowpark_session()
context_df = session.sql(
"""
select
current_role(),
current_database(),
current_schema(),
current_warehouse()
"""
)
context_df.show(2)
customer_df = session.sql(
"""
select
c_custkey,
c_name,
c_phone,
c_mktsegment
from
snowflake_sample_data.tpch_sf1.customer
limit 10
"""
)
customer_df.show(5)
if __name__ == "__main__":
main()
If the script prints the session context and the sample customer rows, the connection is successful 🎉. Feel free to comment if you face any issues while running it.
Follow the next step here.