import os import sqlite3 import time from dataclasses import dataclass from typing import Any @dataclass(frozen=True) class Store: db_path: str def connect(self) -> sqlite3.Connection: # check_same_thread=False allows use across simple dev reload threads. conn = sqlite3.connect(self.db_path, check_same_thread=False) conn.row_factory = sqlite3.Row return conn def _sqlite_path_from_database_url(url: str) -> str: # V1 supports only sqlite. Examples: # - sqlite:///./eventflow.sqlite3 # - sqlite:////abs/path/eventflow.sqlite3 if not url.startswith("sqlite:///"): raise ValueError("Only sqlite DATABASE_URL is supported in V1") return url[len("sqlite:///") :] def get_store() -> Store: url = os.environ.get("DATABASE_URL", "sqlite:///./eventflow.sqlite3") path = _sqlite_path_from_database_url(url) return Store(db_path=path) def _ensure_column(conn: sqlite3.Connection, *, table: str, col: str, ddl: str) -> None: cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] if col not in cols: conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}") def ensure_schema(conn: sqlite3.Connection) -> None: conn.executescript( """ PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS raw_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, source TEXT NOT NULL, item_date TEXT NOT NULL, published_at TEXT, url TEXT, title TEXT, content TEXT, lang TEXT, status TEXT NOT NULL DEFAULT 'new', created_at INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_raw_items_source_date ON raw_items(source, item_date); CREATE INDEX IF NOT EXISTS idx_raw_items_created_at ON raw_items(created_at); CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, raw_item_id INTEGER NOT NULL, model TEXT, ok INTEGER NOT NULL, event_json TEXT, error TEXT, created_at INTEGER NOT NULL, FOREIGN KEY(raw_item_id) REFERENCES raw_items(id) ); CREATE INDEX IF NOT EXISTS idx_events_raw_item_id ON events(raw_item_id); CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at); """ ) # Light migrations for V1: keep raw_items parse state for retry. _ensure_column(conn, table="raw_items", col="parse_attempts", ddl="parse_attempts INTEGER NOT NULL DEFAULT 0") _ensure_column(conn, table="raw_items", col="last_parse_at", ddl="last_parse_at INTEGER") _ensure_column(conn, table="raw_items", col="last_error", ddl="last_error TEXT") conn.commit() def insert_raw_item( conn: sqlite3.Connection, *, source: str, item_date: str, title: str | None, content: str | None, url: str | None = None, published_at: str | None = None, lang: str | None = None, ) -> int: now = int(time.time()) cur = conn.execute( """ INSERT INTO raw_items (source, item_date, published_at, url, title, content, lang, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, 'new', ?) """, (source, item_date, published_at, url, title, content, lang, now), ) conn.commit() return int(cur.lastrowid) def insert_event_result( conn: sqlite3.Connection, *, raw_item_id: int, model: str, ok: bool, event_json: str | None, error: str | None, ) -> int: now = int(time.time()) cur = conn.execute( """ INSERT INTO events (raw_item_id, model, ok, event_json, error, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (raw_item_id, model, 1 if ok else 0, event_json, error, now), ) # Update raw item parse state (best-effort). status = "parsed_ok" if ok else "parsed_err" conn.execute( """ UPDATE raw_items SET status = ?, parse_attempts = COALESCE(parse_attempts, 0) + 1, last_parse_at = ?, last_error = ? WHERE id = ? """, (status, now, error, raw_item_id), ) conn.commit() return int(cur.lastrowid) def counts(conn: sqlite3.Connection) -> dict[str, Any]: ensure_schema(conn) row_raw = conn.execute("SELECT COUNT(1) AS n FROM raw_items").fetchone() row_ev = conn.execute("SELECT COUNT(1) AS n FROM events").fetchone() row_ok = conn.execute("SELECT COUNT(1) AS n FROM events WHERE ok=1").fetchone() row_bad = conn.execute("SELECT COUNT(1) AS n FROM events WHERE ok=0").fetchone() return { "raw_items": int(row_raw["n"]), "events": int(row_ev["n"]), "events_ok": int(row_ok["n"]), "events_err": int(row_bad["n"]), } def sources_today(conn: sqlite3.Connection, item_date: str) -> list[dict[str, Any]]: ensure_schema(conn) rows = conn.execute( """ SELECT source, COUNT(1) AS n FROM raw_items WHERE item_date = ? GROUP BY source ORDER BY source """, (item_date,), ).fetchall() return [{"source": r["source"], "count": int(r["n"])} for r in rows] def list_raw_items(conn: sqlite3.Connection, limit: int = 20) -> list[dict[str, Any]]: ensure_schema(conn) rows = conn.execute( """ SELECT id, source, item_date, published_at, url, title, lang, status, created_at FROM raw_items ORDER BY id DESC LIMIT ? """, (limit,), ).fetchall() return [dict(r) for r in rows] def list_events(conn: sqlite3.Connection, limit: int = 20) -> list[dict[str, Any]]: ensure_schema(conn) rows = conn.execute( """ SELECT e.id, e.raw_item_id, e.model, e.ok, e.event_json, e.error, e.created_at, r.source, r.item_date, r.title FROM events e JOIN raw_items r ON r.id = e.raw_item_id ORDER BY e.id DESC LIMIT ? """, (limit,), ).fetchall() out: list[dict[str, Any]] = [] for r in rows: d = dict(r) # keep event_json as string; UI can parse if needed out.append(d) return out