🧱 Project Overview

A clean, reproducible ELT pipeline that downloads the Netflix titles dataset from Kaggle, loads it into PostgreSQL, and transforms it into a cleaned table, orchestrated by Apache Airflow.

Project Structure

airflow/
 ├── dags/
 │   └── netflix_pipeline_dag.py
 ├── data/
 │   └── netflix_titles.csv
 ├── scripts/
 │   ├── setup_airflow.sh
 │   └── env.example
 ├── requirements.txt
 └── README.md

You can place this folder anywhere (e.g., ~/airflow_home). Make sure your Airflow dags_folder points at ./dags, or add this path to your Airflow configuration.

⚙️ Step-by-Step Setup Guide

Step 1: Environment Setup (Local)

python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
(Screenshot: venv setup)
# Airflow requires constraints
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION=$(python -c 'import sys; print(".".join(map(str, sys.version_info[:2])))')
pip install "apache-airflow==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# Providers
pip install apache-airflow-providers-postgres pandas kaggle psycopg2-binary python-dotenv
(Screenshot: Airflow install)
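The constraints URL above must match your interpreter's MAJOR.MINOR version, or pip will resolve against the wrong pin set. A minimal sanity check (illustrative, not part of the repo) that reproduces the tag used in the URL:

```python
# Sketch: reproduce the MAJOR.MINOR tag that the Airflow constraints URL expects.
import sys

def constraints_python_tag() -> str:
    """Return the MAJOR.MINOR tag used in the constraints URL (e.g. "3.11")."""
    return ".".join(map(str, sys.version_info[:2]))

print("constraints tag:", constraints_python_tag())
```

If this tag differs from the PYTHON_VERSION your shell computed, the pip install step ran against the wrong constraints file.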

Option B: Docker for PostgreSQL only

docker run -d --name pg-netflix -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=airflow_db -p 5432:5432 postgres:16

🔑 Step 2: Kaggle API

  1. On Kaggle: Account → Create New API Token
  2. Place it at ~/.kaggle/kaggle.json
  3. Restrict permissions:
chmod 600 ~/.kaggle/kaggle.json
(Screenshot: Kaggle API setup)
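The kaggle CLI refuses to run if the token is group- or world-readable, which is what the chmod 600 step guards against. A small sketch (a hypothetical helper, not part of the pipeline) that checks both conditions:

```python
# Sketch: verify the Kaggle token exists and is only readable by its owner,
# mirroring the chmod 600 step above.
import stat
from pathlib import Path

def kaggle_token_ok(path: Path = Path.home() / ".kaggle" / "kaggle.json") -> bool:
    if not path.is_file():
        return False
    mode = stat.S_IMODE(path.stat().st_mode)
    # chmod 600 means no group/other permission bits are set
    return mode & (stat.S_IRWXG | stat.S_IRWXO) == 0

if __name__ == "__main__":
    print("token ok:", kaggle_token_ok())
```

Running it before the first DAG trigger catches a misconfigured token early instead of failing inside the extract task.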

🗄️ Step 3: PostgreSQL Connection in Airflow

Create an Airflow connection named pg_netflix.

CLI alternative:

airflow connections add pg_netflix \
  --conn-uri 'postgres://postgres:postgres@localhost:5432/airflow_db'
(Screenshot: adding the Airflow connection)

🪄 Step 4: Initialize Airflow and User

export AIRFLOW_HOME=$(pwd)
airflow db init
airflow users create --username admin --password admin \
  --firstname Admin --lastname User \
  --role Admin --email turboath@example.com
(Screenshots: airflow db init, airflow users create)

🧠 Step 5: Enable and Run the Pipeline

Start the services (two terminals):

# Terminal 1
airflow webserver -p 8080
(Screenshot: airflow webserver)
# Terminal 2
airflow scheduler
(Screenshot: airflow scheduler)

Open: http://localhost:8080

(Screenshots: Airflow login screen, DAGs list)

🧪 Step 6: Validate Data

Connect to the database (e.g., with psql) and check the row counts:

SELECT COUNT(*) FROM netflix;
SELECT COUNT(*) FROM netflix_clean;

SELECT country, COUNT(*) AS c
FROM netflix_clean
GROUP BY country
ORDER BY c DESC
LIMIT 10;
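The same top-countries check can be sketched in pandas if you'd rather validate from Python. This assumes the column names of the Kaggle CSV; top_countries is an illustrative helper, not part of the DAG:

```python
import pandas as pd

def top_countries(df: pd.DataFrame, n: int = 10) -> pd.Series:
    """Equivalent of the GROUP BY query above: row counts per country, descending."""
    return df["country"].value_counts().head(n)

# Toy frame standing in for netflix_clean
df = pd.DataFrame({"country": ["US", "India", "US", "UK", "US", "India"]})
print(top_countries(df, 3))
```

value_counts already sorts descending, so it matches the ORDER BY c DESC LIMIT 10 in the SQL.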

📦 Pipeline Code & Dependencies

dags/netflix_pipeline_dag.py:

from __future__ import annotations
import os
import io
import logging
from datetime import datetime, timedelta
import pandas as pd
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from airflow.providers.postgres.hooks.postgres import PostgresHook

# ---------- Config ----------
DATA_DIR = os.getenv("DATA_DIR", "/tmp/netflix_data")
KAGGLE_DATASET = "shivamb/netflix-shows"
RAW_CSV = os.path.join(DATA_DIR, "netflix_titles.csv")
PG_CONN_ID = os.getenv("PG_CONN_ID", "pg_netflix")

@dag(
    dag_id="netflix_elt_pipeline",
    default_args={"owner": "atharv", "retries": 1},
    start_date=datetime(2025, 11, 7),
    schedule="@daily",
    catchup=False,
)
def netflix_elt_pipeline():
    log = logging.getLogger("airflow")

    @task
    def extract() -> str:
        # ... (full extract logic) ...
        log.info("Extract success.")
        return RAW_CSV

    @task
    def load(csv_path: str) -> int:
        # ... (full load logic) ...
        log.info("Load completed (upserted).")
        return len(df)

    @task
    def transform() -> int:
        # ... (full transform logic) ...
        log.info("Transform produced %d rows.", cnt)
        return int(cnt)

    # Orchestration
    csv_path = extract()
    _ = load(csv_path)
    _ = transform()

dag = netflix_elt_pipeline()
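The transform body is elided above. One plausible shape for that cleaning pass, sketched in pandas under stated assumptions (column names show_id, title, country from the Kaggle CSV; dedupe-on-show_id and fill-missing-country are illustrative choices, not the author's confirmed logic):

```python
import pandas as pd

def clean_titles(df: pd.DataFrame) -> pd.DataFrame:
    """A sketch of the elided transform: trim titles, drop blanks, dedupe, fill country."""
    out = df.copy()
    out["title"] = out["title"].str.strip()          # normalize whitespace
    out = out.dropna(subset=["title"])               # drop rows with no title
    out = out.drop_duplicates(subset=["show_id"])    # one row per show_id
    out["country"] = out["country"].fillna("Unknown")
    return out

raw = pd.DataFrame({
    "show_id": ["s1", "s1", "s2", "s3"],
    "title": [" Dark ", " Dark ", None, "Okja"],
    "country": ["Germany", "Germany", "US", None],
})
print(len(clean_titles(raw)))  # -> 2
```

In the real task, the cleaned frame would be written to netflix_clean via the PostgresHook connection pg_netflix.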

requirements.txt:

apache-airflow
apache-airflow-providers-postgres
pandas
psycopg2-binary
kaggle
python-dotenv

🧰 Helper Scripts

scripts/setup_airflow.sh:

#!/usr/bin/env bash
set -euo pipefail

AIRFLOW_VERSION="${AIRFLOW_VERSION:-2.9.3}"
PYVER="$(python -c 'import sys; print(".".join(map(str, sys.version_info[:2])))')"

echo "[*] Upgrading pip..."
python -m pip install --upgrade pip

echo "[*] Installing Airflow..."
# ... (rest of install) ...

echo "[*] Initializing Airflow DB..."
export AIRFLOW_HOME="${AIRFLOW_HOME:-$(pwd)}"
airflow db init

echo "[*] Creating admin user..."
airflow users create \
  --username admin --password admin \
  --firstname Admin --lastname User \
  --role Admin --email admin@example.com

echo "[*] Creating Postgres connection 'pg_netflix'..."
airflow connections add pg_netflix \
  --conn-uri 'postgres://postgres:postgres@localhost:5432/airflow_db'

echo "[*] Done."

scripts/env.example:

# Copy to .env and edit values as needed
PG_HOST=localhost
PG_PORT=5432
PG_DB=airflow_db
PG_USER=postgres
PG_PASSWORD=postgres

DATA_DIR=/tmp/netflix_data
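In the pipeline, python-dotenv reads this file into the environment. A minimal sketch of what that loading amounts to, plus assembling a Postgres DSN from the variables above (load_env and pg_dsn are hypothetical helpers for illustration):

```python
# Sketch: parse KEY=VALUE lines like env.example and build a Postgres DSN.
from pathlib import Path

def load_env(path: str) -> dict:
    env = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env

def pg_dsn(env: dict) -> str:
    return (f"postgresql://{env['PG_USER']}:{env['PG_PASSWORD']}"
            f"@{env['PG_HOST']}:{env['PG_PORT']}/{env['PG_DB']}")
```

With the example values above, pg_dsn yields the same URI used when registering the pg_netflix connection.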

🧾 License

Released under the MIT License © 2025 TurboAth.

Copyright (c) 2025 TurboAth

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software...