# End-to-End Data Pipeline Notebook

This notebook demonstrates a fully integrated data pipeline supporting both **batch** and **streaming** processing, including data ingestion, transformation, validation, storage, monitoring, governance, and ML tracking.

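It is designed for rapid deployment and customization. The stack is built around Docker Compose, Apache Airflow, Apache Spark, Kafka, MinIO, MySQL, PostgreSQL, Great Expectations, Prometheus, and Grafana, with placeholder integrations for Apache Atlas, Feast, and MLflow.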


## Architecture Overview

Below is a text-based diagram of the pipeline architecture:

```
                              ┌─────────────────────────────┐
                              │         Batch Source        │
                              │   (MySQL, Files, etc.)      │
                              └────────────┬────────────────┘
                                           │
                                           │ (Extract/Validate)
                                           ▼
                           ┌────────────────────────────────┐
                           │      Airflow Batch DAG         │
                           │ - Extracts data from MySQL     │
                           │ - Validates with GreatExpect.  │
                           │ - Uploads raw data to MinIO    │
                           └────────────┬───────────────────┘
                                        │ (spark-submit)
                                        ▼
                           ┌───────────────────────────────────┐
                           │         Spark Batch Job           │
                           │ - Transforms, cleans, enriches    │
                           │ - Writes to PostgreSQL & MinIO    │
                           └────────────┬──────────────────────┘
                                        │
                                        ▼
                           ┌────────────────────────────────┐
                           │       Processed Data Store     │
                           │         (PostgreSQL)           │
                           └────────────────────────────────┘
                                        
Streaming Side:
                              ┌─────────────────────────────┐
                              │       Streaming Source      │
                              │         (Kafka)             │
                              └────────────┬────────────────┘
                                           │
                                           ▼
                           ┌───────────────────────────────────┐
                           │    Spark Streaming Job            │
                           │ - Consumes Kafka messages         │
                           │ - Detects anomalies               │
                           │ - Writes to PostgreSQL & MinIO    │
                           └───────────────────────────────────┘

Monitoring & Governance:
                              ┌────────────────────────────────┐
                              │   Monitoring & Governance      │
                              │ - Prometheus & Grafana         │
                              │ - Apache Atlas/OpenMetadata    │
                              └────────────────────────────────┘

ML & Serving:
                              ┌────────────────────────────────┐
                              │        AI/ML Serving           │
                              │ - Feature Store (Feast)        │
                              │ - MLflow Model Tracking        │
                              │ - BI Dashboards                │
                              └────────────────────────────────┘
```

## Directory Structure

```
end-to-end-pipeline/
  ├── docker-compose.yaml            # Docker Compose for all services
  ├── README.md                      # This documentation
  ├── airflow/
  │   ├── Dockerfile                 # Airflow image
  │   ├── requirements.txt           # Python deps for Airflow
  │   └── dags/
  │       ├── batch_ingestion_dag.py # Batch ingestion DAG
  │       └── streaming_monitoring_dag.py  # Streaming monitoring DAG
  ├── spark/
  │   ├── Dockerfile                 # Spark image
  │   ├── spark_batch_job.py         # Spark batch ETL job
  │   └── spark_streaming_job.py     # Spark streaming job
  ├── kafka/
  │   └── producer.py                # Kafka producer simulation
  ├── great_expectations/
  │   ├── great_expectations.yaml    # GE config
  │   └── expectations/
  │       └── raw_data_validation.py # GE suite
  ├── governance/
  │   └── atlas_stub.py              # Dataset lineage registration
  ├── monitoring/
  │   ├── monitoring.py              # Monitoring setup for Prometheus & Grafana
  │   └── prometheus.yml             # Prometheus config
  ├── ml/
  │   ├── feature_store_stub.py      # Feature Store stub
  │   └── mlflow_tracking.py         # MLflow tracking
  └── scripts/
      └── init_db.sql                # MySQL initialization script
```

In [None]:
# Docker Compose content
docker_compose_yaml = '''
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

  mysql:
    image: mysql:8.0
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: source_db
      MYSQL_USER: user
      MYSQL_PASSWORD: pass
    ports:
      - "3306:3306"
    volumes:
      - ./scripts/init_db.sql:/docker-entrypoint-initdb.d/init_db.sql

  postgres:
    image: postgres:14
    container_name: postgres
    environment:
      POSTGRES_DB: processed_db
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    ports:
      - "5432:5432"

  minio:
    image: minio/minio:latest
    container_name: minio
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data

  airflow-webserver:
    build: ./airflow
    container_name: airflow-webserver
    ports:
      - "8080:8080"
    depends_on:
      - postgres
      - kafka
      - mysql
      - minio
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
    volumes:
      - ./airflow/dags:/opt/airflow/dags
      - ./airflow/logs:/opt/airflow/logs
      - ./airflow/plugins:/opt/airflow/plugins
      - ./great_expectations:/opt/airflow/great_expectations

  spark:
    build: ./spark
    container_name: spark
    depends_on:
      - kafka
      - mysql
      - minio
      - postgres
    volumes:
      - ./spark:/opt/spark_jobs
      - ./great_expectations:/opt/spark_jobs/great_expectations

volumes:
  minio_data:
'''

print(docker_compose_yaml)
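
After `docker-compose up`, containers can take a while before they accept connections. As a minimal sketch (not part of the original repository), the helper below polls the host ports published in the Compose file above until they are reachable. The names `SERVICE_PORTS`, `port_is_open`, and `wait_for_services` are illustrative, and `localhost` assumes the check runs on the Docker host.

```python
import socket
import time

# Host ports published by the Compose file above; adjust if you change the mappings.
SERVICE_PORTS = {
    "kafka": 9092,
    "mysql": 3306,
    "postgres": 5432,
    "minio": 9000,
    "airflow-webserver": 8080,
}


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_services(ports, host="localhost", attempts=30, delay=2.0):
    """Poll every port up to `attempts` times; return the names that never answered."""
    pending = dict(ports)
    for _ in range(attempts):
        pending = {
            name: port
            for name, port in pending.items()
            if not port_is_open(host, port)
        }
        if not pending:
            break
        time.sleep(delay)
    return sorted(pending)
```

Calling `wait_for_services(SERVICE_PORTS)` before triggering a DAG returns an empty list once every port answers. An open port only shows that a listener is up, not that the service has finished initializing.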

In [None]:
# Airflow Batch Ingestion DAG (airflow/dags/batch_ingestion_dag.py)
from airflow import DAG
# Airflow 2 import paths; the *_operator modules are deprecated aliases.
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.bash import BashOperator
from datetime import datetime
import pandas as pd
import boto3
import logging
import great_expectations as ge

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

def extract_data_from_mysql(**kwargs):
    logging.info("Extracting data from MySQL...")
    mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
    df = mysql_hook.get_pandas_df("SELECT * FROM orders;")
    df.to_csv("/tmp/orders.csv", index=False)
    logging.info(f"Extracted {len(df)} records.")

def validate_data_with_ge(**kwargs):
    logging.info("Validating data with Great Expectations...")
    df = pd.read_csv("/tmp/orders.csv")
    ge_df = ge.from_pandas(df)
    result = ge_df.expect_column_values_to_not_be_null("order_id")
    if not result["success"]:
        raise ValueError("Validation failed: 'order_id' contains null values")
    logging.info("Data validation passed.")

def load_to_minio(**kwargs):
    logging.info("Uploading raw data to MinIO...")
    s3 = boto3.client(
        's3',
        endpoint_url='http://minio:9000',
        aws_access_key_id='minio',
        aws_secret_access_key='minio123',
        region_name='us-east-1'
    )
    bucket_name = "raw-data"
    try:
        s3.create_bucket(Bucket=bucket_name)
    except Exception as e:
        logging.info(f"Bucket may already exist: {e}")
    s3.upload_file("/tmp/orders.csv", bucket_name, "orders/orders.csv")
    logging.info("File uploaded to MinIO.")

def load_to_postgres(**kwargs):
    logging.info("Loading transformed data to Postgres...")
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    df = pd.read_csv("/tmp/transformed_orders.csv")
    columns = ["order_id", "customer_id", "amount", "processed_timestamp"]
    # Full refresh: clear the table, then bulk-insert instead of running one statement per row.
    pg_hook.run("TRUNCATE TABLE orders_transformed;")
    pg_hook.insert_rows(
        table="orders_transformed",
        rows=df[columns].itertuples(index=False, name=None),
        target_fields=columns,
    )
    logging.info(f"Loaded {len(df)} rows into Postgres.")

with DAG(
    dag_id='batch_ingestion_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    extract_task = PythonOperator(
        task_id='extract_mysql',
        python_callable=extract_data_from_mysql
    )

    validate_task = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data_with_ge
    )

    load_to_minio_task = PythonOperator(
        task_id='load_to_minio',
        python_callable=load_to_minio
    )

    spark_transform_task = BashOperator(
        task_id='spark_transform',
        bash_command='spark-submit --master local[2] /opt/spark_jobs/spark_batch_job.py'
    )

    load_postgres_task = PythonOperator(
        task_id='load_to_postgres',
        python_callable=load_to_postgres
    )

    extract_task >> validate_task >> load_to_minio_task >> spark_transform_task >> load_postgres_task
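
The `load_to_postgres` task reads the transformed CSV and writes it to the database. As a rough sketch of how the rows could be grouped for bulk loading, the standard-library example below reads a CSV and yields fixed-size chunks of tuples. The helper names `read_rows` and `chunked`, the chunk size, and the inline sample data are assumptions for illustration; in the DAG, each chunk would be handed to a bulk insert call rather than printed.

```python
import csv
import io
from itertools import islice

COLUMNS = ["order_id", "customer_id", "amount", "processed_timestamp"]


def read_rows(csv_file, columns=COLUMNS):
    """Yield one tuple per CSV record, ordered to match `columns` (values stay strings)."""
    reader = csv.DictReader(csv_file)
    for record in reader:
        yield tuple(record[name] for name in columns)


def chunked(rows, size):
    """Group an iterable of rows into lists of at most `size` items."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Hypothetical sample standing in for /tmp/transformed_orders.csv.
sample = io.StringIO(
    "order_id,customer_id,amount,processed_timestamp\n"
    "1,10,19.99,2023-01-01T00:00:00\n"
    "2,11,5.00,2023-01-01T00:00:00\n"
    "3,10,42.50,2023-01-01T00:00:00\n"
)

chunks = list(chunked(read_rows(sample), size=2))
for number, chunk in enumerate(chunks, start=1):
    print(f"chunk {number}: {len(chunk)} rows")
```

Chunking keeps memory use flat for large files and lets the loader commit in batches instead of once per row.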


In [None]:
# Spark Batch Job (spark/spark_batch_job.py)
import os
import logging
import glob
import shutil
import sys
import great_expectations as ge
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, sum as spark_sum, count

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

MINIO_ENDPOINT = "http://minio:9000"
MINIO_ACCESS_KEY = "minio"
MINIO_SECRET_KEY = "minio123"
RAW_DATA_PATH = "s3a://raw-data/orders/orders.csv"
PROCESSED_DATA_PATH = "s3a://processed-data/orders_transformed.csv"
LOCAL_OUTPUT_DIR = "/tmp/transformed_orders"
LOCAL_OUTPUT_FILE = "/tmp/transformed_orders.csv"

def validate_schema(df):
    logging.info("Validating schema using Great Expectations...")
    ge_df = ge.from_pandas(df.toPandas())
    result_order_id = ge_df.expect_column_values_to_not_be_null("order_id")
    if not result_order_id.success:
        raise ValueError("Validation failed: 'order_id' contains null values")
    result_customer = ge_df.expect_column_values_to_be_between("customer_id", min_value=1)
    if not result_customer.success:
        raise ValueError("Validation failed: 'customer_id' is not positive")
    result_amount = ge_df.expect_column_values_to_be_between("amount", min_value=0.01)
    if not result_amount.success:
        raise ValueError("Validation failed: 'amount' contains zero or negative values")
    logging.info("Schema validation passed.")

def main():
    try:
        spark = SparkSession.builder \
            .appName("BatchETL") \
            .getOrCreate()

        spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", MINIO_ENDPOINT)
        spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", MINIO_ACCESS_KEY)
        spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", MINIO_SECRET_KEY)
        spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")

        logging.info(f"Reading raw data from MinIO ({RAW_DATA_PATH})...")
        df = spark.read.option("header", "true").csv(RAW_DATA_PATH)

        if df.rdd.isEmpty():
            raise Exception("No data found in the input file.")

        df = df.withColumn("order_id", col("order_id").cast("int")) \
               .withColumn("customer_id", col("customer_id").cast("int")) \
               .withColumn("amount", col("amount").cast("double"))

        validate_schema(df)

        initial_record_count = df.count()
        logging.info(f"Initial record count: {initial_record_count}")

        df = df.dropDuplicates(["order_id"])
        df = df.fillna({"amount": 0.0})
        df_transformed = df.withColumn("processed_timestamp", current_timestamp())

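        # Per-customer totals. This DataFrame is defined lazily and never written in this job;
        # add a write step if the aggregate is needed downstream.
        df_aggregated = df_transformed.groupBy("customer_id").agg(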
            spark_sum("amount").alias("total_spent"),
            count("*").alias("order_count")
        )

        final_record_count = df_transformed.count()
        logging.info(f"Final record count: {final_record_count}")

        logging.info(f"Writing transformed data to MinIO ({PROCESSED_DATA_PATH})...")
        df_transformed.write.mode("overwrite").option("header", "true").csv(PROCESSED_DATA_PATH)

        logging.info(f"Writing transformed data to local directory ({LOCAL_OUTPUT_DIR})...")
        df_transformed.coalesce(1).write.mode("overwrite").option("header", "true").csv(LOCAL_OUTPUT_DIR)

        csv_files = glob.glob(f"{LOCAL_OUTPUT_DIR}/part-*.csv")
        if csv_files:
            shutil.move(csv_files[0], LOCAL_OUTPUT_FILE)
            logging.info(f"Transformed file successfully written to {LOCAL_OUTPUT_FILE}")
        else:
            logging.warning("No CSV files found after transformation.")

        spark.stop()
        logging.info("Batch ETL process completed successfully.")

    except Exception as e:
        logging.error(f"Batch processing failed: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()
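
To make the batch transformations easier to reason about, here is a small plain-Python sketch of the same steps: deduplicate on `order_id`, replace missing amounts with `0.0`, and aggregate per customer. It is an illustration on hypothetical sample rows, not the Spark job itself. One difference to note: Spark's `dropDuplicates` keeps an arbitrary row per key, whereas this sketch keeps the first one it sees.

```python
from collections import defaultdict


def transform(orders):
    """Drop repeated order_ids (first occurrence wins) and fill missing amounts with 0.0."""
    seen = set()
    cleaned = []
    for order in orders:
        if order["order_id"] in seen:
            continue
        seen.add(order["order_id"])
        amount = order["amount"] if order["amount"] is not None else 0.0
        cleaned.append({**order, "amount": amount})
    return cleaned


def aggregate(orders):
    """Compute total_spent and order_count per customer_id."""
    totals = defaultdict(lambda: {"total_spent": 0.0, "order_count": 0})
    for order in orders:
        entry = totals[order["customer_id"]]
        entry["total_spent"] += order["amount"]
        entry["order_count"] += 1
    return dict(totals)


# Hypothetical rows shaped like the orders table.
sample_orders = [
    {"order_id": 1, "customer_id": 10, "amount": 20.0},
    {"order_id": 1, "customer_id": 10, "amount": 20.0},  # duplicate order_id
    {"order_id": 2, "customer_id": 10, "amount": None},  # missing amount
    {"order_id": 3, "customer_id": 11, "amount": 5.5},
]

cleaned = transform(sample_orders)
summary = aggregate(cleaned)
print(len(cleaned), summary)
```

A sketch like this can double as a reference implementation when unit-testing the Spark job against small fixtures.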


In [None]:
# Spark Streaming Job (spark/spark_streaming_job.py)
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
import logging
import great_expectations as ge

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

KAFKA_BROKER = "kafka:9092"
KAFKA_TOPIC = "sensor_readings"
MINIO_ENDPOINT = "http://minio:9000"
MINIO_ACCESS_KEY = "minio"
MINIO_SECRET_KEY = "minio123"
RAW_DATA_PATH = "s3a://raw-data/streaming_raw/"
ANOMALY_DATA_PATH = "s3a://processed-data/streaming_anomalies/"

POSTGRES_TABLE = "anomalies_stream"

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("timestamp", LongType(), False),
    StructField("device_id", LongType(), False),
    StructField("reading_value", DoubleType(), False)
])

def validate_schema(df):
    logging.info("Validating streaming data schema with Great Expectations...")
    ge_df = ge.from_pandas(df.toPandas())
    result_event_id = ge_df.expect_column_values_to_not_be_null("event_id")
    if not result_event_id.success:
        raise ValueError("Validation failed: 'event_id' contains null values")
    result_timestamp = ge_df.expect_column_values_to_be_between("timestamp", min_value=1)
    if not result_timestamp.success:
        raise ValueError("Validation failed: 'timestamp' contains invalid values")
    result_device = ge_df.expect_column_values_to_be_between("device_id", min_value=1)
    if not result_device.success:
        raise ValueError("Validation failed: 'device_id' contains invalid values")
    result_reading = ge_df.expect_column_values_to_be_between("reading_value", min_value=0, max_value=100)
    if not result_reading.success:
        raise ValueError("Validation failed: 'reading_value' is out of expected range")
    logging.info("Streaming schema validation passed.")

def save_to_postgres(batch_df, batch_id):
    logging.info(f"Writing batch {batch_id} to PostgreSQL table {POSTGRES_TABLE}...")
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/processed_db") \
        .option("dbtable", POSTGRES_TABLE) \
        .option("user", "user") \
        .option("password", "pass") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()
    logging.info(f"Batch {batch_id} written to PostgreSQL.")

def main():
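    # The Kafka source and the s3a filesystem are not bundled with a stock Spark
    # distribution. Unless the Spark image already provides them, add the
    # spark-sql-kafka-0-10 and hadoop-aws packages matching your Spark build to
    # spark.jars.packages (comma-separated) or pass them via spark-submit --packages.
    spark = SparkSession.builder \
        .appName("StreamingETL") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.18") \
        .getOrCreate()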

    spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", MINIO_ENDPOINT)
    spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", MINIO_ACCESS_KEY)
    spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", MINIO_SECRET_KEY)
    spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")

    logging.info(f"Starting Spark Structured Streaming from Kafka topic: {KAFKA_TOPIC}")
    df_raw = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BROKER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load()

    df_parsed = df_raw.select(
        from_json(col("value").cast("string"), schema).alias("json_data")
    ).select("json_data.*")

    df_clean = df_parsed.filter(
        col("event_id").isNotNull() & 
        col("timestamp").isNotNull() & 
        col("device_id").isNotNull() & 
        col("reading_value").isNotNull()
    )

    # validate_schema() calls toPandas(), which is not allowed on a streaming
    # DataFrame, so validation runs on each micro-batch inside foreachBatch.
    # Only the anomaly stream is validated here.
    def validate_and_save(batch_df, batch_id):
        validate_schema(batch_df)
        save_to_postgres(batch_df, batch_id)

    df_anomalies = df_clean.filter(col("reading_value") > 70.0)

    # Write raw data to MinIO
    df_clean.writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/spark-checkpoints/raw") \
        .option("path", RAW_DATA_PATH) \
        .outputMode("append") \
        .start()

    # Write anomalies to MinIO
    df_anomalies.writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/spark-checkpoints/anomalies") \
        .option("path", ANOMALY_DATA_PATH) \
        .outputMode("append") \
        .start()

    # Validate anomalies and write them to PostgreSQL using foreachBatch
    df_anomalies.writeStream \
        .foreachBatch(validate_and_save) \
        .outputMode("append") \
        .start()

    logging.info("Streaming job started. Processing events in real time.")
    spark.streams.awaitAnyTermination()

if __name__ == "__main__":
    main()
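
The streaming job's core logic is: parse each Kafka message as JSON, drop records with missing fields, and flag readings above 70 as anomalies. The sketch below mirrors that logic in plain Python so it can be checked without a Spark cluster. The function names and sample messages are hypothetical; the field names and the 70.0 threshold come from the job above.

```python
import json

REQUIRED_FIELDS = ("event_id", "timestamp", "device_id", "reading_value")
ANOMALY_THRESHOLD = 70.0  # same cutoff as the streaming job's filter


def parse_event(raw):
    """Decode one message value (bytes); return None if it is malformed or incomplete."""
    try:
        event = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(event, dict):
        return None
    if any(event.get(field) is None for field in REQUIRED_FIELDS):
        return None
    return event


def split_batch(messages):
    """Return (clean_events, anomalies) for a list of raw message values."""
    clean = [event for event in map(parse_event, messages) if event is not None]
    anomalies = [e for e in clean if e["reading_value"] > ANOMALY_THRESHOLD]
    return clean, anomalies


# Hypothetical message values as they would arrive from the Kafka topic.
messages = [
    b'{"event_id": "a", "timestamp": 1, "device_id": 1001, "reading_value": 65.2}',
    b'{"event_id": "b", "timestamp": 2, "device_id": 1002, "reading_value": 75.9}',
    b'{"event_id": "c", "timestamp": 3, "device_id": 1003}',  # missing reading_value
    b"not json",
]

clean, anomalies = split_batch(messages)
print(f"{len(clean)} clean events, {len(anomalies)} anomalies")
```

Keeping the rule in one small function makes it straightforward to swap the fixed threshold for a different detection rule later.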


In [None]:
# Kafka Producer (kafka/producer.py)
import json
import time
import random
import uuid
import logging
from kafka import KafkaProducer
import os

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "sensor_readings")
# Seconds to sleep after generating each event (an interval, despite the name).
MESSAGE_FREQUENCY = float(os.getenv("MESSAGE_FREQUENCY", 1))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 10))
ACKS_MODE = os.getenv("ACKS_MODE", "all")

producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BROKER],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks=ACKS_MODE
)

def create_kafka_topic(topic_name):
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import KafkaError
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=KAFKA_BROKER)
        existing_topics = admin_client.list_topics()
        if topic_name not in existing_topics:
            topic = NewTopic(name=topic_name, num_partitions=3, replication_factor=1)
            admin_client.create_topics([topic])
            logging.info(f"Kafka topic '{topic_name}' created successfully.")
        else:
            logging.info(f"Kafka topic '{topic_name}' already exists.")
    except KafkaError as e:
        logging.error(f"Failed to create Kafka topic '{topic_name}': {e}")

def generate_event():
    return {
        "event_id": str(uuid.uuid4()),
        "timestamp": int(time.time()),
        "device_id": random.randint(1000, 9999),
        "reading_value": round(random.uniform(20.0, 80.0), 2)
    }

def produce_messages():
    logging.info(f"Starting Kafka Producer. Sending messages to topic: {KAFKA_TOPIC}")
    create_kafka_topic(KAFKA_TOPIC)
    batch = []
    message_count = 0
    try:
        while True:
            event = generate_event()
            batch.append(event)
            if len(batch) >= BATCH_SIZE:
                for msg in batch:
                    producer.send(KAFKA_TOPIC, msg)
                producer.flush()
                logging.info(f"Sent batch of {len(batch)} messages to Kafka topic: {KAFKA_TOPIC}")
                message_count += len(batch)
                batch.clear()
            time.sleep(MESSAGE_FREQUENCY)
    except KeyboardInterrupt:
        logging.warning("Kafka Producer stopped manually.")
    except Exception as e:
        logging.error(f"Kafka producer error: {e}")
    finally:
        logging.info(f"Kafka Producer shutting down. Total messages sent: {message_count}")
        producer.close()

if __name__ == "__main__":
    produce_messages()
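
The producer and the streaming job have to agree on the event shape. As a hedged sketch, the check below restates the streaming job's expectations (string `event_id`, positive integer `timestamp` and `device_id`, `reading_value` between 0 and 100) as a plain function and runs it against events built the same way as `generate_event`. The name `schema_violations` is hypothetical and not part of the repository.

```python
import random
import time
import uuid


def generate_event():
    """Build an event with the same fields and ranges as the producer above."""
    return {
        "event_id": str(uuid.uuid4()),
        "timestamp": int(time.time()),
        "device_id": random.randint(1000, 9999),
        "reading_value": round(random.uniform(20.0, 80.0), 2),
    }


def schema_violations(event):
    """Return a list of problems; an empty list means the event conforms."""
    problems = []
    if not isinstance(event.get("event_id"), str):
        problems.append("event_id must be a string")
    for field in ("timestamp", "device_id"):
        value = event.get(field)
        if not isinstance(value, int) or isinstance(value, bool) or value < 1:
            problems.append(f"{field} must be a positive integer")
    reading = event.get("reading_value")
    if not isinstance(reading, float) or not 0 <= reading <= 100:
        problems.append("reading_value must be a float between 0 and 100")
    return problems


bad_event = {"event_id": "x", "timestamp": 0, "device_id": 5, "reading_value": 150.0}
print(schema_violations(generate_event()))
print(schema_violations(bad_event))
```

Because readings are drawn uniformly from 20 to 80 and the streaming job flags values above 70, roughly one event in six should be flagged as an anomaly.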


In [None]:
# Governance / Data Lineage Registration (governance/atlas_stub.py)
import json
import logging
import os
import requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

ATLAS_API_URL = os.getenv("ATLAS_API_URL", "http://atlas:21000/api/atlas/v2/lineage")
ATLAS_USERNAME = os.getenv("ATLAS_USERNAME", "admin")
ATLAS_PASSWORD = os.getenv("ATLAS_PASSWORD", "admin")

HEADERS = { "Content-Type": "application/json" }

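def check_dataset_exists(dataset_name):
    # This stub's path and query parameters are placeholders; check them against
    # the Atlas REST API before pointing the script at a real server.
    dataset_api_url = f"{ATLAS_API_URL}/entities"
    try:
        response = requests.get(
            dataset_api_url,
            params={"type": "Dataset", "name": dataset_name},  # URL-encoded by requests
            auth=(ATLAS_USERNAME, ATLAS_PASSWORD),
            headers=HEADERS,
            timeout=10,
        )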
        if response.status_code == 200:
            data = response.json()
            if "entities" in data and len(data["entities"]) > 0:
                logging.info(f"Dataset '{dataset_name}' exists in Apache Atlas.")
                return True
            else:
                logging.warning(f"Dataset '{dataset_name}' does not exist in Apache Atlas.")
                return False
        else:
            logging.error(f"Failed to check dataset existence: {response.status_code} - {response.text}")
            return False
    except requests.RequestException as e:
        logging.error(f"Error while checking dataset existence: {str(e)}")
        return False

def register_dataset_lineage(source_name, target_name, extra_info=None):
    if not check_dataset_exists(source_name) or not check_dataset_exists(target_name):
        logging.error("Cannot register lineage: One or both datasets do not exist.")
        return
    lineage_payload = {
        "guidEntityMap": {},
        "relations": [
            {
                "typeName": "Process",
                "fromEntityId": source_name,
                "toEntityId": target_name,
                "relationshipAttributes": extra_info or {}
            }
        ]
    }
    try:
        response = requests.post(
            f"{ATLAS_API_URL}/entities",
            auth=(ATLAS_USERNAME, ATLAS_PASSWORD),
            headers=HEADERS,
            data=json.dumps(lineage_payload)
        )
        if response.status_code in [200, 201]:
            logging.info(f"Successfully registered lineage from '{source_name}' to '{target_name}'")
        else:
            logging.error(f"Failed to register lineage: {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Error while registering dataset lineage: {str(e)}")

if __name__ == "__main__":
    register_dataset_lineage(
        "mysql.orders",
        "minio.raw-data.orders",
        {"job": "batch_ingestion_dag", "transformation": "cleaning, enrichment"}
    )
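
Lineage registration records which dataset feeds which. Independently of any catalog API, the idea can be sketched as a small in-memory graph that answers "what is upstream of this dataset?". The `LineageGraph` class is purely illustrative, and the dataset name `postgres.orders_transformed` and job name `spark_batch_job` are assumptions added to extend the single edge registered above.

```python
from collections import defaultdict


class LineageGraph:
    """Minimal in-memory lineage store: target dataset -> set of (source, job)."""

    def __init__(self):
        self._upstream = defaultdict(set)

    def add_edge(self, source, target, job=None):
        """Record that `job` produces `target` from `source`."""
        self._upstream[target].add((source, job))

    def upstream_of(self, dataset):
        """Return every dataset that feeds `dataset`, directly or transitively."""
        found = set()
        stack = [dataset]
        while stack:
            current = stack.pop()
            for source, _job in self._upstream.get(current, ()):
                if source not in found:
                    found.add(source)
                    stack.append(source)
        return sorted(found)


graph = LineageGraph()
graph.add_edge("mysql.orders", "minio.raw-data.orders", job="batch_ingestion_dag")
# Hypothetical second hop, added only to show a transitive lookup.
graph.add_edge("minio.raw-data.orders", "postgres.orders_transformed", job="spark_batch_job")

print(graph.upstream_of("postgres.orders_transformed"))
```

A real catalog stores the same edges persistently and attaches metadata to them; the traversal is what powers impact analysis when a source table changes.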


In [None]:
# Monitoring Setup for Prometheus and Grafana (monitoring/monitoring.py)
import os
import json
import subprocess
import time
import logging
import requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

PROMETHEUS_CONFIG_PATH = os.getenv("PROMETHEUS_CONFIG_PATH", "/etc/prometheus/prometheus.yml")
PROMETHEUS_PORT = os.getenv("PROMETHEUS_PORT", "9090")
GRAFANA_PORT = os.getenv("GRAFANA_PORT", "3000")
GRAFANA_API_URL = f"http://localhost:{GRAFANA_PORT}/api"
GRAFANA_ADMIN_USER = os.getenv("GRAFANA_ADMIN_USER", "admin")
GRAFANA_ADMIN_PASS = os.getenv("GRAFANA_ADMIN_PASS", "admin")
DASHBOARDS_PATH = os.getenv("DASHBOARDS_PATH", "./monitoring/grafana_dashboards")

def start_prometheus():
    if not os.path.exists(PROMETHEUS_CONFIG_PATH):
        logging.error(f"Prometheus config file not found: {PROMETHEUS_CONFIG_PATH}")
        return
    logging.info("Starting Prometheus...")
    try:
        subprocess.Popen(["prometheus", "--config.file", PROMETHEUS_CONFIG_PATH])
        logging.info(f"Prometheus started on port {PROMETHEUS_PORT}")
    except Exception as e:
        logging.error(f"Failed to start Prometheus: {e}")

def start_grafana():
    logging.info("Starting Grafana...")
    try:
        subprocess.Popen(["grafana-server"])
        logging.info(f"Grafana started on port {GRAFANA_PORT}")
    except Exception as e:
        logging.error(f"Failed to start Grafana: {e}")

def wait_for_grafana():
    logging.info("Waiting for Grafana to be ready...")
    for _ in range(30):
        try:
            response = requests.get(f"{GRAFANA_API_URL}/health")
            if response.status_code == 200:
                logging.info("Grafana is ready.")
                return True
        except requests.ConnectionError:
            pass
        time.sleep(1)
    logging.error("Grafana did not start in time.")
    return False

def create_grafana_datasource():
    logging.info("Creating Grafana Prometheus datasource...")
    datasource_payload = {
        "name": "Prometheus",
        "type": "prometheus",
        "url": f"http://localhost:{PROMETHEUS_PORT}",
        "access": "proxy",
        "basicAuth": False
    }
    response = requests.post(
        f"{GRAFANA_API_URL}/datasources",
        auth=(GRAFANA_ADMIN_USER, GRAFANA_ADMIN_PASS),
        headers={"Content-Type": "application/json"},
        data=json.dumps(datasource_payload)
    )
    if response.status_code in [200, 201]:
        logging.info("Grafana Prometheus datasource created successfully.")
    else:
        logging.error(f"Failed to create Grafana datasource: {response.text}")

def import_grafana_dashboards():
    if not os.path.exists(DASHBOARDS_PATH):
        logging.warning(f"Dashboard directory not found: {DASHBOARDS_PATH}")
        return
    for dashboard_file in os.listdir(DASHBOARDS_PATH):
        if dashboard_file.endswith(".json"):
            dashboard_path = os.path.join(DASHBOARDS_PATH, dashboard_file)
            with open(dashboard_path, "r") as f:
                dashboard_data = json.load(f)
                dashboard_payload = {
                    "dashboard": dashboard_data,
                    "overwrite": True
                }
                response = requests.post(
                    f"{GRAFANA_API_URL}/dashboards/db",
                    auth=(GRAFANA_ADMIN_USER, GRAFANA_ADMIN_PASS),
                    headers={"Content-Type": "application/json"},
                    data=json.dumps(dashboard_payload)
                )
                if response.status_code in [200, 201]:
                    logging.info(f"Imported Grafana dashboard: {dashboard_file}")
                else:
                    logging.error(f"Failed to import {dashboard_file}: {response.text}")

def main():
    start_prometheus()
    start_grafana()
    if wait_for_grafana():
        create_grafana_datasource()
        import_grafana_dashboards()
    logging.info("Monitoring setup complete.")

if __name__ == "__main__":
    main()


In [None]:
# MLflow Tracking (ml/mlflow_tracking.py)
import mlflow
import random

def train_and_log_model():
    mlflow.set_experiment("my_experiment")
    with mlflow.start_run():
        param_value = random.randint(1, 100)
        mlflow.log_param("param1", param_value)
        accuracy = round(random.uniform(0.8, 0.99), 4)
        mlflow.log_metric("accuracy", accuracy)
        print(f"Tracked model run with param1={param_value}, accuracy={accuracy}")

if __name__ == "__main__":
    train_and_log_model()


## Setup Instructions

1. **Clone the Repository**
   ```bash
   git clone https://github.com/your-repo/end-to-end-pipeline.git
   cd end-to-end-pipeline
   ```

2. **Start the Pipeline Stack**
   Use Docker Compose to launch all services:
   ```bash
   docker-compose up --build
   ```
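   This starts ZooKeeper, Kafka, MySQL, PostgreSQL, MinIO, Airflow, and Spark. Prometheus and Grafana are not defined in the Compose file shown above; they are started by `monitoring/monitoring.py` in step 6.

3. **Access the Services**
   - **Airflow UI:** [http://localhost:8080](http://localhost:8080). Create the `mysql_default` and `postgres_default` connections that the batch DAG looks up.
   - **MinIO Console:** [http://localhost:9001](http://localhost:9001) (User: `minio`, Password: `minio123`)
   - **Kafka:** Port `9092`
   - **Prometheus:** [http://localhost:9090](http://localhost:9090) (available once started in step 6)
   - **Grafana:** [http://localhost:3000](http://localhost:3000) (available once started in step 6; default login: `admin/admin`)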

4. **Run Batch Pipeline**
   - In the Airflow UI, enable and trigger the `batch_ingestion_dag`.

5. **Run Streaming Pipeline**
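   - Start the Kafka producer. The command below assumes that `kafka/producer.py` and a Python interpreter with `kafka-python` are available inside the target container. The Compose file shown earlier mounts only `./spark` (into the `spark` service), so add a volume for `./kafka` or copy the script in first:
     ```bash
     docker-compose exec kafka python /opt/spark_jobs/../kafka/producer.py
     ```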
   - Run the Spark streaming job:
     ```bash
     docker-compose exec spark spark-submit --master local[2] /opt/spark_jobs/spark_streaming_job.py
     ```

6. **Monitoring & Governance**
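   - Run `monitoring/monitoring.py` to start Prometheus and Grafana, register the Prometheus datasource, and import dashboards. The script launches the `prometheus` and `grafana-server` binaries directly, so both must be installed and on `PATH` where it runs.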
   - Run the `governance/atlas_stub.py` script to register dataset lineage.

7. **ML & Feature Store**
   - Run `ml/mlflow_tracking.py` to simulate model training and tracking.
   - Use `ml/feature_store_stub.py` (if implemented) for feature store integration.
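
The batch DAG looks up the Airflow connections `mysql_default` and `postgres_default`. One way to provide them, sketched here under the assumption that you configure Airflow through environment variables, is to set `AIRFLOW_CONN_<CONN_ID>` to a connection URI. The snippet builds those URIs from the credentials, hostnames, and database names in the Compose file; the helper `connection_uri` is illustrative.

```python
from urllib.parse import quote


def connection_uri(scheme, user, password, host, port, database):
    """Build a connection URI, percent-encoding the user name and password."""
    return (
        f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


# Values taken from the Compose file; hostnames are the Compose service names.
CONNECTIONS = {
    "mysql_default": connection_uri("mysql", "user", "pass", "mysql", 3306, "source_db"),
    "postgres_default": connection_uri("postgres", "user", "pass", "postgres", 5432, "processed_db"),
}

for conn_id, uri in CONNECTIONS.items():
    # Each printed line can be added to the Airflow service's environment.
    print(f"AIRFLOW_CONN_{conn_id.upper()}={uri}")
```

Connections can also be created in the Airflow UI under Admin → Connections; environment variables are simply easier to keep alongside the Compose file.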


## Example Applications

- **E-Commerce & Retail:**
  - Real-time recommendations and fraud detection using streaming data.
  - Batch processing of historical sales data for demand forecasting.

- **Financial Services:**
  - Risk analysis and trade surveillance using aggregated transaction data.

- **Healthcare:**
  - Patient monitoring using real-time sensor data and predictive analytics.

- **IoT & Manufacturing:**
  - Predictive maintenance by monitoring equipment sensor data in real time.

- **Media & Social Networks:**
  - Sentiment analysis and ad fraud detection using streaming social media data.


## Conclusion & Further Steps

This notebook has demonstrated an end-to-end data pipeline including batch and streaming processing, data quality checks, monitoring, governance, and ML tracking.

### Next Steps:

- Customize the transformation logic in Spark jobs.
- Adjust Airflow DAG schedules and dependencies to match your business needs.
- Scale the services in production using managed clusters or cloud services.
- Integrate real-world governance and ML systems by replacing stub implementations with production APIs.

Happy Coding!