diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ad38a4a --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +# LLM (OpenAI-compatible gateway) +LLM_BASE_URL=http://129.204.192.37:23000/v1 +LLM_API_KEY=sk-REDACTED +LLM_MODEL=gpt-5.2 + +# DB +DATABASE_URL=sqlite:///./eventflow.sqlite3 + +# Ingest tuning +FETCH_UA=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123 Safari/537.36 +FETCH_RPS_PER_DOMAIN=0.2 +FETCH_TIMEOUT_SECS=15 +FETCH_MAX_RETRIES=3 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b21aa8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.env +*.sqlite +*.sqlite3 +__pycache__/ +*.pyc +.pytest_cache/ +*.log diff --git a/backend/__init__.py b/backend/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/fastapi_app/__init__.py b/backend/fastapi_app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/fastapi_app/kb/__init__.py b/backend/fastapi_app/kb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/fastapi_app/kb/rules/__init__.py b/backend/fastapi_app/kb/rules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/fastapi_app/kb/rules/rules.yml b/backend/fastapi_app/kb/rules/rules.yml new file mode 100644 index 0000000..229c31d --- /dev/null +++ b/backend/fastapi_app/kb/rules/rules.yml @@ -0,0 +1 @@ +# V1 rules placeholder diff --git a/backend/fastapi_app/main.py b/backend/fastapi_app/main.py new file mode 100644 index 0000000..6ea73ad --- /dev/null +++ b/backend/fastapi_app/main.py @@ -0,0 +1,27 @@ +import threading + +from fastapi import FastAPI + +from .routers import analyze, ingest, query +from .services.retry_worker import run_retry_loop + +app = FastAPI(title="eventflow-fastapi", version="0.1.0") + +app.include_router(ingest.router, prefix="/ingest", tags=["ingest"]) +app.include_router(analyze.router, prefix="/analyze", tags=["analyze"]) +app.include_router(query.router, 
@router.post("/event")
def analyze_event(payload: dict):
    """Analyze one ad-hoc item and persist it.

    This powers the UI while we build full ingesters.

    The payload keys read here are ``title``, ``content``, ``source``,
    ``date``, ``url``, ``published_at`` and ``lang``. A raw item is always
    stored first; the extraction outcome (success or failure) is then stored
    as an event row linked to it.

    Returns:
        The extraction result dict with ``raw_item_id`` added.

    Raises:
        HTTPException: 400 when both title and content are empty.
    """
    # Coerce to str first: JSON clients may send numbers, and .strip() on a
    # non-string would otherwise surface as an unhandled 500.
    title = str(payload.get("title") or "").strip()
    content = str(payload.get("content") or "").strip()
    if not title and not content:
        raise HTTPException(status_code=400, detail="missing title/content")

    conn = get_store().connect()
    try:
        ensure_schema(conn)

        raw_item_id = insert_raw_item(
            conn,
            source=str(payload.get("source") or "manual"),
            item_date=str(payload.get("date") or "") or "manual",
            title=title[:500],
            content=content[:20_000],
            url=payload.get("url"),
            published_at=payload.get("published_at"),
            lang=payload.get("lang"),
        )

        res = extract_event(title=title, content=content, lang_hint=payload.get("lang"))

        ok = res.get("ok") is True
        insert_event_result(
            conn,
            raw_item_id=raw_item_id,
            model=str(res.get("model") or ""),
            ok=ok,
            event_json=json.dumps(res.get("event"), ensure_ascii=True) if ok else None,
            error=None if ok else str(res.get("error") or "unknown"),
        )
    finally:
        # Each request opens its own connection; release it even on failure.
        conn.close()

    return {"raw_item_id": raw_item_id, **res}
