🧱 Project Overview
A clean, reproducible ELT pipeline that:
- Extracts the public Netflix Movies and TV Shows dataset from Kaggle
- Loads it into PostgreSQL with idempotent, fast inserts
- Transforms into a clean table for analysis
- Runs on a daily schedule with Airflow
- Ships with logs, scripts, and a GitHub-ready structure
Project Structure
airflow/
├── dags/
│   └── netflix_pipeline_dag.py
├── data/
│   └── netflix_titles.csv
├── scripts/
│   ├── setup_airflow.sh
│   └── env.example
├── requirements.txt
└── README.md
You can place this folder anywhere (e.g., ~/airflow_home). Make sure Airflow's dags_folder setting points at ./dags, or add that path to your Airflow configuration.
⚙️ Step-by-Step Setup Guide
Step 1: Environment Setup (Local)
Option A: Local virtual environment
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
# Airflow requires constraints
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION=$(python -c 'import sys; print(".".join(map(str, sys.version_info[:2])))')
pip install "apache-airflow==${AIRFLOW_VERSION}" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Providers
pip install apache-airflow-providers-postgres pandas kaggle psycopg2-binary python-dotenv
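The constraints URL built in the shell snippet above can also be reproduced in Python, which is handy inside setup tooling. This helper is illustrative only (not part of the repo) and just mirrors the same interpolation:

```python
import sys

def airflow_constraints_url(airflow_version, py=None):
    """Build the pinned constraints URL for a given Airflow/Python pair.

    Mirrors the shell logic above: the constraints file must match both
    the Airflow version and the running Python's major.minor version.
    """
    major, minor = py if py else sys.version_info[:2]
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{major}.{minor}.txt"
    )
```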
Option B: Docker for PostgreSQL only
docker run -d --name pg-netflix -e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=airflow_db -p 5432:5432 postgres:16
🔑 Step 2: Kaggle API
- On Kaggle: Account → Create New API Token
- Place the downloaded file at ~/.kaggle/kaggle.json
- Restrict permissions:
chmod 600 ~/.kaggle/kaggle.json
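A misplaced or world-readable token is the most common Kaggle setup failure (the CLI refuses tokens with loose permissions). A small sanity-check helper, illustrative only; `check_kaggle_token` is not part of the project:

```python
import json
import stat
from pathlib import Path

def check_kaggle_token(path: Path) -> dict:
    """Validate a Kaggle API token file.

    The file must parse as JSON containing 'username' and 'key', and
    must be readable only by the owner (i.e., chmod 600).
    """
    creds = json.loads(path.read_text())
    missing = {"username", "key"} - creds.keys()
    if missing:
        raise ValueError(f"kaggle.json missing fields: {missing}")
    mode = stat.S_IMODE(path.stat().st_mode)
    if mode & 0o077:  # any group/other bits set -> too permissive
        raise PermissionError(f"kaggle.json is {oct(mode)}; run chmod 600")
    return creds
```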
🗄️ Step 3: PostgreSQL Connection in Airflow
Create an Airflow connection named pg_netflix.
CLI alternative:
airflow connections add pg_netflix \
--conn-uri 'postgresql+psycopg2://postgres:postgres@localhost:5432/airflow_db'
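The plain URI above works because the demo password has no special characters; real credentials often do, and they must be percent-encoded in a SQLAlchemy-style URI. A tiny assembler sketch (`pg_conn_uri` is a hypothetical helper, not part of the repo):

```python
from urllib.parse import quote_plus

def pg_conn_uri(user: str, password: str, host: str, port: int, db: str) -> str:
    # Percent-encode user and password so characters like '@' or ':'
    # don't break URI parsing.
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db}"
    )
```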
🪄 Step 4: Initialize Airflow and User
export AIRFLOW_HOME=$(pwd)
airflow db init
airflow users create --username admin --password admin \
--role Admin --email turboath@example.com
🚀 Step 5: Enable and Run the Pipeline
Start the services (two terminals):
# Terminal 1
airflow webserver -p 8080
# Terminal 2
airflow scheduler
Open: http://localhost:8080
🧪 Step 6: Validate Data
SELECT COUNT(*) FROM netflix;
SELECT COUNT(*) FROM netflix_clean;
SELECT country, COUNT(*) AS c
FROM netflix_clean
GROUP BY country
ORDER BY c DESC
LIMIT 10;
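To spot-check the GROUP BY query against the raw CSV, the same aggregation can be done in Python. This stdlib sketch (not part of the pipeline) mirrors GROUP BY country / ORDER BY count DESC / LIMIT n:

```python
from collections import Counter

def top_countries(countries, n=10):
    """Count non-null country values and return the n most common,
    as (country, count) pairs sorted by descending count."""
    return Counter(c for c in countries if c).most_common(n)
```

For example, feeding it the `country` column from netflix_titles.csv (via `csv.DictReader` or pandas) should match the SQL result, modulo rows dropped by the transform step.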
📦 Pipeline Code & Dependencies
dags/netflix_pipeline_dag.py:
from __future__ import annotations
import os
import io
import logging
from datetime import datetime, timedelta
import pandas as pd
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from airflow.providers.postgres.hooks.postgres import PostgresHook
# ---------- Config ----------
DATA_DIR = os.getenv("DATA_DIR", "/tmp/netflix_data")
KAGGLE_DATASET = "shivamb/netflix-shows"
RAW_CSV = os.path.join(DATA_DIR, "netflix_titles.csv")
PG_CONN_ID = os.getenv("PG_CONN_ID", "pg_netflix")
@dag(
    dag_id="netflix_elt_pipeline",
    default_args={"owner": "atharv", "retries": 1},
    start_date=datetime(2025, 11, 7),
    schedule_interval="@daily",
    catchup=False,
)
def netflix_elt_pipeline():
    log = logging.getLogger("airflow")

    @task
    def extract() -> str:
        # ... (full extract logic) ...
        log.info("Extract success.")
        return RAW_CSV

    @task
    def load(csv_path: str) -> int:
        # ... (full load logic) ...
        log.info("Load completed (upserted).")
        return len(df)

    @task
    def transform() -> int:
        # ... (full transform logic) ...
        logging.getLogger("airflow").info("Transform produced %d rows.", cnt)
        return int(cnt)

    # Orchestration: extract -> load -> transform.
    # transform() takes no argument, so the dependency on load must be
    # declared explicitly or it would run in parallel with load.
    csv_path = extract()
    loaded = load(csv_path)
    loaded >> transform()

dag = netflix_elt_pipeline()
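The load task body is elided above; the "idempotent inserts" promised in the overview usually come down to PostgreSQL's INSERT ... ON CONFLICT. One way to generate such a statement, sketched as a hypothetical helper (the `show_id` conflict key is an assumption about the dataset's primary key):

```python
def upsert_sql(table: str, columns: list, key: str) -> str:
    """Build an INSERT ... ON CONFLICT DO UPDATE statement so that
    re-running the load overwrites existing rows instead of duplicating
    or failing on the primary key."""
    cols = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != key)
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) DO UPDATE SET {updates}"
    )
```

The resulting string can be passed to `cursor.executemany` (or PostgresHook's `run`) with one parameter tuple per CSV row.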
requirements.txt:
apache-airflow
apache-airflow-providers-postgres
pandas
psycopg2-binary
kaggle
python-dotenv
🧰 Helper Scripts
scripts/setup_airflow.sh:
#!/usr/bin/env bash
set -euo pipefail
AIRFLOW_VERSION="${AIRFLOW_VERSION:-2.9.3}"
PYVER="$(python -c 'import sys; ...')"
echo "[*] Upgrading pip..."
python -m pip install --upgrade pip
echo "[*] Installing Airflow..."
# ... (rest of install) ...
echo "[*] Initializing Airflow DB..."
export AIRFLOW_HOME="${AIRFLOW_HOME:-$(pwd)}"
airflow db init
echo "[*] Creating admin user..."
airflow users create \
--username admin --password admin \
--role Admin --email admin@example.com
echo "[*] Creating Postgres connection 'pg_netflix'..."
airflow connections add pg_netflix \
--conn-uri 'postgresql+psycopg2://postgres:postgres@localhost:5432/airflow_db'
echo "[*] Done."
scripts/env.example:
# Copy to .env and edit values as needed
PG_HOST=localhost
PG_PORT=5432
PG_DB=airflow_db
PG_USER=postgres
PG_PASSWORD=postgres
DATA_DIR=/tmp/netflix_data
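python-dotenv is what actually loads this file in the pipeline; purely to illustrate the KEY=VALUE format above, here is a minimal stdlib stand-in (not used by the project):

```python
def load_env(path: str) -> dict:
    """Parse a .env-style file: one KEY=VALUE per line, '#' comments
    and blank lines ignored. Returns a plain dict of string values."""
    env = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            env[key.strip()] = value.strip()
    return env
```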
🧾 License
Released under the MIT License © 2025 TurboAth.
Copyright (c) 2025 TurboAth
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software...