from __future__ import annotations

import json
import urllib.parse
import urllib.request

import pendulum
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG, Variable

# Host name of the API on RapidAPI; also sent as the "x-rapidapi-host" header.
RAPIDAPI_HOST = "sec-event-intelligence.p.rapidapi.com"
# HTTPS origin that every endpoint path is appended to.
API_BASE_URL = "https://" + RAPIDAPI_HOST


def _flatten_watchlist_changes(payload: dict) -> list[dict]:
    """Flatten a watchlist-changes response into one row per filing change.

    Reads ``payload["data"]`` as a list of per-ticker groups, each with a
    ``"ticker"`` and a ``"changes"`` list whose items carry a ``"filing"``
    object. Missing *or null* containers are treated as empty.

    Raises:
        ValueError: If *payload* is not a JSON object (``dict``).
    """
    if not isinstance(payload, dict):
        raise ValueError(
            "Unexpected watchlist response: expected a JSON object, "
            f"got {type(payload).__name__}"
        )

    rows = []
    # ``.get(key, default)`` only covers a *missing* key; a key present with
    # JSON null yields None, so fall back with ``or`` instead.
    for group in payload.get("data") or []:
        ticker = group.get("ticker")
        for change in group.get("changes") or []:
            filing = change.get("filing") or {}
            rows.append({
                "ticker": ticker,
                "form": filing.get("form", ""),
                "company_name": filing.get("companyName", ""),
                "filed_at": filing.get("filedAt", ""),
                "accession_no": filing.get("accessionNo", ""),
                "filing_url": filing.get("filingUrl", ""),
            })
    return rows


def fetch_sec_watchlist(**context) -> dict:
    """Fetch watchlist filing changes from the API and publish them via XCom.

    Reads three Airflow Variables:

    * ``sec_event_intelligence_rapidapi_key`` - API key (no default).
    * ``sec_event_intelligence_tickers`` - defaults to ``"AAPL,MSFT,NVDA"``.
    * ``sec_event_intelligence_since`` - defaults to the run's ``ds``; when the
      context has no ``ds``, today's UTC date is used instead.

    Pushes the XCom keys ``filing_rows`` (every row) and ``filing_count``.

    Args:
        **context: Airflow task context; ``ti`` is required, ``ds`` is optional.

    Returns:
        ``{"filing_count": <total rows>, "rows": <first 25 rows>}``.

    Raises:
        urllib.error.URLError: If the HTTP request fails (includes ``HTTPError``).
        ValueError: If the body is not valid JSON or is not a JSON object.
    """
    from datetime import datetime, timezone

    rapidapi_key = Variable.get("sec_event_intelligence_rapidapi_key")
    tickers = Variable.get("sec_event_intelligence_tickers", default="AAPL,MSFT,NVDA")
    # The default is evaluated eagerly, even when the Variable is set, so a
    # hard ``context["ds"]`` lookup would crash any run whose context lacks it.
    # NOTE(review): ``ds`` is expected to be absent for runs without a logical date.
    default_since = context.get("ds") or datetime.now(timezone.utc).date().isoformat()
    since = Variable.get("sec_event_intelligence_since", default=default_since)

    query = urllib.parse.urlencode({
        "tickers": tickers,
        "since": since,
        "limit": "25",
    })
    request = urllib.request.Request(
        f"{API_BASE_URL}/v1/sec/watchlist/changes?{query}",
        headers={
            "x-rapidapi-host": RAPIDAPI_HOST,
            "x-rapidapi-key": rapidapi_key,
            "accept": "application/json",
        },
    )

    with urllib.request.urlopen(request, timeout=30) as response:
        payload = json.loads(response.read().decode("utf-8"))

    rows = _flatten_watchlist_changes(payload)

    task_instance = context["ti"]
    task_instance.xcom_push(key="filing_rows", value=rows)
    task_instance.xcom_push(key="filing_count", value=len(rows))
    # The returned summary carries at most 25 rows; "filing_rows" has them all.
    return {"filing_count": len(rows), "rows": rows[:25]}


# DAG definition: one task, run on the "@hourly" schedule with catchup off.
with DAG(
    dag_id="sec_event_intelligence_watchlist",
    schedule="@hourly",
    start_date=pendulum.datetime(year=2026, month=7, day=1, tz="UTC"),
    catchup=False,
    description="Poll SEC Event Intelligence watchlist changes through RapidAPI.",
    tags=["sec", "filings", "rapidapi", "data-apis"],
) as dag:
    # Single task: calls fetch_sec_watchlist, which queries the API and
    # pushes the resulting rows to XCom.
    fetch_watchlist = PythonOperator(
        python_callable=fetch_sec_watchlist,
        task_id="fetch_sec_watchlist",
    )
