Chapter 6: ETL Pipelines
Learning Objectives
After completing this chapter, you will be able to:
- Explain the ETL pattern and why it matters for data-driven organizations
- Extract data from CSV files, databases, and APIs using Python
- Transform data using pandas for cleaning, validation, and standardization
- Load data into a database using SQLAlchemy's create_engine and to_sql
- Implement error handling with try-except blocks in data pipelines
- Use Python's logging module to track pipeline execution
- Design a complete ETL pipeline script with a modular architecture
- Apply retry logic and idempotent operations for production reliability
6.1 What is ETL?
ETL stands for Extract, Transform, Load -- the three phases of moving data from source systems into a target database or data warehouse.
| Phase | Purpose | Tools |
|---|---|---|
| Extract | Read raw data from sources | pd.read_csv, requests, pd.read_sql |
| Transform | Clean, validate, and reshape data | pandas (dropna, fillna, merge, apply) |
| Load | Write processed data to a target | SQLAlchemy, to_sql |
Business data rarely lives in a single, clean source. ETL pipelines bring multiple sources together into one reliable dataset for analysis.
ETL vs ELT
In modern cloud warehouses (Snowflake, BigQuery), you may encounter ELT -- load raw data first, then transform inside the warehouse with SQL. We focus on ETL because it gives you full control over transformation in Python.
6.2 Extract Phase
The extract phase reads data from source systems without modifying the source.
CSV Data Sources
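A minimal CSV extract might look like this sketch, where an inline string stands in for a real source file:

```python
import io

import pandas as pd

# Inline CSV content; in practice this would be a file path on disk
csv_data = io.StringIO(
    "customer_id,name,zip_code\n"
    "1,Alice,07101\n"
    "2,Bob,60601\n"
)

# Pass dtype so zip codes keep their leading zeros
customers = pd.read_csv(csv_data, dtype={"zip_code": str})
print(customers["zip_code"].tolist())  # ['07101', '60601']
```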
Leading Zeros
Zip codes like 07101 lose their leading zeros if pandas reads them as integers. Always pass dtype={"column": str} for columns that should remain text.
Database Sources
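A sketch of a database extract; an in-memory SQLite database stands in for a real warehouse connection, and the table is seeded first so the query has something to read:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")  # in-memory SQLite as a stand-in source

# Seed a source table for the example
pd.DataFrame({"order_id": [1, 2], "amount": [19.99, 5.00]}).to_sql(
    "orders", engine, index=False
)

# Extract with a SQL query -- filtering happens inside the database
orders = pd.read_sql("SELECT order_id, amount FROM orders WHERE amount > 10", engine)
print(len(orders))  # 1
```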
API Data Sources
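One way to sketch an API extract with the requests library; the sample payload below stands in for a live endpoint's JSON response:

```python
import pandas as pd
import requests


def fetch_records(url: str, timeout: float = 10.0) -> list:
    """Call the API and return the decoded JSON payload."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # raise on HTTP 4xx/5xx instead of parsing an error body
    return response.json()


def records_to_frame(records: list) -> pd.DataFrame:
    """Normalize a list of JSON objects into a DataFrame."""
    return pd.DataFrame.from_records(records)


# A payload shaped like a typical API response, used here in place of a live call
sample = [{"id": 1, "status": "shipped"}, {"id": 2, "status": "pending"}]
df = records_to_frame(sample)
```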
| Source | Pros | Cons | Best For |
|---|---|---|---|
| CSV | Simple, portable | No schema enforcement | One-time imports, data sharing |
| Database | Structured, typed, SQL filtering | Requires credentials | Recurring extracts |
| API | Real-time, vendor-maintained | Rate limits, pagination | Third-party services |
6.3 Transform Phase
Data Cleaning
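A cleaning sketch using a small illustrative frame:

```python
import pandas as pd

raw = pd.DataFrame({
    "email": ["a@x.com", None, "a@x.com", "b@x.com"],
    "amount": [10.0, 5.0, 10.0, None],
})

cleaned = (
    raw.dropna(subset=["email"])   # rows without an email are unusable
       .drop_duplicates()          # remove exact duplicate rows
       .fillna({"amount": 0.0})    # default missing amounts to zero
)
print(len(cleaned))  # 2
```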
Data Standardization
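A standardization sketch; the state values are illustrative:

```python
import pandas as pd

df = pd.DataFrame({"state": [" il", "Illinois", "IL "]})

# Normalize whitespace and case, then map full names to abbreviations
df["state"] = df["state"].str.strip().str.upper().replace({"ILLINOIS": "IL"})
print(df["state"].tolist())  # ['IL', 'IL', 'IL']
```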
Why Standardization Matters
Joining customer records where one system stores "IL" and another "Illinois" will miss matches. Always standardize before loading.
Data Validation
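A validation sketch that splits rows into valid and rejected sets under two illustrative business rules:

```python
import pandas as pd

df = pd.DataFrame({
    "email": ["a@x.com", "not-an-email", "b@x.com"],
    "amount": [10.0, 20.0, -5.0],
})

# Business rules: email must contain "@", amount must be positive
valid_email = df["email"].str.contains("@", na=False)
positive_amount = df["amount"] > 0

invalid = df[~(valid_email & positive_amount)]  # keep rejects for inspection
df = df[valid_email & positive_amount]
print(len(df), "valid,", len(invalid), "rejected")  # 1 valid, 2 rejected
```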
A Complete Transform Function
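One possible shape for a transform function that chains cleaning, standardization, and validation (the column names are illustrative):

```python
import pandas as pd


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean, standardize, and validate raw order records."""
    out = df.dropna(subset=["email"])                    # cleaning
    out = out.drop_duplicates(subset=["order_id"])       # cleaning
    out = out.assign(email=out["email"].str.strip().str.lower())  # standardization
    out = out[out["amount"] > 0]                         # validation
    return out


raw = pd.DataFrame({
    "order_id": [1, 1, 2, 3],
    "email": ["A@X.com ", "A@X.com ", None, "b@x.com"],
    "amount": [10.0, 10.0, 5.0, -1.0],
})
clean = transform(raw)
print(len(clean))  # 1
```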
6.4 Load Phase
SQLAlchemy and create_engine
SQLAlchemy provides a uniform interface to many databases. The engine manages connections.
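A minimal sketch; SQLite's in-memory database avoids needing credentials, and the commented PostgreSQL string shows the general pattern (all names are placeholders):

```python
from sqlalchemy import create_engine, text

# dialect+driver://username:password@host:port/database, e.g. (placeholders):
# engine = create_engine("postgresql+psycopg2://user:secret@localhost:5432/sales")
engine = create_engine("sqlite://")  # in-memory SQLite: no credentials needed

# The engine manages a pool of connections; borrow one with a context manager
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1")).scalar()
print(result)  # 1
```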
Connection string pattern: dialect+driver://username:password@host:port/database
DataFrame to SQL
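A sketch of the load step:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")
df = pd.DataFrame({"order_id": [1, 2], "amount": [10.0, 5.0]})

# Write the DataFrame as a table; if_exists controls collision behavior
df.to_sql("sales", engine, if_exists="replace", index=False)
```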
| if_exists | Behavior | Use Case |
|---|---|---|
| "fail" | Raise an error (default) | First-time creation |
| "replace" | Drop and recreate the table | Full refresh pipelines |
| "append" | Add rows to existing table | Incremental pipelines |
replace Drops the Table
if_exists="replace" deletes all existing data before writing. Fine for development, but use "append" with deduplication for production incremental loads.
SQL to DataFrame
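A sketch of reading results back, seeding a small table first so the query has data:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")
pd.DataFrame({"region": ["north", "south"], "total": [100, 200]}).to_sql(
    "summary", engine, index=False
)

# read_sql accepts a full query, so sorting and filtering can happen in SQL
by_region = pd.read_sql("SELECT region, total FROM summary ORDER BY total DESC", engine)
print(by_region["region"].tolist())  # ['south', 'north']
```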
6.5 Error Handling with try-except
ETL pipelines interact with files, networks, and databases -- all of which can fail.
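A sketch of an extract wrapper that catches the failures a file read can realistically produce:

```python
import pandas as pd


def extract_csv(path: str) -> pd.DataFrame:
    """Read a CSV, reporting the specific failure before stopping the pipeline."""
    try:
        return pd.read_csv(path)
    except FileNotFoundError:
        print(f"Source file not found: {path}")
        raise  # re-raise so the pipeline stops instead of continuing with no data
    except pd.errors.ParserError:
        print(f"Could not parse {path} as CSV")
        raise
```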
Never Use Bare except
A bare except: catches everything, including keyboard interrupts. Always catch specific exceptions.
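A sketch contrasting the bare form with a specific catch (the file path is illustrative and assumed not to exist):

```python
import pandas as pd

# Too broad -- swallows every error, including Ctrl-C and typos in your own code:
# try:
#     df = pd.read_csv("no_such_file.csv")
# except:
#     pass

# Catch only what we expect, and keep the original error message
try:
    df = pd.read_csv("no_such_file.csv")
except FileNotFoundError as exc:
    print(f"Extract failed: {exc}")
    df = pd.DataFrame()  # empty frame lets later code decide how to proceed
```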
Use else for success-only code and finally for cleanup:
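A sketch of the full try/except/else/finally shape:

```python
import pandas as pd

events = []
try:
    df = pd.DataFrame({"amount": [1.0, 2.0]})  # stands in for a real extract step
except ValueError:
    events.append("error")
else:
    events.append(f"extracted {len(df)} rows")  # runs only when try succeeded
finally:
    events.append("cleanup")  # always runs -- close connections/files here
print(events)  # ['extracted 2 rows', 'cleanup']
```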
6.6 Logging
Print statements are fine for debugging, but production pipelines need structured logging.
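A minimal logging setup sketch:

```python
import logging

# Configure once, at the top of the pipeline script
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("etl")

logger.info("Extract phase started")
logger.info("Extracted %d rows", 1500)
logger.warning("%d rows had missing emails", 3)
```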
| Level | When to Use |
|---|---|
| DEBUG | Detailed diagnostic info |
| INFO | Normal operations (phase start/end, row counts) |
| WARNING | Unexpected but non-fatal (e.g., "3 rows had missing emails") |
| ERROR | Something failed |
| CRITICAL | Pipeline cannot continue |
Log Counts at Every Phase
Always log row counts: "Extracted 1,500 rows", "After cleaning: 1,487 rows", "Loaded 1,487 rows". These counts are the fastest way to find where data was lost.
6.7 Pipeline Architecture
A well-structured pipeline separates each phase into its own function and orchestrates them from a single entry point.
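One possible sketch of this architecture, with an inline frame standing in for real sources and in-memory SQLite standing in for the target:

```python
import logging

import pandas as pd
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("pipeline")


def extract() -> pd.DataFrame:
    """Read raw records (a literal frame here; a real pipeline reads files/APIs)."""
    df = pd.DataFrame({
        "order_id": [1, 1, 2],
        "email": ["A@X.com", "A@X.com", None],
        "amount": [10.0, 10.0, 5.0],
    })
    logger.info("Extracted %d rows", len(df))
    return df


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and standardize the extracted records."""
    out = df.dropna(subset=["email"]).drop_duplicates(subset=["order_id"])
    out = out.assign(email=out["email"].str.lower())
    logger.info("After cleaning: %d rows", len(out))
    return out


def load(df: pd.DataFrame, engine) -> None:
    """Write the cleaned records to the target table."""
    df.to_sql("orders", engine, if_exists="replace", index=False)
    logger.info("Loaded %d rows", len(df))


def run() -> int:
    """Orchestrate the phases from one entry point; returns rows loaded."""
    engine = create_engine("sqlite://")
    clean = transform(extract())
    load(clean, engine)
    return len(clean)


if __name__ == "__main__":
    run()
```

Each phase can now be tested and re-run in isolation, and the orchestrator is the only place that knows the order of operations.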
6.8 Retry Logic and Idempotent Operations
Retry Logic
When network calls fail, they often work on the next attempt. A retry pattern with exponential backoff automates this.
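A sketch of retry with exponential backoff; the flaky_fetch function simulates a network call that fails twice before succeeding, and the tiny base delay keeps the example fast:

```python
import time


def with_retries(func, max_attempts=3, base_delay=1.0):
    """Call func(); on failure wait base_delay * 2**attempt and try again."""
    for attempt in range(max_attempts):
        try:
            return func()
        except ConnectionError as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts -- let the failure propagate
            delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s, ...
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay}s")
            time.sleep(delay)


# Simulate a flaky network call that succeeds on the third try
calls = {"n": 0}

def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary outage")
    return "payload"


result = with_retries(flaky_fetch, max_attempts=5, base_delay=0.01)
print(result)  # payload
```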
Idempotent Operations
An operation is idempotent if running it multiple times produces the same result as running it once. If a pipeline fails halfway and you re-run it, you should not end up with duplicate data.
| Strategy | How It Works | Example |
|---|---|---|
| Replace | Delete all, then insert | to_sql(if_exists="replace") |
| Upsert | Insert new, update existing | INSERT ... ON DUPLICATE KEY UPDATE |
| Deduplicate | Append, then remove dupes | SQL DELETE after load |
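The deduplicate strategy above can be sketched as follows, using SQLite's rowid to pick one survivor per key (a SQLite-specific assumption; other databases would use a window function or an upsert):

```python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")
df = pd.DataFrame({"order_id": [1, 2], "amount": [10.0, 5.0]})


def load_idempotent(df: pd.DataFrame) -> int:
    """Append, then delete duplicate order_ids; returns the resulting row count."""
    df.to_sql("sales", engine, if_exists="append", index=False)
    with engine.begin() as conn:
        conn.execute(text(
            "DELETE FROM sales WHERE rowid NOT IN "
            "(SELECT MIN(rowid) FROM sales GROUP BY order_id)"
        ))
    return int(pd.read_sql("SELECT COUNT(*) AS n FROM sales", engine)["n"].iloc[0])


print(load_idempotent(df))  # 2
print(load_idempotent(df))  # still 2 -- re-running does not duplicate rows
```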
Design for Re-Runs
When building any pipeline, ask: "What happens if I run this twice?" If the answer is "duplicate rows," redesign the load step to be idempotent.
Key Takeaways
- ETL is a three-phase pattern -- Extract from sources, Transform with cleaning and validation, Load into a target database
- pandas handles extraction and transformation -- read_csv, read_sql, and the pandas API cover most ETL needs
- SQLAlchemy bridges Python and databases -- create_engine connects, to_sql writes, read_sql reads
- Error handling prevents silent failures -- use try-except with specific exceptions, never a bare except
- Logging tracks pipeline execution -- log row counts at every phase to diagnose issues quickly
- Modular architecture matters -- separate E, T, and L into functions; orchestrate from a single entry point
- Idempotency makes pipelines safe -- design loads so running twice gives the same result as running once
Review Questions
- What are the three phases of ETL, and what happens in each one?
- Why should you pass dtype={"zip_code": str} when reading a CSV that contains zip codes?
- Explain the difference between if_exists="replace" and if_exists="append" in to_sql. When would you use each?
- Why is catching a specific exception (like FileNotFoundError) better than using a bare except:?
- What does it mean for a load operation to be idempotent, and why is this important for ETL pipelines?
Practical Exercise
Build a complete ETL pipeline that processes sales data.
Setup: Create sales_raw.csv with columns order_id, customer_email, region, order_date, and amount. Use order_ids 1001-1006 with 1003 appearing twice, and spread the problems the transform must catch across 1002, 1004, and 1005: one missing customer_email, one unparseable order_date, and one amount <= 0.
Tasks:
- Extract: Read the CSV into a DataFrame
- Transform (transform_sales function): drop rows missing customer_email, remove duplicate order_ids, lowercase emails, title-case regions, convert order_date to datetime (coerce errors to NaT then drop), remove rows where amount <= 0
- Load: Write cleaned data to a SQLite table sales using SQLAlchemy
- Verify: Read the table back with read_sql and print it
- Log: Add logging at each phase with row counts
Expected result: 3 clean rows (1001, 1003 deduplicated to one, plus 1006) loaded into sales.
Next Steps
In Chapter 7, we'll explore NoSQL databases and how to work with APIs and document-oriented data stores like MongoDB for semi-structured data.
Corresponds to Week 5 of BADM 554 -- ETL Pipelines