100 lines
3.4 KiB
Python
100 lines
3.4 KiB
Python
|
|
import json
|
||
|
|
import os
|
||
|
|
import random
|
||
|
|
import threading
|
||
|
|
import time
|
||
|
|
|
||
|
|
from .llm_extract import extract_event
|
||
|
|
from .store import ensure_schema, get_store, insert_event_result
|
||
|
|
|
||
|
|
|
||
|
|
def _pick_pending(conn, *, batch: int, max_attempts: int, min_age_s: int, retry_after_s: int):
|
||
|
|
now = int(time.time())
|
||
|
|
# Select items that are not successfully parsed yet.
|
||
|
|
# We rely on raw_items.status/parse_attempts which are updated by insert_event_result.
|
||
|
|
rows = conn.execute(
|
||
|
|
"""
|
||
|
|
SELECT id, title, content, lang, status, parse_attempts, last_parse_at
|
||
|
|
FROM raw_items
|
||
|
|
WHERE status != 'parsed_ok'
|
||
|
|
AND COALESCE(parse_attempts, 0) < ?
|
||
|
|
AND (? - created_at) >= ?
|
||
|
|
AND (last_parse_at IS NULL OR (? - last_parse_at) >= ?)
|
||
|
|
ORDER BY id ASC
|
||
|
|
LIMIT ?
|
||
|
|
""",
|
||
|
|
(max_attempts, now, min_age_s, now, retry_after_s, batch),
|
||
|
|
).fetchall()
|
||
|
|
return [dict(r) for r in rows]
|
||
|
|
|
||
|
|
|
||
|
|
def _retry_item(conn, it) -> None:
    """Run one extraction attempt for a pending row and record its outcome."""
    raw_item_id = int(it["id"])
    title = (it.get("title") or "")[:500]
    content = (it.get("content") or "")[:20000]
    lang_hint = it.get("lang")

    try:
        res = extract_event(title=title, content=content, lang_hint=lang_hint)
    except Exception as exc:
        # Turn an unexpected extractor crash into a recorded failure so the
        # attempt is written via insert_event_result; otherwise the same row
        # would be picked first on every cycle and starve the rest of the
        # queue. Only the exception type is kept (no secrets).
        res = {"ok": False, "error": "llm_failed", "exc": type(exc).__name__}

    model = str(res.get("model") or "")

    if res.get("ok") is True:
        insert_event_result(
            conn,
            raw_item_id=raw_item_id,
            model=model,
            ok=True,
            event_json=json.dumps(res.get("event"), ensure_ascii=True),
            error=None,
        )
        return

    # Preserve hint for debugging (no secrets).
    err = str(res.get("error") or "unknown")
    if err == "llm_failed":
        exc_hint = str(res.get("exc") or "")
        if exc_hint:
            err = f"{err}:{exc_hint}"
    insert_event_result(
        conn,
        raw_item_id=raw_item_id,
        model=model,
        ok=False,
        event_json=None,
        error=err,
    )


def run_retry_loop(stop: threading.Event) -> None:
    """Periodically re-run event extraction on raw items that are not parsed yet.

    Intended to run in a background thread. Returns immediately when
    ``PARSE_RETRY_ENABLED`` is set to anything other than ``1``/``true``/
    ``TRUE``/``yes``/``YES``; otherwise loops until ``stop`` is set.

    Tuning is read once from the environment at start-up:

    * ``PARSE_RETRY_INTERVAL_S`` (default 120): pause between cycles.
    * ``PARSE_RETRY_BATCH`` (default 5): max items per cycle.
    * ``PARSE_RETRY_MAX_ATTEMPTS`` (default 6): give up on an item after this
      many attempts.
    * ``PARSE_RETRY_MIN_AGE_S`` (default 2): minimum item age before retrying.
    * ``PARSE_RETRY_AFTER_S`` (default 300): minimum delay between attempts
      on the same item.
    * ``PARSE_RETRY_SLEEP_BETWEEN_S`` (default 0.5): base pause before each
      item; up to 0.4 s of random jitter is added.

    Args:
        stop: Event that requests shutdown; it is honoured between cycles and
            between items.
    """
    # Imported locally so this function does not depend on a module-level
    # import that the rest of the file does not have.
    import logging

    if os.environ.get("PARSE_RETRY_ENABLED", "1") not in ("1", "true", "TRUE", "yes", "YES"):
        return

    log = logging.getLogger(__name__)

    interval_s = int(os.environ.get("PARSE_RETRY_INTERVAL_S", "120"))
    batch = int(os.environ.get("PARSE_RETRY_BATCH", "5"))
    max_attempts = int(os.environ.get("PARSE_RETRY_MAX_ATTEMPTS", "6"))
    min_age_s = int(os.environ.get("PARSE_RETRY_MIN_AGE_S", "2"))
    retry_after_s = int(os.environ.get("PARSE_RETRY_AFTER_S", "300"))
    sleep_between_s = float(os.environ.get("PARSE_RETRY_SLEEP_BETWEEN_S", "0.5"))

    st = get_store()

    while not stop.is_set():
        conn = None
        try:
            conn = st.connect()
            ensure_schema(conn)

            items = _pick_pending(
                conn,
                batch=batch,
                max_attempts=max_attempts,
                min_age_s=min_age_s,
                retry_after_s=retry_after_s,
            )

            for it in items:
                # Jitter to reduce gateway burstiness. Waiting on the stop
                # event (instead of time.sleep) lets shutdown interrupt the
                # pause; wait() returns True as soon as stop is set.
                if stop.wait(sleep_between_s + random.random() * 0.4):
                    break

                try:
                    _retry_item(conn, it)
                except Exception:
                    # One bad item must not abort the rest of the batch.
                    log.exception("parse retry failed for raw_item id=%s", it.get("id"))
        except Exception:
            # Keep the loop alive, but leave a trace in the logs.
            log.exception("parse retry cycle failed")
        finally:
            # A connection is opened every cycle, so release it every cycle.
            # NOTE(review): assumes st.connect() returns a fresh connection
            # per call rather than a shared one; confirm against the store.
            close = getattr(conn, "close", None)
            if callable(close):
                try:
                    close()
                except Exception:
                    log.exception("failed to close parse retry connection")

        stop.wait(interval_s)
|