In any organization that depends on continuous batches of data for decision-making analytics, streamlining and automating data processing workflows becomes essential. Larger teams usually consist of a Data Architect, who carefully creates the blueprints of the data infrastructure; a Data Engineer, who dives into the code to build out that infrastructure; a Data Analyst, who gathers and assesses data needs across functional teams and ensures the reliability of the data; and a Data Scientist, who uses the data to create business value through machine learning. For a data science team to work cohesively, I think it is important that every person on the team has some knowledge of the other members' roles and functions. I also think this is the best way to elevate yourself as a team player and become a well-rounded data professional.

In this blog post, I aim to demonstrate how a Data Scientist can expand their data engineering knowledge and skills by creating simple data pipelines with Apache Airflow. In addition to Airflow, this post includes Amazon S3, Snowflake and Slack as part of the technology stack to demonstrate how versatile a Data Scientist's toolkit can be. I hope to show how powerful these tools can be for improving your data products and data science projects.

This demonstration assumes that you have the following:
1. An Amazon S3 account with credentials (access key ID and secret access key) for the appropriate buckets
2. A Snowflake account with credentials (username, password, and account name) that have read and write access to the data warehouse
3. A Slack account with credentials (application token, Slack generated user code, Slack password) to set up alerts and notifications via API
4. Apache Airflow and its dependencies properly installed and running (whether on your local computer for practice or a virtual machine in production)
5. Working knowledge of directed acyclic graphs (DAGs)
6. Python 3.6 or later installed and working knowledge of Python scripting

Required Python Modules

import requests
import json
import snowflake.connector
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.operators.slack_operator import SlackAPIPostOperator

These Python modules are required to successfully run the Airflow script. If you do not already have them installed, use pip to install Airflow (the apache-airflow package) and the Snowflake Connector for Python (the snowflake-connector-python package).

Declaring Account Credential Variables

# credentials
database_name = 'MY_DATABASE'
table_name = 'MY_TABLE'
bucket_name = 'MY_BUCKET_NAME'
bucket_key = 'MY_BUCKET_KEY'
snowflake_username = BaseHook.get_connection('snowflake').login
snowflake_password = BaseHook.get_connection('snowflake').password
snowflake_account = BaseHook.get_connection('snowflake').host
aws_s3_access_key_id = BaseHook.get_connection('aws_s3_access_key_id').password
aws_s3_secret_access_key = BaseHook.get_connection('aws_s3_secret_access_key').password
slack_username = BaseHook.get_connection('slack').login
slack_token = BaseHook.get_connection('slack').password
slack_user_code = 'MY_SLACK_USER_CODE'

One of the most important things to take into account is how private information such as account passwords is stored and accessed. Luckily, Airflow can store and access this information securely. Values that are not sensitive can be placed directly in the Python script, as shown above where it says ‘MY_…’. All other credentials, which need to stay private and secure, should be stored through the Airflow UI.

In the Airflow UI, at the top, click on the Admin tab and then click Connections. You should see the following on your screen:

[Screenshot: the Connections page in the Airflow UI]

From the Python code above, notice how the variables call the get_connection method of BaseHook for specific connections. The get_connection method accepts the Conn Id as a parameter and returns a connection object with the attributes host, schema, login, password, port, and extra. The Python code reads several of these attributes to retrieve credentials. This is the recommended and most secure way of keeping credentials private when implementing data pipelines in production.
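To make the attribute access concrete, here is a minimal sketch that collects the three Snowflake values from a single connection object instead of calling get_connection once per field. The helper name snowflake_credentials and the stand-in object are assumptions for illustration only; in the pipeline, the object would come from BaseHook.get_connection('snowflake').

```python
from types import SimpleNamespace


def snowflake_credentials(conn):
    """Map the fields of one connection object to the connector's arguments."""
    return {
        "user": conn.login,
        "password": conn.password,
        "account": conn.host,
    }


# Hypothetical stand-in for the object returned by get_connection;
# the values are placeholders, not real credentials.
fake_conn = SimpleNamespace(
    host="my_account",
    schema=None,
    login="my_user",
    password="my_password",
    port=None,
    extra=None,
)

creds = snowflake_credentials(fake_conn)
print(sorted(creds))
```

Looking the connection up once and reading several attributes from it keeps the lookups in one place, which may make the script easier to change later.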

Other Useful Variable Declarations

# dates
today = datetime.today()
yesterdate = today + timedelta(days = -1)
yesterday = yesterdate.strftime('%Y-%m-%d')

These are some simple date variables that will be used descriptively in scripts throughout this blog post.
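As a small illustration of the same date arithmetic, the sketch below wraps it in a function so the value can be computed for any given day. The function name previous_day is an assumption and is not part of the original script.

```python
from datetime import date, timedelta


def previous_day(run_date):
    """Return the day before run_date formatted as YYYY-MM-DD."""
    return (run_date - timedelta(days=1)).strftime("%Y-%m-%d")


# Crossing a month boundary works the same way as any other day.
print(previous_day(date(2021, 3, 1)))  # 2021-02-28
```

Passing the date in explicitly also makes the formatting easy to check in isolation, outside of Airflow.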

Airflow Task: Upload to Snowflake

def upload_to_snowflake():
    # Open a Snowflake session with the credentials stored in Airflow
    con = snowflake.connector.connect(user = snowflake_username, password = snowflake_password, account = snowflake_account)
    cs = con.cursor()

    try:
        # Make sure the COPY runs against the intended database
        cs.execute('USE DATABASE %s;' % database_name)

        # The S3 location is passed to Snowflake as a quoted string
        copy = (
            "COPY into %s"
            " from 's3://%s/%s'"
            " credentials = (aws_key_id = '%s' aws_secret_key = '%s')"
            " file_format = (type = csv field_delimiter = ','"
            " field_optionally_enclosed_by = '\"'"
            " skip_header = 1)"
            " on_error = 'continue';"
            % (table_name, bucket_name, bucket_key, aws_s3_access_key_id, aws_s3_secret_access_key)
        )

        cs.execute(copy)
    finally:
        # Release the cursor and the connection even if a statement fails
        cs.close()
        con.close()

This Python function defines an Airflow task that uses the Snowflake credentials to gain access to the data warehouse and the Amazon S3 credentials to grant Snowflake permission to ingest and store the CSV data sitting in the bucket.

A connection is created and stored in the variable con, and a cursor, cs, is opened on it. A statement is executed to ensure we are using the right database, and the variable copy holds a string with instructions for Snowflake to copy data from S3, which is then executed. Refer to this guide for more details on how to format the copy string or command. The cursor is then closed once the function has finished its work.
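One way to inspect the copy string before sending it to Snowflake is to build it in a separate function. This is only a sketch: the name build_copy_statement and the sample arguments are assumptions, and the file format options simply mirror the ones used in this post.

```python
def build_copy_statement(table, bucket, key, aws_key_id, aws_secret_key):
    """Assemble the COPY INTO statement for a CSV file stored in S3."""
    return (
        "COPY INTO {table}"
        " FROM 's3://{bucket}/{key}'"
        " CREDENTIALS = (aws_key_id = '{key_id}' aws_secret_key = '{secret}')"
        " FILE_FORMAT = (type = csv field_delimiter = ','"
        " field_optionally_enclosed_by = '\"'"
        " skip_header = 1)"
        " ON_ERROR = 'continue';"
    ).format(
        table=table,
        bucket=bucket,
        key=key,
        key_id=aws_key_id,
        secret=aws_secret_key,
    )


# Placeholder values only; real keys should come from Airflow connections.
statement = build_copy_statement(
    "MY_TABLE", "MY_BUCKET_NAME", "MY_BUCKET_KEY", "MY_KEY_ID", "MY_SECRET"
)
print(statement)
```

Printing the statement with placeholder values is a cheap way to check the quoting, but avoid logging it once real keys are filled in.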

Airflow Task: Successful DAG Run Slack Notification

def completion_slack_message():
    # Summary posted to Slack once every task in the DAG has succeeded
    message = (
        ":white_check_mark: *Daily Update* \n"
        "*Date*: %s \n"
        "*Warehouse*: SNOWFLAKE \n"
        "*Database*: %s \n"
        "*Table*: %s \n"
        "*Alert*: `Successfully uploaded`"
        % (yesterday, database_name, table_name)
    )

    post = {"text": message}
    # The URL to post to is read from the login field of the 'slack' connection
    url = BaseHook.get_connection('slack').login

    requests.post(url, data = json.dumps(post), headers = {'Content-Type': 'application/json'})

This simple Python function sends a message to Slack once the DAG has successfully run through all its tasks. The contents of the Slack message are purely descriptive and are chosen to help identify which parts of the data pipeline were processed successfully.
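To preview the message without posting anything, the payload can be built separately from the HTTP call. The function name build_completion_payload and the sample values below are assumptions used only for illustration.

```python
import json


def build_completion_payload(run_date, database, table):
    """Return the JSON body for the success message as a string."""
    message = (
        ":white_check_mark: *Daily Update* \n"
        "*Date*: %s \n"
        "*Warehouse*: SNOWFLAKE \n"
        "*Database*: %s \n"
        "*Table*: %s \n"
        "*Alert*: `Successfully uploaded`"
        % (run_date, database, table)
    )
    return json.dumps({"text": message})


payload = build_completion_payload("2021-02-28", "MY_DATABASE", "MY_TABLE")
print(json.loads(payload)["text"])
```

Keeping the payload construction apart from requests.post means the wording can be checked locally before any message reaches a channel.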

Airflow Failure: Unsuccessful DAG Task Slack Alert

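def failure_slack_message(context):
    # Build the alert from the failed task instance found in the context
    slack_alert = SlackAPIPostOperator(
        task_id = 'task_failure_slack_message',
        channel = "#data_notifications",
        token = slack_token,
        username = slack_username,
        text = (
            ":heavy_exclamation_mark: *Daily Update* \n"
            "*Date*: %s \n"
            "*Alert*: `DAG: %s, Task: %s`"
            % (yesterday, context.get('task_instance').dag_id, context.get('task_instance').task_id)
        )
    )

    # Creating the operator does not send anything; execute it to post the alert
    return slack_alert.execute(context = context)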

This simple Python function sends a message to Slack if the DAG fails to run one of its tasks. The function takes context as a parameter, which contains the details of the task instance that failed. Notice that the Slack channel to which all alerts will be sent is identified as #data_notifications. The slack_token and slack_username are required to give the Slack app permission to send the message.
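The sketch below shows how the alert text is derived from the context, using a stand-in object in place of a real Airflow task instance. The function name format_failure_text and the sample identifiers are assumptions for illustration.

```python
from types import SimpleNamespace


def format_failure_text(context, run_date):
    """Build the alert text from the task instance stored in the context."""
    task_instance = context.get("task_instance")
    return (
        ":heavy_exclamation_mark: *Daily Update* \n"
        "*Date*: %s \n"
        "*Alert*: `DAG: %s, Task: %s`"
        % (run_date, task_instance.dag_id, task_instance.task_id)
    )


# Hypothetical context; Airflow passes a much richer dictionary to callbacks.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="s3_snowflake_slack_pipeline",
        task_id="upload_to_snowflake_task",
    )
}

alert_text = format_failure_text(fake_context, "2021-02-28")
print(alert_text)
```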

Implementing the DAG

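DAG_DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    # A fixed start date is used because a moving value such as datetime.utcnow()
    # can keep scheduled runs from triggering; replace this placeholder date with your own
    'start_date': datetime(2020, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}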

with DAG(dag_id = "s3_snowflake_slack_pipeline",
         default_args = DAG_DEFAULT_ARGS,
         schedule_interval = "0 8 * * *",
         catchup = False) as dag:

    # Tasks created inside the with block are attached to this DAG automatically

    # Poke S3 every 30 minutes for up to 12 hours until the file appears
    file_sensor = S3KeySensor(task_id = 's3_key_sensor_task',
                              poke_interval = 60 * 30,
                              timeout = 60 * 60 * 12,
                              bucket_key = "s3://%s/%s" % (bucket_name, bucket_key),
                              bucket_name = None,
                              wildcard_match = False,
                              on_failure_callback = failure_slack_message
                              )

    # Copy the detected file from S3 into Snowflake
    upload_file = PythonOperator(task_id = "upload_to_snowflake_task",
                                 python_callable = upload_to_snowflake,
                                 on_failure_callback = failure_slack_message
                                 )

    # Named differently from the function so the function is not overwritten
    send_completion_message = PythonOperator(task_id = "completion_slack_message_task",
                                             python_callable = completion_slack_message,
                                             on_failure_callback = failure_slack_message
                                             )

    file_sensor >> upload_file >> send_completion_message

A key part of the DAG is the schedule interval, which is set to run every day at 8:00 AM UTC.
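The schedule string "0 8 * * *" is a standard five-field cron expression. As a rough sketch, the helper below labels each field so the meaning of every position is visible; the name describe_cron is an assumption, and the function only splits the string without validating the values.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Label the five whitespace-separated fields of a cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError("expected 5 fields, got %d" % len(parts))
    return dict(zip(CRON_FIELDS, parts))


schedule = describe_cron("0 8 * * *")
print(schedule["hour"], schedule["minute"])  # 8 0
```

Read this way, minute 0 of hour 8 on every day of every month gives the daily 8:00 AM run described above.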

This DAG first uses the sensor S3KeySensor to detect whether a new file with our specified bucket key exists within the S3 bucket. If the file is not detected after a series of pokes (every 30 minutes here) and the sensor reaches its 12-hour timeout, a Slack error notification is fired via the Python function failure_slack_message. If the file is detected, the sensor task succeeds and the next task, upload_file, loads the CSV data from S3 into Snowflake using upload_to_snowflake. If the upload is not successful, the DAG fires another Slack error notification. Once Snowflake successfully ingests the S3 data, a final Slack message is sent via completion_slack_message to notify end users that the pipeline was processed successfully.
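The poke and timeout behaviour can be pictured with a simplified loop. This is a sketch of the idea rather than Airflow's implementation: wait_for_file and FakeClock are hypothetical names, and the sketch returns False on timeout where a real sensor would fail the task.

```python
import time


def wait_for_file(file_exists, poke_interval, timeout,
                  sleep=time.sleep, clock=time.monotonic):
    """Poke until file_exists() is true or more than timeout seconds pass."""
    started = clock()
    while not file_exists():
        if clock() - started > timeout:
            return False  # a real sensor would mark the task as failed here
        sleep(poke_interval)
    return True


class FakeClock:
    """Simulated clock so the example finishes instantly."""

    def __init__(self):
        self.now = 0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


# The file "appears" on the third poke, one hour after the first one.
clock = FakeClock()
pokes = iter([False, False, True])
found = wait_for_file(lambda: next(pokes), 60 * 30, 60 * 60 * 12,
                      sleep=clock.sleep, clock=clock.time)
print(found, clock.now)  # True 3600
```

With the values used in this post, the file is checked every 30 minutes and the sensor gives up after 12 hours without a match.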

Closing Comments

This demonstration used Airflow to organize, schedule and monitor a data pipeline that loads CSV files from Amazon S3 into a Snowflake data warehouse. Slack was used to create notifications and alerts for a more robust data processing workflow.

Although a simple data pipeline was presented here, larger data projects will require much more complex data pipelines. Airflow can express very complex DAGs, depending on the data needs of the organization. It is quite possible that the same source data will need to be joined with other complex data, transformed and loaded into a number of different tables to reach various teams and end users.

Wherever data flows within the organization, I think it is important that all members of your data science team share the responsibility of upholding data integrity, keeping each other accountable for their roles and, most importantly, learning from one another and elevating each other to practice great data science.