@router.post("/macro")
def ingest_macro(payload: dict):
    """Store a batch of macro items as raw items with source ``"macro"``.

    Expected payload:
      {"items": [{"title":..., "content":..., "url":..., "published_at":..., "lang":...}, ...]}

    An optional top-level ``date`` sets ``item_date`` for every item; it
    defaults to today's ISO date. Entries that are not dicts are skipped.

    Returns:
        ``{"ok": True, "inserted": <number of rows written>}``.

    Raises:
        HTTPException: 400 when ``items`` is missing, empty, or not a list.
    """
    items = payload.get("items")
    if not isinstance(items, list) or not items:
        raise HTTPException(status_code=400, detail="payload.items must be a non-empty list")

    # Resolve the date once so a batch that straddles midnight stays on one day.
    item_date = payload.get("date") or _today()

    conn = get_store().connect()
    try:
        ensure_schema(conn)

        inserted = 0
        for it in items:
            if not isinstance(it, dict):
                continue
            insert_raw_item(
                conn,
                source="macro",
                item_date=item_date,
                # str() guards against non-string JSON values before slicing.
                title=str(it.get("title") or "")[:500],
                content=str(it.get("content") or "")[:20_000],
                url=it.get("url"),
                published_at=it.get("published_at"),
                lang=it.get("lang"),
            )
            inserted += 1
    finally:
        # Each request opens its own connection; release it even on failure.
        conn.close()

    return {"ok": True, "inserted": inserted}
@router.post("/market_moves/run")
def run_market_moves(payload: dict | None = None):
    """Generate daily market-move items from QFR raw data and parse them into events.

    Optional payload keys: ``date`` (ISO day, defaults to today) and
    ``symbols`` (passed through to ``fetch_moves_via_qfr``). Each returned
    move is stored as a raw item and sent through ``extract_event``; the
    outcome is stored as an event row either way.

    Returns:
        A summary dict with ``inserted``, ``parsed_ok``, ``parsed_err``, the
        per-symbol ``errors`` reported by the fetch step, and ``symbols``.

    Raises:
        HTTPException: 500 (with the fetch result as detail) when the QFR
            fetch reports ``ok`` as falsy.
    """
    payload = payload or {}
    day = str(payload.get("date") or _today())
    # QFR raw data uses trade_date like YYYYMMDD.
    trade_date = day.replace("-", "")

    conn = get_store().connect()
    try:
        ensure_schema(conn)

        data = fetch_moves_via_qfr(trade_date=trade_date, symbols=payload.get("symbols"))
        if not data.get("ok"):
            raise HTTPException(status_code=500, detail=data)

        inserted = 0
        parsed_ok = 0
        parsed_err = 0

        # `or []` also covers a "moves" key that is present but null.
        for mv in data.get("moves") or []:
            sym = mv.get("symbol")
            td = mv.get("trade_date")
            ret_1d = mv.get("ret_1d")
            vol_20d = mv.get("vol_20d")
            z_1d = mv.get("z_1d")

            # Only use numeric format specs when both values really are numbers.
            if isinstance(ret_1d, (int, float)) and isinstance(z_1d, (int, float)):
                title = f"Market move {sym} {td}: ret_1d={ret_1d:.4f} z_1d={z_1d:.2f}"
            else:
                title = f"Market move {sym} {td}"
            content = (
                f"symbol={sym}\n"
                f"trade_date={td}\n"
                f"prev_trade_date={mv.get('prev_trade_date')}\n"
                f"close={mv.get('close')} prev_close={mv.get('prev_close')}\n"
                f"ret_1d={ret_1d} vol_20d={vol_20d} z_1d={z_1d}\n"
                "Interpretation task: explain the most likely macro/industry drivers for this move and which assets could be affected."
            )

            raw_item_id = insert_raw_item(
                conn,
                source="market_moves",
                item_date=day,
                title=title[:500],
                content=content[:20_000],
                url=None,
                published_at=None,
                lang="en",
            )
            inserted += 1

            try:
                res = extract_event(title=title, content=content, lang_hint="en")
            except Exception as e:
                # Network/provider errors should not abort the whole batch.
                insert_event_result(
                    conn,
                    raw_item_id=raw_item_id,
                    model="",
                    ok=False,
                    event_json=None,
                    error=f"llm_exception:{type(e).__name__}",
                )
                parsed_err += 1
                continue

            if res.get("ok") is True:
                insert_event_result(
                    conn,
                    raw_item_id=raw_item_id,
                    model=str(res.get("model") or ""),
                    ok=True,
                    event_json=json.dumps(res.get("event"), ensure_ascii=True),
                    error=None,
                )
                parsed_ok += 1
                continue

            err = str(res.get("error") or "unknown")
            if err == "llm_failed":
                # Keep a short hint to debug gateway flakiness without dumping secrets.
                exc = str(res.get("exc") or "")
                if exc:
                    err = f"{err}:{exc}"
            insert_event_result(
                conn,
                raw_item_id=raw_item_id,
                model=str(res.get("model") or ""),
                ok=False,
                event_json=None,
                error=err,
            )
            parsed_err += 1
    finally:
        # Each request opens its own connection; release it even on failure.
        conn.close()

    return {
        "ok": True,
        "date": day,
        "inserted": inserted,
        "parsed_ok": parsed_ok,
        "parsed_err": parsed_err,
        "errors": data.get("errors", []),
        "symbols": data.get("symbols"),
    }
