206 lines
6.1 KiB
Python
206 lines
6.1 KiB
Python
import os
|
|
import sqlite3
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
|
|
@dataclass(frozen=True)
class Store:
    """Immutable handle on the SQLite database file used by this module."""

    # Filesystem path (or a special name such as ":memory:") handed to sqlite3.
    db_path: str

    def connect(self) -> sqlite3.Connection:
        """Open a new connection to ``db_path`` and return it.

        The returned connection yields ``sqlite3.Row`` rows, so columns can be
        read by name as well as by position.
        """
        # Thread-affinity checking is switched off so the connection can be
        # used across simple dev reload threads.
        connection = sqlite3.connect(self.db_path, check_same_thread=False)
        connection.row_factory = sqlite3.Row
        return connection
|
|
|
|
|
|
def _sqlite_path_from_database_url(url: str) -> str:
|
|
# V1 supports only sqlite. Examples:
|
|
# - sqlite:///./eventflow.sqlite3
|
|
# - sqlite:////abs/path/eventflow.sqlite3
|
|
if not url.startswith("sqlite:///"):
|
|
raise ValueError("Only sqlite DATABASE_URL is supported in V1")
|
|
return url[len("sqlite:///") :]
|
|
|
|
|
|
def get_store() -> Store:
    """Build a ``Store`` from the ``DATABASE_URL`` environment variable.

    When the variable is unset, ``sqlite:///./eventflow.sqlite3`` is used.

    Raises:
        ValueError: propagated from ``_sqlite_path_from_database_url`` when
            the URL is not a ``sqlite:///`` URL.
    """
    database_url = os.environ.get("DATABASE_URL", "sqlite:///./eventflow.sqlite3")
    return Store(db_path=_sqlite_path_from_database_url(database_url))
|
|
|
|
|
|
def _ensure_column(conn: sqlite3.Connection, *, table: str, col: str, ddl: str) -> None:
|
|
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
|
|
if col not in cols:
|
|
conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}")
|
|
|
|
|
|
def ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the tables and indexes if missing, apply light migrations, commit.

    The bootstrap script switches the journal mode to WAL and creates
    ``raw_items`` and ``events`` (plus their indexes) with
    ``IF NOT EXISTS``, so running it again on an existing database is
    harmless. Afterwards the parse-state columns on ``raw_items`` are
    added when absent.
    """
    bootstrap_sql = """
        PRAGMA journal_mode=WAL;

        CREATE TABLE IF NOT EXISTS raw_items (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          source TEXT NOT NULL,
          item_date TEXT NOT NULL,
          published_at TEXT,
          url TEXT,
          title TEXT,
          content TEXT,
          lang TEXT,
          status TEXT NOT NULL DEFAULT 'new',
          created_at INTEGER NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_raw_items_source_date ON raw_items(source, item_date);
        CREATE INDEX IF NOT EXISTS idx_raw_items_created_at ON raw_items(created_at);

        CREATE TABLE IF NOT EXISTS events (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          raw_item_id INTEGER NOT NULL,
          model TEXT,
          ok INTEGER NOT NULL,
          event_json TEXT,
          error TEXT,
          created_at INTEGER NOT NULL,
          FOREIGN KEY(raw_item_id) REFERENCES raw_items(id)
        );

        CREATE INDEX IF NOT EXISTS idx_events_raw_item_id ON events(raw_item_id);
        CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);
        """
    conn.executescript(bootstrap_sql)

    # Light migrations for V1: keep raw_items parse state for retry.
    parse_state_columns = (
        ("parse_attempts", "parse_attempts INTEGER NOT NULL DEFAULT 0"),
        ("last_parse_at", "last_parse_at INTEGER"),
        ("last_error", "last_error TEXT"),
    )
    for column_name, column_ddl in parse_state_columns:
        _ensure_column(conn, table="raw_items", col=column_name, ddl=column_ddl)

    conn.commit()
|
|
|
|
|
|
def insert_raw_item(
|
|
conn: sqlite3.Connection,
|
|
*,
|
|
source: str,
|
|
item_date: str,
|
|
title: str | None,
|
|
content: str | None,
|
|
url: str | None = None,
|
|
published_at: str | None = None,
|
|
lang: str | None = None,
|
|
) -> int:
|
|
now = int(time.time())
|
|
cur = conn.execute(
|
|
"""
|
|
INSERT INTO raw_items (source, item_date, published_at, url, title, content, lang, status, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, 'new', ?)
|
|
""",
|
|
(source, item_date, published_at, url, title, content, lang, now),
|
|
)
|
|
conn.commit()
|
|
return int(cur.lastrowid)
|
|
|
|
|
|
def insert_event_result(
|
|
conn: sqlite3.Connection,
|
|
*,
|
|
raw_item_id: int,
|
|
model: str,
|
|
ok: bool,
|
|
event_json: str | None,
|
|
error: str | None,
|
|
) -> int:
|
|
now = int(time.time())
|
|
cur = conn.execute(
|
|
"""
|
|
INSERT INTO events (raw_item_id, model, ok, event_json, error, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(raw_item_id, model, 1 if ok else 0, event_json, error, now),
|
|
)
|
|
|
|
# Update raw item parse state (best-effort).
|
|
status = "parsed_ok" if ok else "parsed_err"
|
|
conn.execute(
|
|
"""
|
|
UPDATE raw_items
|
|
SET status = ?,
|
|
parse_attempts = COALESCE(parse_attempts, 0) + 1,
|
|
last_parse_at = ?,
|
|
last_error = ?
|
|
WHERE id = ?
|
|
""",
|
|
(status, now, error, raw_item_id),
|
|
)
|
|
|
|
conn.commit()
|
|
return int(cur.lastrowid)
|
|
|
|
|
|
def counts(conn: sqlite3.Connection) -> dict[str, Any]:
    """Return row totals for ``raw_items`` and ``events``.

    The schema is ensured first. The result has the keys ``raw_items``,
    ``events``, ``events_ok`` (events with ``ok=1``) and ``events_err``
    (events with ``ok=0``), each mapped to an ``int``.

    Rows are read by column name, so ``conn`` needs a name-addressable row
    factory such as ``sqlite3.Row``.
    """
    ensure_schema(conn)

    def tally(sql: str) -> int:
        # Every query aliases its single aggregate column as ``n``.
        return int(conn.execute(sql).fetchone()["n"])

    return {
        "raw_items": tally("SELECT COUNT(1) AS n FROM raw_items"),
        "events": tally("SELECT COUNT(1) AS n FROM events"),
        "events_ok": tally("SELECT COUNT(1) AS n FROM events WHERE ok=1"),
        "events_err": tally("SELECT COUNT(1) AS n FROM events WHERE ok=0"),
    }
