## Airflow for hands-off ETL

Almost exactly a year ago, I joined Yahoo, which more recently became Oath.

The team I joined is called the Product Hackers, and we work with large amounts of data. By large amounts, I mean billions of rows of log data.

Our team does both ad-hoc analyses and ongoing machine learning projects. To support those efforts, our team had initially written scripts to parse logs and ran them with cron to load the data into Redshift on AWS. After a while, it made sense to move to Airflow.

## Do you need Airflow?

1. How much data do you have? A lot
2. Do you use cron jobs for ETL? How many? Too many
3. Do you have to re-run scripts manually when they fail? How often? Yes, often enough to be a pain point
4. Do you use on-call shifts to help maintain infrastructure? Unfortunately, we did

## What exactly is Airflow?

Airflow is an open-source python library. It creates Directed Acyclic Graphs (DAGs) for extracting, transforming, and loading data.

'Directed' just means the tasks are supposed to be executed in order (although this is not actually required, tasks don't even have to be connected to each other and they'll still run). 'Acyclic' means tasks are not connected in a circle (although you can write loops that generate tasks dynamically, you still don't want circular dependencies).

A DAG in this case is a python object made up of tasks. A typical DAG might contain tasks that do the kinds of things you might do with cron jobs:

get logs --> parse logs --> create table in database --> load log data into new table
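
To make 'directed' and 'acyclic' concrete, here is a minimal sketch that uses Python's standard `graphlib` module rather than Airflow itself. The task names mirror the pipeline above; the helper names `run_order` and `has_cycle` are made up for illustration and are not part of Airflow's API.

```python
from graphlib import TopologicalSorter, CycleError

# Each key is a task; its value is the set of tasks that must finish first.
dependencies = {
    "get_logs": set(),
    "parse_logs": {"get_logs"},
    "create_table": {"parse_logs"},
    "load_data": {"create_table"},
}

def run_order(deps):
    """Return one valid execution order, upstream tasks first."""
    return list(TopologicalSorter(deps).static_order())

def has_cycle(deps):
    """Return True if the dependencies contain a circular reference."""
    try:
        run_order(deps)
    except CycleError:
        return True
    return False

order = run_order(dependencies)
print(order)
print(has_cycle({"a": {"b"}, "b": {"a"}}))
```

Because the graph has no cycles, there is always at least one order in which every task runs after the tasks it depends on. A circular dependency makes that impossible, which is why it gets rejected.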

Each DAG step is executed by an Operator (also a python object).

Operators use Hooks to connect to external resources (like AWS services).

Task instances refer to attempts to run specific tasks, so they have state that you can view in the dashboard. The state tells you whether that task is currently executing, succeeded, failed, skipped, or waiting in the queue.

## Some tips if you're going to use Airflow

### Make your jobs idempotent if possible

My team has a couple different types of tables that we load into Redshift. One is the type we call 'metadata', which is typically just a simple mapping that doesn't change very often. For this type of table, when it does change, it's important to drop the old table and re-create it from scratch. This is easier to manage with separate tasks for each SQL step, so the DAG has the following steps:

get_data --> drop_old_table --> create_new_table --> load_data

This way, if any of the steps fails, we can restart from there, and it doesn't matter if the step was partially completed before it failed.

The other kind of table we have is an event table, and those are loaded with fresh data every day. We retain the data for 3 days before we start running out of space on the Redshift cluster. This kind of table doesn't really need a drop_old_table step, because the table name includes the date (which makes it easier to find the table you want when you're running queries). However, when we create these tables, we still want to make sure we don't create duplicates, so in the create step we check to see if the table already exists.
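
Both patterns can be sketched in a few lines. This is only an illustration: it uses an in-memory SQLite database as a stand-in for Redshift, the table and column names are invented, and it folds the separate drop, create, and load tasks into one function for brevity.

```python
import sqlite3
from datetime import date

def event_table_name(day):
    """Build a dated table name such as events_20240131."""
    return f"events_{day:%Y%m%d}"

def create_event_table(conn, day):
    """Create the day's event table only if it is not already there."""
    name = event_table_name(day)
    conn.execute(f"CREATE TABLE IF NOT EXISTS {name} (user_id TEXT, clicks INTEGER)")
    return name

def rebuild_metadata_table(conn, rows):
    """Drop and re-create a small mapping table from scratch."""
    conn.execute("DROP TABLE IF EXISTS metadata")
    conn.execute("CREATE TABLE metadata (key TEXT, value TEXT)")
    conn.executemany("INSERT INTO metadata VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
day = date(2024, 1, 31)

# Running every step twice leaves the database in the same state as running it once.
for _ in range(2):
    table = create_event_table(conn, day)
    rebuild_metadata_table(conn, [("a", "1"), ("b", "2")])

metadata_rows = conn.execute("SELECT COUNT(*) FROM metadata").fetchone()[0]
print(table, metadata_rows)
```

The second pass through the loop is the interesting one: nothing fails and nothing is duplicated, which is what makes a re-run after a failure safe.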

### Get AIRFLOW_HOME depending on where you're running

If you want a really stable build that requires the least amount of hands-on maintenance, do yourself a favor and 'productionize' your setup. That means you'll want to run Airflow in at least 3 places:

1. In a virtual environment on your local machine (we use Docker with Ansible)
2. In a virtual environment in your continuous integration system (we use Jenkins)
3. In a virtual environment on your production host (we use virtualenv with python 3)

Note that Airflow doesn't make this easy, so I wrote a little helper script to make sure Airflow has the right configuration files and is able to find the DAGs, both of which are dependent on using the correct AIRFLOW_HOME environment variable.

Here's the TL;DR:

    import os

    # If the AIRFLOW_HOME environment variable doesn't exist, fall back to the default:
    airflow_home = os.getenv('AIRFLOW_HOME', os.path.expanduser('~/airflow'))

    # It's really useful to always check where the code is running:
    homedir = os.getenv('HOME')

    # If it's on Jenkins, there's an environment variable that gives you the path for that:
    jenkins_path = os.getenv('JENKINS_HOME', None)

    # In the Jenkinsfile (without Docker), we're doing this:
    #     withEnv(['AIRFLOW_HOME=/br/airflow'])
    #     cp -r $PWD/dags $AIRFLOW_HOME/

    # If you're running tests locally, there's a helper that I stole from inside Airflow's guts:
    import airflow.configuration as conf
    os.environ['TEST_CONFIG_FILE'] = conf.TEST_CONFIG_FILE

I hadn't seen anyone writing unit tests for Airflow DAGs, but I write tests for all my python code, so why should Airflow be any different?

It's a little unintuitive, because Airflow DAG files are not like regular python files. DAG objects have to be at the top level, so the way I got around this was to grab the dag file and then get each of the task objects as attributes.

I wrote the tests for the Operators so that they could be easily re-used, since most of our DAGs have similar tasks. This also lets us use unit tests to enforce best practices.

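    class TestPostgresOperators:
        """
        Not meant to be used alone.
        For use within a specific DAG test file.
        """
        @classmethod
        def setUp(cls, dagfile):
            cls.dagfile = dagfile

        def test_droptable(self):
            """
            Validate fields here,
            e.g. check the number of retries.
            """
            # `drop` is the drop-table task, fetched from the DAG file as an attribute
            # (the attribute name is assumed; the lookup was elided in the original).
            drop = getattr(self.dagfile, 'drop')
            assert 0 <= drop.retries <= 5
            assert drop.postgres_conn_id == 'redshift'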


Then these 'abstract tests' get instantiated in the test file for a particular DAG, like this:

    import advertisers_v2
    from test_dag_instantiation import TestDAGInstantiation
    from conftest import unittest_config

    from test_postgres_operators import TestPostgresOperators
    from test_mysql_to_redshift import TestMySQLtoRedshiftOperator

    mydag = TestDAGInstantiation()
    mydag.test_dagname()
    mydag.test_default_args()

    postgres_tests = TestPostgresOperators()
    postgres_tests.test_droptable()
    postgres_tests.test_createtable()

    mysql_to_redshift_tests = TestMySQLtoRedshiftOperator()
    mysql_to_redshift_tests.test_importstep()


Doing it this way makes it ridiculously easy to set up tests, and they can still be parameterized however you want, to test customizations as needed.
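
The reusable-check idea can be tried out without Airflow installed. In this sketch, `SimpleNamespace` objects stand in for an imported DAG file and its operators; the class name `OperatorChecks` and the task names `drop` and `create` are hypothetical.

```python
from types import SimpleNamespace

class OperatorChecks:
    """Reusable checks; point them at any module-like object that holds tasks."""

    def __init__(self, dagfile):
        self.dagfile = dagfile

    def check_retries(self, task_name, max_retries=5):
        # Tasks are looked up as attributes of the DAG file.
        task = getattr(self.dagfile, task_name)
        assert 0 <= task.retries <= max_retries, f"{task_name}: bad retries"

    def check_connection(self, task_name, expected="redshift"):
        task = getattr(self.dagfile, task_name)
        assert task.postgres_conn_id == expected, f"{task_name}: wrong connection"

# Stand-in for an imported DAG file; a real one would expose Airflow operators.
fake_dagfile = SimpleNamespace(
    drop=SimpleNamespace(retries=2, postgres_conn_id="redshift"),
    create=SimpleNamespace(retries=9, postgres_conn_id="redshift"),
)

checks = OperatorChecks(fake_dagfile)
checks.check_retries("drop")
checks.check_connection("drop")
```

Here `checks.check_retries("create")` would raise an `AssertionError`, because nine retries is over the limit. That is the sense in which shared tests can enforce a team convention across every DAG.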

## Probability Binning: simple and fast is better than complicated and slow

Recently, I've done a few data science coding challenges for job interviews. My favorite ones included a data set and asked me to address both specific and open-ended questions about that data set.

One of the first things I usually do is make a bunch of histograms. Histograms are great because they're an easy way to look at the distribution of data without having to plot every single point, or get distracted by a lot of noise.

A histogram is just a plot with the number of counts per value, where the values are divided into equally-sized bins. In the traditional histogram, the bins are always the same width along the x-axis (along the range of the values). More bins means better resolution. Fewer bins can simplify the representation of a data set, for example if you want to do clustering or classification into a few representative groups.
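
To see what equal-width binning does to the counts, here is a small plain-Python sketch. The data is invented for illustration, and `equal_width_counts` is a hypothetical helper, not a library function.

```python
def equal_width_counts(values, bins):
    """Count values in `bins` equally wide bins spanning min(values)..max(values)."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / bins
    counts = [0] * bins
    for v in values:
        # Guard against zero width (all values equal); the maximum goes in the last bin.
        index = 0 if width == 0 else min(int((v - lo) / width), bins - 1)
        counts[index] += 1
    return counts

data = [0, 1, 1, 2, 2, 2, 3, 3, 4, 10]
print(equal_width_counts(data, 10))
print(equal_width_counts(data, 3))
```

With ten bins the shape of the low values is visible, while with three bins almost everything lands in the first bin. The bin width is set by the range of the data, not by where the data actually sits.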

A histogram with ten bins:

The same data with 3 bins:

Original implementation:

First, I used matplotlib to get the bin ranges, because that was easy. Then I applied those as masks on my original dataframe, to convert the data into categories based on the bin ranges.

    import matplotlib.pyplot as plt

    def feature_splitter(df, column, bins=3):
        """
        Convert continuous variables into categorical for classification.
        :param df: pandas dataframe to use
        :param column: str
        :param bins: number of bins to use, or list of boundaries if bins should be different sizes
        :return: counts (np.array), bin_ranges (np.array), histogram chart (display)
        """
        counts, bin_ranges, histogram = plt.hist(df[column], bins=bins)

        return counts, bin_ranges, histogram

    # The original function name was lost; `categorize_feature` is a placeholder.
    def categorize_feature(df, column, bin_ranges):
        """
        Use bin_ranges to create categorical column

        Assumes 3 bins

        :param df: pandas dataframe as reference and target
        :param column: reference column (name will be used to create new one)
        :param bin_ranges: np.array with ranges, has 1 more number than bins
        :return: modified pandas dataframe with categorical column
        """
        low = (df[column] >= bin_ranges[0]) & (df[column] < bin_ranges[1])
        med = (df[column] >= bin_ranges[1]) & (df[column] < bin_ranges[2])
        high = (df[column] >= bin_ranges[2])

        # Label the bins 0, 1, 2 from lowest to highest (numbering from 0 is assumed).
        for i, mask in enumerate([low, med, high]):
            df.loc[mask, (column + '_cat')] = i

        return df


This worked well enough for a first attempt, but the bins using a traditional histogram didn't always make sense for my purposes, and I was assuming that I'd always be masking with 3 bin ranges.

Then I remembered that there's a different way to do it: choose bin ranges by equalizing the number of events per bin. This means the bin widths might be different, but the height is approximately the same. This is great if you have otherwise really unbalanced classes, like in this extremely simplified example, where a traditional histogram really doesn't always do the best job of capturing the distribution:
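
A bare-bones version of the idea, in plain Python, might look like the sketch below. It is not the module described in this post: the click counts are invented, and both function names are hypothetical.

```python
def equal_frequency_edges(values, bins):
    """Return bin edges chosen so each bin holds about the same number of values."""
    ordered = sorted(values)
    n = len(ordered)
    edges = [ordered[0]]
    for k in range(1, bins):
        # Value sitting at the k-th cut point of the sorted data.
        edges.append(ordered[(k * n) // bins])
    edges.append(ordered[-1])
    return edges

def count_per_bin(values, edges):
    """Count values per bin; bins are [left, right) except the last, which is closed."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        for i in range(len(counts)):
            last = i == len(counts) - 1
            if edges[i] <= v < edges[i + 1] or (last and v == edges[-1]):
                counts[i] += 1
                break
    return counts

clicks = [1, 1, 2, 2, 3, 3, 4, 5, 50, 100, 400, 900]
edges = equal_frequency_edges(clicks, 3)
print(edges, count_per_bin(clicks, edges))
```

On this data the three bins hold four values each, whereas three equal-width bins over the same range would put ten of the twelve values in the first bin. When many rows share the same value, the cut points can repeat, which is one reason adjacent bins sometimes need to be combined.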

When to use probability binning:

Use probability binning when you want a small number of approximately equal classes, defined in a way that makes sense, e.g. combine adjacent bins if they're similar.

It's a way to convert a numeric, non-continuous variable into categories.

For example, let's say you're looking at user data where every row is a separate user. The values of a specific column, say "Total clicks", might be numeric, but the users are independent of each other. In this case, what you really want to do is identify categories of users based on their number of clicks. This isn't continuous in the same way as a column that consists of a time series of measurements from a single user.

I used to do this by hand/by eye, which is fine if you don't need to do it very often. But this is a tool that I've found extremely useful, so I wanted to turn it into a reusable module that I could easily import into any project and apply to any column.

The code I wrote is here

The actual process of getting there looked like this:

Step 1: create an inverted index

Step 2: write tests and make sure that's working

Step 3: use plots to verify if it was working as expected (and for comparison with original implementation)

For the simple case, yes, but on further testing I realized I had to combine bins if there were too many or they were too close together.

Step 4: combine bins

Step 5: use the bin ranges to mask the original dataframe and assign category labels

    def bin_masker(self):
        """
        Use bin_ranges from probability binning to create categorical column

        Should work for any number of bins > 0

        :param self.df: pandas dataframe as reference and target
        :param self.feature: reference column name (str) - will be used to create new one
        :param self.bin_ranges: sorted list of new bins, as bin ranges [min, max]
        :return: modified pandas dataframe with categorical column
        """
        # Label each bin with its position in self.bin_ranges (numbering from 0 is assumed).
        for i, item in enumerate(self.bin_ranges):
            mask = (self.df[self.feature] >= item[0]) & (self.df[self.feature] < item[1])
            self.df.loc[mask, (self.feature + '_cat')] = i

        # Rows that matched no bin get the bottom category.
        self.df[self.feature + '_cat'] = self.df[self.feature + '_cat'].fillna(0)
        return self.df


Step 6: try it in the machine learning application of my choice (a decision tree - this will go in a separate post). Check the accuracy score on the train-test-split (0.999, looks good enough to me).

Step 7: write more tests, refactor into OOP, write more tests.

Step 8: Add type hints and switch to using a public data set and pytest. Fix some stupid bugs. Write this blog post. Start preparing a package to upload to PyPI for easier portability.
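
For readers curious about Step 1, here is one way an inverted index could feed the binning. This is a guess at the general shape, not the code linked above: it reads 'inverted index' as a mapping from each distinct value to the rows where it occurs, and it leaves out the bin-combining logic from Step 4. The function names and data are hypothetical.

```python
from collections import defaultdict

def build_inverted_index(values):
    """Map each distinct value to the row positions where it occurs."""
    index = defaultdict(list)
    for row, value in enumerate(values):
        index[value].append(row)
    return dict(index)

def probability_bins(values, bins):
    """Group distinct values into up to `bins` inclusive (min, max) ranges of similar size."""
    index = build_inverted_index(values)
    target = len(values) / bins
    ranges, start, filled = [], None, 0
    for value in sorted(index):
        if start is None:
            start = value
        filled += len(index[value])
        # Close the bin once it holds its share; identical values never get split.
        if filled >= target and len(ranges) < bins - 1:
            ranges.append((start, value))
            start, filled = None, 0
    if start is not None:
        ranges.append((start, max(index)))
    return ranges

clicks = [0, 0, 0, 0, 0, 0, 1, 2, 3, 10, 20, 30]
print(probability_bins(clicks, 3))
```

Working from distinct values rather than individual rows means a heavily repeated value, like the zeros here, ends up in a single bin instead of straddling a boundary.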

## Within every tutorial, is another tutorial

Things I learned while following this tutorial on how to build reusable models with scikit-learn.

1. When in doubt, go back to pandas.
2. When in doubt, write tests.
3. When in doubt, write helper methods to wrap existing objects, rather than creating new objects.

## Ingesting "clean" data is easy, right?

Step 1 of this tutorial began with downloading data using requests, and saving that to a csv file. So I did that. I've used requests before, I had no reason to think it wouldn't work. It looked like it worked.

Step 2 was to read the file into pandas. I've read lots of csv files into pandas before, so I had no reason to think it wouldn't work.

It didn't work.

I double-checked that I had followed the instructions correctly, and then checked a few more times before concluding that something was not quite right about the data.

I went back and did the easy thing, just printing out the response from requests.

After some digging, I figured out that response.content is not the same as response.text.

The tutorial said to use response.content, but response.text seemed to have actually parsed the strings.
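
The difference is bytes versus text: in requests, `response.content` is the raw bytes of the body and `response.text` is those bytes decoded into a string. The sketch below fakes both with a hard-coded payload (the column names are invented) so it runs without a network connection. It shows one plausible way raw bytes can go wrong when written out as text, though not necessarily what happened here.

```python
import csv
import io

# Stand-in for response.content: raw bytes exactly as they came over the wire.
raw_bytes = b"age,workclass\n39,State-gov\n50,Private\n"

# Stand-in for response.text: the same payload decoded into a str.
text = raw_bytes.decode("utf-8")

# str() on bytes does not decode; it yields the repr, with a literal backslash-n.
mangled = str(raw_bytes)

rows = list(csv.reader(io.StringIO(text)))
print(rows)
print("\n" in mangled)
```

The decoded text splits into rows cleanly, while the stringified bytes contain no real newline characters at all, so a parser looking for line terminators would see a single long line.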

Even with that fix, pandas was refusing to read in more than the first row of data, due to a couple of problems:

• pandas wasn't finding the line terminators (nothing special, just '\n')
• pandas wasn't finding equal numbers of items per row

Unexpectedly, when I went back to what I usually do, just plain old pandas.read_csv, this time going directly from the url, and including the column names, that actually worked.

So it was actually better, and a lot less code, to completely skip using requests.

## Testing always gets me unstuck

I really liked the end-to-end structure of this tutorial, and was frankly embarrassed that I had so much trouble getting the initial ingestion to work.

I liked that the tutorial gave me an excuse to walk through how the author actually uses scikit-learn models in production. With the data firmly in hand, the data visualization steps were easy - they worked as advertised, and anyway I'm very familiar with using seaborn to make charts in python.

I had never created a Bunch object before, so that was new for me. That seemed to work, but then the next steps again failed, and I had to back up a few steps.

I wasn't sure what the problem was, so I did what I always do with complicated problems, and wrote some tests to rule out user error and make sure I understood what the code was doing. That helped a lot, and identified what was actually broken.

The problem: how to apply LabelEncoder to help convert categorical data, and Imputer to help fill missing data, to multiple columns.

Because the idea was to do this in the context of a Pipeline object, the author demonstrated how to create our own Encoder and Imputer objects, with multiple inheritance. I understand the goal of this: take advantage of the nice clean syntax you get from making a Pipeline. But it was failing at the fit_transform step, and it wasn't obvious why.

The fit() and transform() steps both seemed to be working individually and sequentially, and it wasn't easy to figure out how the fit_transform step was supposed to do anything more than chain them together.
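
For reference, scikit-learn's default `fit_transform` (from `TransformerMixin`) really is just `fit` followed by `transform` on the same data. The toy class below imitates that shape in plain Python for a table stored as a list of dicts. It is not scikit-learn's `LabelEncoder` and not the tutorial's class, and it doesn't claim to reproduce the failure described here.

```python
class ColumnLabelEncoder:
    """Toy multi-column label encoder for a list-of-dicts table."""

    def __init__(self, columns):
        self.columns = columns
        self.mappings = {}

    def fit(self, rows):
        # Learn an integer code for every distinct label in each chosen column.
        for col in self.columns:
            labels = sorted({row[col] for row in rows})
            self.mappings[col] = {label: code for code, label in enumerate(labels)}
        return self  # returning self is what makes chaining possible

    def transform(self, rows):
        # Replace labels with codes; columns that were not fitted pass through unchanged.
        return [
            {k: self.mappings[k][v] if k in self.mappings else v for k, v in row.items()}
            for row in rows
        ]

    def fit_transform(self, rows):
        # Same shape as scikit-learn's default: fit, then transform the same data.
        return self.fit(rows).transform(rows)

rows = [
    {"color": "red", "size": "S", "n": 1},
    {"color": "blue", "size": "L", "n": 2},
]
encoded = ColumnLabelEncoder(["color", "size"]).fit_transform(rows)
print(encoded)
```

One detail worth checking in any custom transformer is that `fit` returns `self`; if it returns `None`, the two steps still work when called separately but the chained call fails.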

After banging my head on this at the end of a long day, even going back to the original scikit-learn source code in an effort to design tests to help me figure out what was wrong, I decided to sleep on it.

## Simple and working is better than complicated and broken

I seriously considered writing tests for our custom Encoder and Imputer objects, but then it dawned on me that I really didn't need to do that. I decided that the Pipeline functionality was so simple that I didn't really need it, so I just stripped the objects down into simple functions to run the fit and transform steps, which was really all I needed anyway.
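
As an illustration of how little code the plain-function approach needs, here is a hedged sketch of imputation over several columns. It fills missing values with the most common value per column, which is an assumed strategy; the function names and sample rows are made up, and the functions actually used here worked on the tutorial's data.

```python
from collections import Counter

def fit_fill_values(rows, columns):
    """Learn the most common non-missing value for each column."""
    fills = {}
    for col in columns:
        seen = Counter(row[col] for row in rows if row[col] is not None)
        fills[col] = seen.most_common(1)[0][0]
    return fills

def fill_missing(rows, fills):
    """Return new rows with None replaced by the learned fill value."""
    return [
        {k: fills[k] if v is None and k in fills else v for k, v in row.items()}
        for row in rows
    ]

people = [
    {"job": "a", "sex": "F"},
    {"job": None, "sex": "F"},
    {"job": "a", "sex": None},
    {"job": "b", "sex": "M"},
]
fills = fit_fill_values(people, ["job", "sex"])
filled = fill_missing(people, fills)
print(fills, filled[1], filled[2])
```

The fit step and the transform step stay separate, so the same learned fill values can be applied later to new data. A column with no non-missing values at all would raise an `IndexError` in this sketch.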

That got me through the rest of the steps, so I could practice pickling a model and re-loading it, which seemed to work just fine.

I don't know if the scikit-learn folks have plans to extend these methods, or if everyone normally does these kinds of acrobatics to encode and impute on multiple columns - normally I would just use pandas for that, too.

Sam Zeitlin

San Francisco

Former research scientist, self-taught pythonista. Yes, I have a PhD in biochemistry, and I write ...