return {"items": list_events(conn, limit=limit)} + + +@router.get("/ui", response_class=HTMLResponse) +def ui(): + # Tiny no-build UI for early validation. + html = """ + + + + + EventFlow V1 + + + +

EventFlow V1

+
Status page: sources, dates, counts, and parsed events.
+
+ +
+
+
+
+
Today:
+
Auto-refresh every 5s
+
+ +
+
+
+
+
Sources (today)
+
+
+ +
+
Quick Analyze (manual)
+
This will save a raw item + parsed event into SQLite.
+
+ + +
+ + +
+ +
+
+
+ +
+ +
+
+
Latest Raw Items (limit 20)
+
+
+
+
Latest Parsed Events (limit 20)
+
+
+
+ + + +""" + return HTMLResponse(content=html) diff --git a/backend/fastapi_app/services/__init__.py b/backend/fastapi_app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eventflow/backend/fastapi_app/services/llm_extract.py b/backend/fastapi_app/services/llm_extract.py similarity index 100% rename from eventflow/backend/fastapi_app/services/llm_extract.py rename to backend/fastapi_app/services/llm_extract.py diff --git a/backend/fastapi_app/services/market_moves.py b/backend/fastapi_app/services/market_moves.py new file mode 100644 index 0000000..b55651d --- /dev/null +++ b/backend/fastapi_app/services/market_moves.py @@ -0,0 +1,68 @@ +import json +import os +import subprocess +from typing import Any + + +WATCHLIST_DEFAULT = [ + # A-share ETF proxies for indices (QFR raw data uses ETF parquets) + # HS300 + "510300.SH", + # ZZ500 + "510500.SH", + # ChiNext + "159915.SZ", + # SSE50 proxy (may not exist in rawdir unless downloaded) + "510050.SH", + + # Futures placeholders (not in QFR rawdir by default; will show as errors until sourced) + "AU.SHF", + "CU.SHF", + "M.DCE", + "TA.CZCE", + "SC.INE", +] + + +def fetch_moves_via_qfr(*, trade_date: str | None = None, symbols: list[str] | None = None) -> dict[str, Any]: + """Fetch day-level moves by shelling out to the existing qfr conda env. + + Reason: eventflow env is kept minimal; qfr env already has pandas/pyarrow. 
+ """ + + sym_list = symbols or WATCHLIST_DEFAULT + rawdir = os.environ.get("QFR_RAWDIR", "/home/openclaw/projects/quant-factor-research/data/raw") + + env = os.environ.copy() + env["QFR_RAWDIR"] = rawdir + env["QFR_SYMBOLS"] = ",".join(sym_list) + if trade_date: + env["QFR_TRADE_DATE"] = trade_date + + conda = os.environ.get("CONDA_BIN", "/home/openclaw/miniconda3/bin/conda") + script = os.path.join(os.path.dirname(__file__), "market_moves_qfr.py") + + cmd = [conda, "run", "-n", "qfr", "python", script] + proc = subprocess.run(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + if proc.returncode != 0: + return { + "ok": False, + "error": "qfr_subprocess_failed", + "returncode": proc.returncode, + "stderr": proc.stderr[-4000:], + } + + try: + data = json.loads(proc.stdout) + except json.JSONDecodeError: + return { + "ok": False, + "error": "invalid_json_from_qfr", + "stdout": proc.stdout[-2000:], + "stderr": proc.stderr[-2000:], + } + + data["ok"] = True + data["symbols"] = sym_list + return data diff --git a/backend/fastapi_app/services/market_moves_qfr.py b/backend/fastapi_app/services/market_moves_qfr.py new file mode 100644 index 0000000..1891fe1 --- /dev/null +++ b/backend/fastapi_app/services/market_moves_qfr.py @@ -0,0 +1,108 @@ +import json +import os +from dataclasses import asdict, dataclass +from typing import Any + +import pandas as pd + + +@dataclass +class Move: + symbol: str + trade_date: str + prev_trade_date: str + close: float + prev_close: float + ret_1d: float + vol_20d: float + z_1d: float + + +def _read_one(rawdir: str, symbol: str) -> pd.DataFrame: + # QFR stores parquet as e.g. 
510300SH.parquet / 159915SZ.parquet + fn = symbol.replace(".", "") + ".parquet" + p = os.path.join(rawdir, fn) + df = pd.read_parquet(p) + # Standardize + if "trade_date" not in df.columns or "close" not in df.columns: + raise RuntimeError(f"unexpected parquet schema for {symbol}: {df.columns.tolist()}") + df = df.sort_values("trade_date").reset_index(drop=True) + return df + + +def _calc_move(df: pd.DataFrame, symbol: str, trade_date: str | None) -> Move | None: + if df.empty: + return None + # pick last available <= trade_date if given + if trade_date: + df2 = df[df["trade_date"] <= trade_date] + if df2.empty: + return None + df = df2 + + if len(df) < 2: + return None + + # compute returns + close = df["close"].astype(float) + ret = close.pct_change() + + i = len(df) - 1 + prev_i = i - 1 + + td = str(df.iloc[i]["trade_date"]) + ptd = str(df.iloc[prev_i]["trade_date"]) + + close_i = float(close.iloc[i]) + prev_close_i = float(close.iloc[prev_i]) + ret_1d = float(ret.iloc[i]) + + # vol over last 20 returns (excluding NaN) + vol_20 = float(ret.iloc[max(0, i - 20 + 1) : i + 1].std(skipna=True)) + if not (vol_20 > 0): + vol_20 = 0.0 + z = float(ret_1d / vol_20) if vol_20 > 0 else 0.0 + + return Move( + symbol=symbol, + trade_date=td, + prev_trade_date=ptd, + close=close_i, + prev_close=prev_close_i, + ret_1d=ret_1d, + vol_20d=vol_20, + z_1d=z, + ) + + +def main() -> None: + rawdir = os.environ.get("QFR_RAWDIR") or "/home/openclaw/projects/quant-factor-research/data/raw" + symbols = (os.environ.get("QFR_SYMBOLS") or "").strip() + trade_date = (os.environ.get("QFR_TRADE_DATE") or "").strip() or None + + if not symbols: + raise SystemExit("QFR_SYMBOLS is required") + + out: dict[str, Any] = { + "rawdir": rawdir, + "trade_date": trade_date, + "moves": [], + "errors": [], + } + + for sym in [s.strip() for s in symbols.split(",") if s.strip()]: + try: + df = _read_one(rawdir, sym) + mv = _calc_move(df, sym, trade_date) + if mv is None: + out["errors"].append({"symbol": sym, 
"error": "no_data"}) + continue + out["moves"].append(asdict(mv)) + except Exception as e: + out["errors"].append({"symbol": sym, "error": str(e)}) + + print(json.dumps(out, ensure_ascii=True)) + + +if __name__ == "__main__": + main() diff --git a/backend/fastapi_app/services/retry_worker.py b/backend/fastapi_app/services/retry_worker.py new file mode 100644 index 0000000..fdad3a3 --- /dev/null +++ b/backend/fastapi_app/services/retry_worker.py @@ -0,0 +1,99 @@ +import json +import os +import random +import threading +import time + +from .llm_extract import extract_event +from .store import ensure_schema, get_store, insert_event_result + + +def _pick_pending(conn, *, batch: int, max_attempts: int, min_age_s: int, retry_after_s: int): + now = int(time.time()) + # Select items that are not successfully parsed yet. + # We rely on raw_items.status/parse_attempts which are updated by insert_event_result. + rows = conn.execute( + """ + SELECT id, title, content, lang, status, parse_attempts, last_parse_at + FROM raw_items + WHERE status != 'parsed_ok' + AND COALESCE(parse_attempts, 0) < ? + AND (? - created_at) >= ? + AND (last_parse_at IS NULL OR (? - last_parse_at) >= ?) + ORDER BY id ASC + LIMIT ? 
def run_retry_loop(stop: threading.Event) -> None:
    """Periodically re-run extraction for raw items that are not parsed yet.

    Runs until *stop* is set. Each pass opens a connection, picks a batch via
    ``_pick_pending`` and records one event result per item, so the item's
    attempt counter advances whether extraction succeeds, reports failure, or
    raises.

    Environment:
        PARSE_RETRY_ENABLED: "1"/"true"/"yes" (any case) enables the loop;
            anything else makes this function return immediately. Default "1".
        PARSE_RETRY_INTERVAL_S: pause between passes (default 120).
        PARSE_RETRY_BATCH: max items per pass (default 5).
        PARSE_RETRY_MAX_ATTEMPTS: attempts after which an item is skipped (default 6).
        PARSE_RETRY_MIN_AGE_S: minimum item age before it is picked (default 2).
        PARSE_RETRY_AFTER_S: minimum gap between attempts on one item (default 300).
        PARSE_RETRY_SLEEP_BETWEEN_S: base delay between items (default 0.5).

    Args:
        stop: Event that ends the loop when set.
    """
    import logging

    log = logging.getLogger(__name__)

    if os.environ.get("PARSE_RETRY_ENABLED", "1").strip().lower() not in ("1", "true", "yes"):
        return

    interval_s = int(os.environ.get("PARSE_RETRY_INTERVAL_S", "120"))
    batch = int(os.environ.get("PARSE_RETRY_BATCH", "5"))
    max_attempts = int(os.environ.get("PARSE_RETRY_MAX_ATTEMPTS", "6"))
    min_age_s = int(os.environ.get("PARSE_RETRY_MIN_AGE_S", "2"))
    retry_after_s = int(os.environ.get("PARSE_RETRY_AFTER_S", "300"))
    sleep_between_s = float(os.environ.get("PARSE_RETRY_SLEEP_BETWEEN_S", "0.5"))

    st = get_store()

    while not stop.is_set():
        conn = None
        try:
            conn = st.connect()
            ensure_schema(conn)

            items = _pick_pending(
                conn,
                batch=batch,
                max_attempts=max_attempts,
                min_age_s=min_age_s,
                retry_after_s=retry_after_s,
            )

            for it in items:
                # Jitter to reduce gateway burstiness. Waiting on the event
                # (instead of time.sleep) lets shutdown interrupt the delay.
                if stop.wait(sleep_between_s + random.random() * 0.4):
                    break

                raw_item_id = int(it["id"])
                title = (it.get("title") or "")[:500]
                content = (it.get("content") or "")[:20000]

                try:
                    res = extract_event(title=title, content=content, lang_hint=it.get("lang"))
                except Exception as e:
                    # Record the failure so the attempt counter advances;
                    # otherwise the same item would abort every pass at the
                    # same place and starve the items behind it.
                    log.exception("extract_event raised for raw_item %s", raw_item_id)
                    insert_event_result(
                        conn,
                        raw_item_id=raw_item_id,
                        model="",
                        ok=False,
                        event_json=None,
                        error=f"llm_exception:{type(e).__name__}",
                    )
                    continue

                if res.get("ok") is True:
                    insert_event_result(
                        conn,
                        raw_item_id=raw_item_id,
                        model=str(res.get("model") or ""),
                        ok=True,
                        event_json=json.dumps(res.get("event"), ensure_ascii=True),
                        error=None,
                    )
                    continue

                # Preserve hint for debugging (no secrets).
                err = str(res.get("error") or "unknown")
                if err == "llm_failed":
                    exc = str(res.get("exc") or "")
                    if exc:
                        err = f"{err}:{exc}"
                insert_event_result(
                    conn,
                    raw_item_id=raw_item_id,
                    model=str(res.get("model") or ""),
                    ok=False,
                    event_json=None,
                    error=err,
                )

        except Exception:
            # Keep the loop alive, but leave a trace instead of hiding the failure.
            log.exception("parse retry pass failed")
        finally:
            # A fresh connection is opened on every pass; release it.
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    log.exception("closing retry connection failed")

        stop.wait(interval_s)