|
|
|
|
|
|
def sources_today(conn: sqlite3.Connection, item_date: str) -> list[dict[str, Any]]:
    """Return per-source counts of ``raw_items`` rows for one ``item_date``.

    The schema is ensured first. Each entry is
    ``{"source": <source>, "count": <int>}``; entries are ordered by source.

    Rows are read by column name, so ``conn`` needs a name-addressable row
    factory such as ``sqlite3.Row``.
    """
    ensure_schema(conn)
    per_source_sql = """
        SELECT source, COUNT(1) AS n
        FROM raw_items
        WHERE item_date = ?
        GROUP BY source
        ORDER BY source
        """
    grouped = conn.execute(per_source_sql, (item_date,)).fetchall()

    summary: list[dict[str, Any]] = []
    for record in grouped:
        summary.append({"source": record["source"], "count": int(record["n"])})
    return summary
|
|
|
|
|
|
def list_raw_items(conn: sqlite3.Connection, limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` ``raw_items`` rows, highest id first.

    The schema is ensured first. Each row becomes a dict with the keys
    ``id``, ``source``, ``item_date``, ``published_at``, ``url``, ``title``,
    ``lang``, ``status`` and ``created_at``; ``content`` is not selected.

    Rows are converted with ``dict(row)``, so ``conn`` needs a row factory
    that supports it, such as ``sqlite3.Row``.
    """
    ensure_schema(conn)
    newest_first_sql = """
        SELECT id, source, item_date, published_at, url, title, lang, status, created_at
        FROM raw_items
        ORDER BY id DESC
        LIMIT ?
        """
    cursor = conn.execute(newest_first_sql, (limit,))
    return [dict(record) for record in cursor.fetchall()]
|
|
|
|
|
|
def list_events(conn: sqlite3.Connection, limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` events joined with their raw item, highest id first.

    The schema is ensured first. Each dict carries the event columns
    (``id``, ``raw_item_id``, ``model``, ``ok``, ``event_json``, ``error``,
    ``created_at``) plus ``source``, ``item_date`` and ``title`` from the
    matching ``raw_items`` row. Because this is an inner join, events whose
    raw item is missing are omitted.

    Rows are converted with ``dict(row)``, so ``conn`` needs a row factory
    that supports it, such as ``sqlite3.Row``.
    """
    ensure_schema(conn)
    joined_sql = """
        SELECT e.id, e.raw_item_id, e.model, e.ok, e.event_json, e.error, e.created_at,
               r.source, r.item_date, r.title
        FROM events e
        JOIN raw_items r ON r.id = e.raw_item_id
        ORDER BY e.id DESC
        LIMIT ?
        """
    fetched = conn.execute(joined_sql, (limit,)).fetchall()
    # event_json is kept as a string; the UI can parse it if needed.
    return [dict(record) for record in fetched]
|