PRIMARY KEY AUTOINCREMENT, + raw_item_id INTEGER NOT NULL, + model TEXT, + ok INTEGER NOT NULL, + event_json TEXT, + error TEXT, + created_at INTEGER NOT NULL, + FOREIGN KEY(raw_item_id) REFERENCES raw_items(id) + ); + + CREATE INDEX IF NOT EXISTS idx_events_raw_item_id ON events(raw_item_id); + CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at); + """ + ) + + # Light migrations for V1: keep raw_items parse state for retry. + _ensure_column(conn, table="raw_items", col="parse_attempts", ddl="parse_attempts INTEGER NOT NULL DEFAULT 0") + _ensure_column(conn, table="raw_items", col="last_parse_at", ddl="last_parse_at INTEGER") + _ensure_column(conn, table="raw_items", col="last_error", ddl="last_error TEXT") + + conn.commit() + + +def insert_raw_item( + conn: sqlite3.Connection, + *, + source: str, + item_date: str, + title: str | None, + content: str | None, + url: str | None = None, + published_at: str | None = None, + lang: str | None = None, +) -> int: + now = int(time.time()) + cur = conn.execute( + """ + INSERT INTO raw_items (source, item_date, published_at, url, title, content, lang, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, 'new', ?) + """, + (source, item_date, published_at, url, title, content, lang, now), + ) + conn.commit() + return int(cur.lastrowid) + + +def insert_event_result( + conn: sqlite3.Connection, + *, + raw_item_id: int, + model: str, + ok: bool, + event_json: str | None, + error: str | None, +) -> int: + now = int(time.time()) + cur = conn.execute( + """ + INSERT INTO events (raw_item_id, model, ok, event_json, error, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + (raw_item_id, model, 1 if ok else 0, event_json, error, now), + ) + + # Update raw item parse state (best-effort). + status = "parsed_ok" if ok else "parsed_err" + conn.execute( + """ + UPDATE raw_items + SET status = ?, + parse_attempts = COALESCE(parse_attempts, 0) + 1, + last_parse_at = ?, + last_error = ? + WHERE id = ? 
def counts(conn: sqlite3.Connection) -> dict[str, Any]:
    """Return row counts for ``raw_items`` and ``events``.

    The three event figures come from a single aggregate query, so they are
    taken from one snapshot and ``events_ok + events_err`` cannot drift from
    ``events`` while another thread is inserting results.

    Returns:
        Dict with integer ``raw_items``, ``events``, ``events_ok`` and
        ``events_err``.
    """
    ensure_schema(conn)
    row_raw = conn.execute("SELECT COUNT(1) AS n FROM raw_items").fetchone()
    row_ev = conn.execute(
        """
        SELECT COUNT(1) AS n,
               COALESCE(SUM(CASE WHEN ok = 1 THEN 1 ELSE 0 END), 0) AS n_ok,
               COALESCE(SUM(CASE WHEN ok = 0 THEN 1 ELSE 0 END), 0) AS n_err
        FROM events
        """
    ).fetchone()
    return {
        "raw_items": int(row_raw["n"]),
        "events": int(row_ev["n"]),
        "events_ok": int(row_ev["n_ok"]),
        "events_err": int(row_ev["n_err"]),
    }
+ """, + (limit,), + ).fetchall() + out: list[dict[str, Any]] = [] + for r in rows: + d = dict(r) + # keep event_json as string; UI can parse if needed + out.append(d) + return out diff --git a/eventflow.sqlite3-shm b/eventflow.sqlite3-shm new file mode 100644 index 0000000..2eb1696 Binary files /dev/null and b/eventflow.sqlite3-shm differ diff --git a/eventflow.sqlite3-wal b/eventflow.sqlite3-wal new file mode 100644 index 0000000..abbd90a Binary files /dev/null and b/eventflow.sqlite3-wal differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d80b5b9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +django>=5.0,<6.0 +fastapi>=0.110 +uvicorn[standard]>=0.27 +requests>=2.31 +feedparser>=6.0 +python-dotenv>=1.0 +openai>=1.40 +pydantic>=2.6