import full eventflow project

This commit is contained in:
2026-03-13 17:18:19 +08:00
parent 0edc8f7477
commit bbae58e4fe
21 changed files with 1023 additions and 0 deletions

View File

View File

View File

View File

@@ -0,0 +1 @@
# V1 rules placeholder

View File

@@ -0,0 +1,27 @@
import threading
from fastapi import FastAPI
from .routers import analyze, ingest, query
from .services.retry_worker import run_retry_loop
app = FastAPI(title="eventflow-fastapi", version="0.1.0")
app.include_router(ingest.router, prefix="/ingest", tags=["ingest"])
app.include_router(analyze.router, prefix="/analyze", tags=["analyze"])
app.include_router(query.router, prefix="/query", tags=["query"])
_stop = threading.Event()
@app.on_event("startup")
def _startup() -> None:
# Background retry loop so transient gateway 5xx doesn't permanently stall parsing.
t = threading.Thread(target=run_retry_loop, args=(_stop,), daemon=True)
t.start()
@app.on_event("shutdown")
def _shutdown() -> None:
_stop.set()

View File

View File

@@ -0,0 +1,58 @@
import json
from fastapi import APIRouter, HTTPException
from ..services.llm_extract import extract_event
from ..services.store import ensure_schema, get_store, insert_event_result, insert_raw_item
router = APIRouter()
@router.post("/event")
def analyze_event(payload: dict):
"""Analyze one ad-hoc item and persist it.
This powers the UI while we build full ingesters.
"""
title = (payload.get("title") or "").strip()
content = (payload.get("content") or "").strip()
if not title and not content:
raise HTTPException(status_code=400, detail="missing title/content")
st = get_store()
conn = st.connect()
ensure_schema(conn)
raw_item_id = insert_raw_item(
conn,
source=str(payload.get("source") or "manual"),
item_date=str(payload.get("date") or "") or "manual",
title=title[:500],
content=content[:20_000],
url=payload.get("url"),
published_at=payload.get("published_at"),
lang=payload.get("lang"),
)
res = extract_event(title=title, content=content, lang_hint=payload.get("lang"))
if res.get("ok") is True:
insert_event_result(
conn,
raw_item_id=raw_item_id,
model=str(res.get("model") or ""),
ok=True,
event_json=json.dumps(res.get("event"), ensure_ascii=True),
error=None,
)
else:
insert_event_result(
conn,
raw_item_id=raw_item_id,
model=str(res.get("model") or ""),
ok=False,
event_json=None,
error=str(res.get("error") or "unknown"),
)
return {"raw_item_id": raw_item_id, **res}

View File

@@ -0,0 +1,214 @@
import json
from datetime import date
from fastapi import APIRouter, HTTPException
from ..services.llm_extract import extract_event
from ..services.market_moves import fetch_moves_via_qfr
from ..services.store import (
ensure_schema,
get_store,
insert_event_result,
insert_raw_item,
)
router = APIRouter()
def _today() -> str:
return date.today().isoformat()
@router.post("/rss")
def ingest_rss(payload: dict):
"""Ingest one or many RSS items.
Expected payload:
{"items": [{"title":..., "url":..., "published_at":..., "summary":..., "lang":...}, ...]}
"""
items = payload.get("items")
if not isinstance(items, list) or not items:
raise HTTPException(status_code=400, detail="payload.items must be a non-empty list")
st = get_store()
conn = st.connect()
ensure_schema(conn)
n = 0
for it in items:
if not isinstance(it, dict):
continue
insert_raw_item(
conn,
source="rss",
item_date=payload.get("date") or _today(),
title=(it.get("title") or "")[:500],
content=(it.get("summary") or it.get("content") or "")[:20_000],
url=it.get("url"),
published_at=it.get("published_at"),
lang=it.get("lang"),
)
n += 1
return {"ok": True, "inserted": n}
@router.post("/macro")
def ingest_macro(payload: dict):
items = payload.get("items")
if not isinstance(items, list) or not items:
raise HTTPException(status_code=400, detail="payload.items must be a non-empty list")
st = get_store()
conn = st.connect()
ensure_schema(conn)
n = 0
for it in items:
if not isinstance(it, dict):
continue
insert_raw_item(
conn,
source="macro",
item_date=payload.get("date") or _today(),
title=(it.get("title") or "")[:500],
content=(it.get("content") or "")[:20_000],
url=it.get("url"),
published_at=it.get("published_at"),
lang=it.get("lang"),
)
n += 1
return {"ok": True, "inserted": n}
@router.post("/market_moves")
def ingest_market_moves(payload: dict):
items = payload.get("items")
if not isinstance(items, list) or not items:
raise HTTPException(status_code=400, detail="payload.items must be a non-empty list")
st = get_store()
conn = st.connect()
ensure_schema(conn)
n = 0
for it in items:
if not isinstance(it, dict):
continue
insert_raw_item(
conn,
source="market_moves",
item_date=payload.get("date") or _today(),
title=(it.get("title") or "")[:500],
content=(it.get("content") or "")[:20_000],
url=it.get("url"),
published_at=it.get("published_at"),
lang=it.get("lang"),
)
n += 1
return {"ok": True, "inserted": n}
@router.post("/market_moves/run")
def run_market_moves(payload: dict | None = None):
"""Generate daily market-move items from QFR raw data and parse them into events."""
payload = payload or {}
day = str(payload.get("date") or _today())
# QFR raw data uses trade_date like YYYYMMDD.
trade_date = day.replace("-", "")
st = get_store()
conn = st.connect()
ensure_schema(conn)
data = fetch_moves_via_qfr(trade_date=trade_date, symbols=payload.get("symbols"))
if not data.get("ok"):
raise HTTPException(status_code=500, detail=data)
inserted = 0
parsed_ok = 0
parsed_err = 0
for mv in data.get("moves", []):
sym = mv.get("symbol")
td = mv.get("trade_date")
ret_1d = mv.get("ret_1d")
vol_20d = mv.get("vol_20d")
z_1d = mv.get("z_1d")
title = f"Market move {sym} {td}: ret_1d={ret_1d:.4f} z_1d={z_1d:.2f}" if isinstance(ret_1d, (int, float)) and isinstance(z_1d, (int, float)) else f"Market move {sym} {td}"
content = (
f"symbol={sym}\n"
f"trade_date={td}\n"
f"prev_trade_date={mv.get('prev_trade_date')}\n"
f"close={mv.get('close')} prev_close={mv.get('prev_close')}\n"
f"ret_1d={ret_1d} vol_20d={vol_20d} z_1d={z_1d}\n"
"Interpretation task: explain the most likely macro/industry drivers for this move and which assets could be affected."
)
raw_item_id = insert_raw_item(
conn,
source="market_moves",
item_date=day,
title=title[:500],
content=content[:20_000],
url=None,
published_at=None,
lang="en",
)
inserted += 1
try:
res = extract_event(title=title, content=content, lang_hint="en")
except Exception as e:
# Network/provider errors should not abort the whole batch.
insert_event_result(
conn,
raw_item_id=raw_item_id,
model="",
ok=False,
event_json=None,
error=f"llm_exception:{type(e).__name__}",
)
parsed_err += 1
continue
if res.get("ok") is True:
insert_event_result(
conn,
raw_item_id=raw_item_id,
model=str(res.get("model") or ""),
ok=True,
event_json=json.dumps(res.get("event"), ensure_ascii=True),
error=None,
)
parsed_ok += 1
else:
err = str(res.get("error") or "unknown")
if err == "llm_failed":
# Keep a short hint to debug gateway flakiness without dumping secrets.
exc = str(res.get("exc") or "")
if exc:
err = f"{err}:{exc}"
insert_event_result(
conn,
raw_item_id=raw_item_id,
model=str(res.get("model") or ""),
ok=False,
event_json=None,
error=err,
)
parsed_err += 1
return {
"ok": True,
"date": day,
"inserted": inserted,
"parsed_ok": parsed_ok,
"parsed_err": parsed_err,
"errors": data.get("errors", []),
"symbols": data.get("symbols"),
}

View File

@@ -0,0 +1,215 @@
from datetime import date
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
from ..services.store import counts, get_store, list_events, list_raw_items, sources_today
router = APIRouter()
@router.get("/health")
def health():
return {"ok": True}
@router.get("/status")
def status(day: str | None = None):
st = get_store()
conn = st.connect()
item_date = day or date.today().isoformat()
return {
"date": item_date,
"counts": counts(conn),
"sources": sources_today(conn, item_date),
}
@router.get("/raw_items")
def raw_items(limit: int = 20):
st = get_store()
conn = st.connect()
return {"items": list_raw_items(conn, limit=limit)}
@router.get("/events")
def events(limit: int = 20):
st = get_store()
conn = st.connect()
return {"items": list_events(conn, limit=limit)}
@router.get("/ui", response_class=HTMLResponse)
def ui():
# Tiny no-build UI for early validation.
html = """<!doctype html>
<html>
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<title>EventFlow V1</title>
<style>
body { font-family: ui-sans-serif, system-ui, Arial; margin: 18px; }
h1 { margin: 0 0 8px 0; }
.row { display: flex; gap: 16px; flex-wrap: wrap; }
.card { border: 1px solid #ddd; border-radius: 8px; padding: 12px; min-width: 320px; }
table { border-collapse: collapse; width: 100%; }
th, td { border-bottom: 1px solid #eee; padding: 6px 8px; font-size: 13px; vertical-align: top; }
th { text-align: left; color: #444; }
code { background: #f6f6f6; padding: 2px 4px; border-radius: 4px; }
.muted { color: #666; }
.ok { color: #0a7; font-weight: 600; }
.bad { color: #b00; font-weight: 600; }
.mono { font-family: ui-monospace, SFMono-Regular, Menlo, monospace; }
button { padding: 6px 10px; }
input, textarea { width: 100%; box-sizing: border-box; }
textarea { height: 90px; }
</style>
</head>
<body>
<h1>EventFlow V1</h1>
<div class=\"muted\">Status page: sources, dates, counts, and parsed events.</div>
<div style=\"height:12px\"></div>
<div class=\"row\">
<div class=\"card\" style=\"flex: 1\">
<div style=\"display:flex; justify-content:space-between; align-items:center; gap:12px;\">
<div>
<div><b>Today</b>: <span id=\"today\"></span></div>
<div class=\"muted\">Auto-refresh every 5s</div>
</div>
<button onclick=\"refreshAll()\">Refresh</button>
</div>
<div style=\"height:10px\"></div>
<div id=\"counts\" class=\"mono\"></div>
<div style=\"height:10px\"></div>
<div><b>Sources (today)</b></div>
<table id=\"sources\"></table>
</div>
<div class=\"card\" style=\"flex: 1\">
<div><b>Quick Analyze (manual)</b></div>
<div class=\"muted\">This will save a raw item + parsed event into SQLite.</div>
<div style=\"height:8px\"></div>
<label>Title</label>
<input id=\"m_title\" placeholder=\"e.g. Fed signals higher-for-longer rates\" />
<div style=\"height:6px\"></div>
<label>Content</label>
<textarea id=\"m_content\" placeholder=\"Paste a paragraph...\"></textarea>
<div style=\"height:6px\"></div>
<button onclick=\"runAnalyze()\">Analyze</button>
<div id=\"m_out\" class=\"mono\" style=\"white-space:pre-wrap; margin-top:10px;\"></div>
</div>
</div>
<div style=\"height:16px\"></div>
<div class=\"row\">
<div class=\"card\" style=\"flex: 1\">
<div><b>Latest Raw Items</b> <span class=\"muted\">(limit 20)</span></div>
<table id=\"raw\"></table>
</div>
<div class=\"card\" style=\"flex: 1\">
<div><b>Latest Parsed Events</b> <span class=\"muted\">(limit 20)</span></div>
<table id=\"events\"></table>
</div>
</div>
<script>
function esc(s) {
return (s ?? '').toString().replaceAll('&', '&amp;').replaceAll('<', '&lt;').replaceAll('>', '&gt;');
}
async function jget(url) {
const r = await fetch(url);
if (!r.ok) throw new Error('HTTP ' + r.status);
return await r.json();
}
function setTable(el, headers, rows) {
let h = '<tr>' + headers.map(x => `<th>${esc(x)}</th>`).join('') + '</tr>';
let b = rows.map(r => '<tr>' + r.map(x => `<td>${x}</td>`).join('') + '</tr>').join('');
el.innerHTML = h + b;
}
async function refreshStatus() {
const st = await jget('/query/status');
document.getElementById('today').textContent = st.date;
const c = st.counts;
document.getElementById('counts').textContent = `raw_items=${c.raw_items} events=${c.events} ok=${c.events_ok} err=${c.events_err}`;
const srows = (st.sources || []).map(x => [esc(x.source), esc(x.count)]);
setTable(document.getElementById('sources'), ['source', 'count'], srows);
}
async function refreshRaw() {
const data = await jget('/query/raw_items?limit=20');
const rows = (data.items || []).map(it => {
const u = it.url ? `<a href="${esc(it.url)}" target="_blank">link</a>` : '';
return [
`<span class="mono">${esc(it.id)}</span>`,
esc(it.source),
esc(it.item_date),
esc(it.lang || ''),
esc((it.title || '').slice(0, 120)),
u,
];
});
setTable(document.getElementById('raw'), ['id', 'source', 'date', 'lang', 'title', 'url'], rows);
}
async function refreshEvents() {
const data = await jget('/query/events?limit=20');
const rows = (data.items || []).map(it => {
const cls = it.ok ? 'ok' : 'bad';
let ev = '';
try {
if (it.event_json) {
const obj = JSON.parse(it.event_json);
ev = `${esc(obj.event_type)} dir=${esc(obj.impact_direction)} conf=${esc(obj.confidence)}<br/><span class="muted">${esc((obj.summary_en || obj.summary_zh || '').slice(0, 140))}</span>`;
}
} catch (e) {
ev = '<span class="bad">bad json</span>';
}
return [
`<span class="mono">${esc(it.id)}</span>`,
esc(it.source),
`<span class="${cls}">${it.ok ? 'ok' : 'err'}</span>`,
esc((it.title || '').slice(0, 90)),
ev,
];
});
setTable(document.getElementById('events'), ['id', 'source', 'ok', 'raw_title', 'event'], rows);
}
async function runAnalyze() {
const title = document.getElementById('m_title').value;
const content = document.getElementById('m_content').value;
const out = document.getElementById('m_out');
out.textContent = 'running...';
const r = await fetch('/analyze/event', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({source: 'manual', date: new Date().toISOString().slice(0,10), title, content, lang: ''}),
});
const j = await r.json();
out.textContent = JSON.stringify(j, null, 2);
await refreshAll();
}
async function refreshAll() {
try {
await refreshStatus();
await refreshRaw();
await refreshEvents();
} catch (e) {
console.error(e);
}
}
refreshAll();
setInterval(refreshAll, 5000);
</script>
</body>
</html>"""
return HTMLResponse(content=html)

View File

View File

@@ -0,0 +1,339 @@
import json
import os
import subprocess
import time
import urllib.error
import urllib.request
from typing import Any
from openai import OpenAI
def _client() -> OpenAI:
base_url = os.environ.get("LLM_BASE_URL")
api_key = os.environ.get("LLM_API_KEY")
if not api_key:
raise RuntimeError("LLM_API_KEY is not set")
# Keep LLM calls bounded; some gateways hang on upstream issues.
timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30"))
# OpenAI SDK uses base_url for OpenAI-compatible gateways.
return OpenAI(api_key=api_key, base_url=base_url, timeout=timeout_s)
SYSTEM_PROMPT = (
"You are a finance event extraction engine. Extract a single structured event. "
"Be conservative: if uncertain, set impact_direction=0 and confidence low. "
"Output MUST be valid JSON only (no markdown, no commentary)."
)
def _coerce_result(obj: dict[str, Any]) -> dict[str, Any]:
# Keep the contract stable for downstream DB writes.
out: dict[str, Any] = {}
out["event_type"] = str(obj.get("event_type") or "unknown")
region = obj.get("region")
if region is not None and str(region).strip() != "":
out["region"] = str(region)
entities = obj.get("entities")
if isinstance(entities, list):
out["entities"] = [str(x) for x in entities if str(x).strip()]
else:
out["entities"] = []
out["summary_zh"] = str(obj.get("summary_zh") or "")
out["summary_en"] = str(obj.get("summary_en") or "")
# -1/0/1 only
try:
impact_direction = int(obj.get("impact_direction"))
except Exception:
impact_direction = 0
if impact_direction not in (-1, 0, 1):
impact_direction = 0
out["impact_direction"] = impact_direction
# 0..1
try:
confidence = float(obj.get("confidence"))
except Exception:
confidence = 0.0
if confidence < 0.0:
confidence = 0.0
if confidence > 1.0:
confidence = 1.0
out["confidence"] = confidence
evidence = obj.get("evidence")
if isinstance(evidence, list):
out["evidence"] = [str(x) for x in evidence if str(x).strip()]
else:
out["evidence"] = []
return out
def _responses_output_text(resp_obj: dict[str, Any]) -> str:
txt = resp_obj.get("output_text")
if isinstance(txt, str) and txt.strip():
return txt
# Fallback for gateways that only return structured `output` blocks.
out = resp_obj.get("output")
if isinstance(out, list):
chunks: list[str] = []
for item in out:
if not isinstance(item, dict):
continue
content = item.get("content")
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
if block.get("type") in ("output_text", "text"):
val = block.get("text")
if isinstance(val, str) and val.strip():
chunks.append(val)
if chunks:
return "\n".join(chunks)
return "{}"
def _find_json_object(text: str) -> str | None:
start = text.find("{")
while start != -1:
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
ch = text[i]
if in_str:
if esc:
esc = False
elif ch == "\\":
esc = True
elif ch == '"':
in_str = False
continue
if ch == '"':
in_str = True
elif ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
candidate = text[start : i + 1]
try:
json.loads(candidate)
return candidate
except Exception:
break
start = text.find("{", start + 1)
return None
def _responses_create_http(model: str, user_payload: dict[str, Any]) -> dict[str, Any]:
base_url = (os.environ.get("LLM_BASE_URL") or "").rstrip("/")
api_key = os.environ.get("LLM_API_KEY")
if not base_url:
raise RuntimeError("LLM_BASE_URL is not set")
if not api_key:
raise RuntimeError("LLM_API_KEY is not set")
timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30"))
body = {
"model": model,
"stream": False,
"max_output_tokens": int(os.environ.get("LLM_MAX_OUTPUT_TOKENS", "512")),
"input": [
{
"role": "user",
"content": [
{
"type": "text",
"text": (
SYSTEM_PROMPT
+ "\n\nReturn valid JSON object only.\n\nInput:\n"
+ json.dumps(user_payload, ensure_ascii=True)
),
}
],
}
],
}
req = urllib.request.Request(
url=f"{base_url}/responses",
data=json.dumps(body).encode("utf-8"),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
detail = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"http_{e.code}: {detail[:5000]}") from e
try:
return json.loads(raw)
except json.JSONDecodeError as e:
raise RuntimeError(f"invalid_gateway_json: {raw[:1000]}") from e
def _codex_exec_fallback(model: str, user_payload: dict[str, Any]) -> dict[str, Any]:
codex_bin = os.environ.get("CODEX_BIN", os.path.expanduser("~/.local/npm-global/bin/codex"))
codex_timeout_s = float(os.environ.get("CODEX_FALLBACK_TIMEOUT_S", "90"))
codex_cwd = os.environ.get("CODEX_FALLBACK_CWD", "/home/openclaw")
prompt = (
"Return ONLY one valid JSON object with keys: "
"event_type,region,entities,summary_zh,summary_en,impact_direction,confidence,evidence.\n"
"No markdown, no extra text.\n"
f"Input:\n{json.dumps(user_payload, ensure_ascii=True)}"
)
proc = subprocess.run(
[
codex_bin,
"exec",
"-m",
model,
"-s",
"read-only",
"-C",
codex_cwd,
"--skip-git-repo-check",
prompt,
],
capture_output=True,
text=True,
timeout=codex_timeout_s,
check=False,
)
combined = (proc.stdout or "") + "\n" + (proc.stderr or "")
candidate = _find_json_object(combined)
if not candidate:
raise RuntimeError(f"codex_no_json: {combined[:1000]}")
return {"output_text": candidate}
def extract_event(title: str, content: str, lang_hint: str | None = None) -> dict[str, Any]:
"""Return a strict JSON object matching our event schema.
Supports both OpenAI-compatible APIs:
- chat.completions (response_format=json_object)
- responses (text.format=json_object)
"""
model = os.environ.get("LLM_MODEL", "gpt-5.2")
wire_api = (os.environ.get("LLM_WIRE_API") or os.environ.get("WIRE_API") or "chat").strip().lower()
user = {
"title": title,
"content": content,
"lang_hint": lang_hint,
"required_output": {
"event_type": "string (macro/policy/war/sanctions/earnings/commodity/supply_shock/rate/inflation/...) ",
"region": "string optional (CN/US/EU/JP/Global/...)",
"entities": "array of strings (companies/countries/commodities/indices)",
"summary_zh": "string",
"summary_en": "string",
"impact_direction": "-1|0|1 (market risk sentiment for affected assets)",
"confidence": "float 0..1",
"evidence": "array of strings (short quotes or key facts; URLs if present)",
},
}
c = _client() if wire_api not in ("responses", "response") else None
max_attempts = int(os.environ.get("LLM_MAX_ATTEMPTS", "3"))
backoff_s = float(os.environ.get("LLM_RETRY_BACKOFF_S", "1.0"))
last_exc: Exception | None = None
resp = None
for attempt in range(1, max_attempts + 1):
try:
if wire_api in ("responses", "response"):
# CCH is more reliable with direct /v1/responses HTTP + stream=false.
resp = _responses_create_http(model=model, user_payload=user)
else:
resp = c.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(user, ensure_ascii=True)},
],
temperature=0.2,
response_format={"type": "json_object"},
)
last_exc = None
break
except Exception as e:
# Gateways can intermittently throw 5xx/connection errors. Retry a few times.
last_exc = e
# Optional escape hatch: fallback to local Codex CLI when CCH responses are unstable.
use_codex_fallback = os.environ.get("LLM_FALLBACK_CODEX", "1").lower() in ("1", "true", "yes")
if use_codex_fallback and wire_api in ("responses", "response"):
try:
resp = _codex_exec_fallback(model=model, user_payload=user)
last_exc = None
break
except Exception as fallback_exc:
last_exc = fallback_exc
if attempt < max_attempts:
time.sleep(backoff_s * (2 ** (attempt - 1)))
continue
break
if resp is None:
return {
"model": model,
"ok": False,
"error": "llm_failed",
"exc": type(last_exc).__name__ if last_exc else "Unknown",
"msg": str(last_exc)[:5000] if last_exc else "",
}
if wire_api in ("responses", "response"):
txt = _responses_output_text(resp if isinstance(resp, dict) else {})
else:
txt = resp.choices[0].message.content or "{}"
try:
obj = json.loads(txt)
except json.JSONDecodeError:
# Return raw text for debugging, but still keep a predictable envelope.
return {
"model": model,
"ok": False,
"error": "invalid_json",
"raw": txt,
}
if not isinstance(obj, dict):
return {
"model": model,
"ok": False,
"error": "non_object_json",
"raw": txt,
}
return {
"model": model,
"ok": True,
"event": _coerce_result(obj),
}

View File

@@ -0,0 +1,68 @@
import json
import os
import subprocess
from typing import Any
WATCHLIST_DEFAULT = [
# A-share ETF proxies for indices (QFR raw data uses ETF parquets)
# HS300
"510300.SH",
# ZZ500
"510500.SH",
# ChiNext
"159915.SZ",
# SSE50 proxy (may not exist in rawdir unless downloaded)
"510050.SH",
# Futures placeholders (not in QFR rawdir by default; will show as errors until sourced)
"AU.SHF",
"CU.SHF",
"M.DCE",
"TA.CZCE",
"SC.INE",
]
def fetch_moves_via_qfr(*, trade_date: str | None = None, symbols: list[str] | None = None) -> dict[str, Any]:
"""Fetch day-level moves by shelling out to the existing qfr conda env.
Reason: eventflow env is kept minimal; qfr env already has pandas/pyarrow.
"""
sym_list = symbols or WATCHLIST_DEFAULT
rawdir = os.environ.get("QFR_RAWDIR", "/home/openclaw/projects/quant-factor-research/data/raw")
env = os.environ.copy()
env["QFR_RAWDIR"] = rawdir
env["QFR_SYMBOLS"] = ",".join(sym_list)
if trade_date:
env["QFR_TRADE_DATE"] = trade_date
conda = os.environ.get("CONDA_BIN", "/home/openclaw/miniconda3/bin/conda")
script = os.path.join(os.path.dirname(__file__), "market_moves_qfr.py")
cmd = [conda, "run", "-n", "qfr", "python", script]
proc = subprocess.run(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
return {
"ok": False,
"error": "qfr_subprocess_failed",
"returncode": proc.returncode,
"stderr": proc.stderr[-4000:],
}
try:
data = json.loads(proc.stdout)
except json.JSONDecodeError:
return {
"ok": False,
"error": "invalid_json_from_qfr",
"stdout": proc.stdout[-2000:],
"stderr": proc.stderr[-2000:],
}
data["ok"] = True
data["symbols"] = sym_list
return data

View File

@@ -0,0 +1,108 @@
import json
import os
from dataclasses import asdict, dataclass
from typing import Any
import pandas as pd
@dataclass
class Move:
symbol: str
trade_date: str
prev_trade_date: str
close: float
prev_close: float
ret_1d: float
vol_20d: float
z_1d: float
def _read_one(rawdir: str, symbol: str) -> pd.DataFrame:
# QFR stores parquet as e.g. 510300SH.parquet / 159915SZ.parquet
fn = symbol.replace(".", "") + ".parquet"
p = os.path.join(rawdir, fn)
df = pd.read_parquet(p)
# Standardize
if "trade_date" not in df.columns or "close" not in df.columns:
raise RuntimeError(f"unexpected parquet schema for {symbol}: {df.columns.tolist()}")
df = df.sort_values("trade_date").reset_index(drop=True)
return df
def _calc_move(df: pd.DataFrame, symbol: str, trade_date: str | None) -> Move | None:
if df.empty:
return None
# pick last available <= trade_date if given
if trade_date:
df2 = df[df["trade_date"] <= trade_date]
if df2.empty:
return None
df = df2
if len(df) < 2:
return None
# compute returns
close = df["close"].astype(float)
ret = close.pct_change()
i = len(df) - 1
prev_i = i - 1
td = str(df.iloc[i]["trade_date"])
ptd = str(df.iloc[prev_i]["trade_date"])
close_i = float(close.iloc[i])
prev_close_i = float(close.iloc[prev_i])
ret_1d = float(ret.iloc[i])
# vol over last 20 returns (excluding NaN)
vol_20 = float(ret.iloc[max(0, i - 20 + 1) : i + 1].std(skipna=True))
if not (vol_20 > 0):
vol_20 = 0.0
z = float(ret_1d / vol_20) if vol_20 > 0 else 0.0
return Move(
symbol=symbol,
trade_date=td,
prev_trade_date=ptd,
close=close_i,
prev_close=prev_close_i,
ret_1d=ret_1d,
vol_20d=vol_20,
z_1d=z,
)
def main() -> None:
rawdir = os.environ.get("QFR_RAWDIR") or "/home/openclaw/projects/quant-factor-research/data/raw"
symbols = (os.environ.get("QFR_SYMBOLS") or "").strip()
trade_date = (os.environ.get("QFR_TRADE_DATE") or "").strip() or None
if not symbols:
raise SystemExit("QFR_SYMBOLS is required")
out: dict[str, Any] = {
"rawdir": rawdir,
"trade_date": trade_date,
"moves": [],
"errors": [],
}
for sym in [s.strip() for s in symbols.split(",") if s.strip()]:
try:
df = _read_one(rawdir, sym)
mv = _calc_move(df, sym, trade_date)
if mv is None:
out["errors"].append({"symbol": sym, "error": "no_data"})
continue
out["moves"].append(asdict(mv))
except Exception as e:
out["errors"].append({"symbol": sym, "error": str(e)})
print(json.dumps(out, ensure_ascii=True))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,99 @@
import json
import os
import random
import threading
import time
from .llm_extract import extract_event
from .store import ensure_schema, get_store, insert_event_result
def _pick_pending(conn, *, batch: int, max_attempts: int, min_age_s: int, retry_after_s: int):
now = int(time.time())
# Select items that are not successfully parsed yet.
# We rely on raw_items.status/parse_attempts which are updated by insert_event_result.
rows = conn.execute(
"""
SELECT id, title, content, lang, status, parse_attempts, last_parse_at
FROM raw_items
WHERE status != 'parsed_ok'
AND COALESCE(parse_attempts, 0) < ?
AND (? - created_at) >= ?
AND (last_parse_at IS NULL OR (? - last_parse_at) >= ?)
ORDER BY id ASC
LIMIT ?
""",
(max_attempts, now, min_age_s, now, retry_after_s, batch),
).fetchall()
return [dict(r) for r in rows]
def run_retry_loop(stop: threading.Event) -> None:
if os.environ.get("PARSE_RETRY_ENABLED", "1") not in ("1", "true", "TRUE", "yes", "YES"):
return
interval_s = int(os.environ.get("PARSE_RETRY_INTERVAL_S", "120"))
batch = int(os.environ.get("PARSE_RETRY_BATCH", "5"))
max_attempts = int(os.environ.get("PARSE_RETRY_MAX_ATTEMPTS", "6"))
min_age_s = int(os.environ.get("PARSE_RETRY_MIN_AGE_S", "2"))
retry_after_s = int(os.environ.get("PARSE_RETRY_AFTER_S", "300"))
sleep_between_s = float(os.environ.get("PARSE_RETRY_SLEEP_BETWEEN_S", "0.5"))
st = get_store()
while not stop.is_set():
try:
conn = st.connect()
ensure_schema(conn)
items = _pick_pending(
conn,
batch=batch,
max_attempts=max_attempts,
min_age_s=min_age_s,
retry_after_s=retry_after_s,
)
for it in items:
if stop.is_set():
break
# Jitter to reduce gateway burstiness.
time.sleep(sleep_between_s + random.random() * 0.4)
title = (it.get("title") or "")[:500]
content = (it.get("content") or "")[:20000]
lang_hint = it.get("lang")
res = extract_event(title=title, content=content, lang_hint=lang_hint)
if res.get("ok") is True:
insert_event_result(
conn,
raw_item_id=int(it["id"]),
model=str(res.get("model") or ""),
ok=True,
event_json=json.dumps(res.get("event"), ensure_ascii=True),
error=None,
)
else:
# Preserve hint for debugging (no secrets).
err = str(res.get("error") or "unknown")
if err == "llm_failed":
exc = str(res.get("exc") or "")
if exc:
err = f"{err}:{exc}"
insert_event_result(
conn,
raw_item_id=int(it["id"]),
model=str(res.get("model") or ""),
ok=False,
event_json=None,
error=err,
)
except Exception:
# Keep the loop alive; details are in DB (last_error) or uvicorn logs.
pass
stop.wait(interval_s)

View File

@@ -0,0 +1,205 @@
import os
import sqlite3
import time
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class Store:
db_path: str
def connect(self) -> sqlite3.Connection:
# check_same_thread=False allows use across simple dev reload threads.
conn = sqlite3.connect(self.db_path, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def _sqlite_path_from_database_url(url: str) -> str:
# V1 supports only sqlite. Examples:
# - sqlite:///./eventflow.sqlite3
# - sqlite:////abs/path/eventflow.sqlite3
if not url.startswith("sqlite:///"):
raise ValueError("Only sqlite DATABASE_URL is supported in V1")
return url[len("sqlite:///") :]
def get_store() -> Store:
url = os.environ.get("DATABASE_URL", "sqlite:///./eventflow.sqlite3")
path = _sqlite_path_from_database_url(url)
return Store(db_path=path)
def _ensure_column(conn: sqlite3.Connection, *, table: str, col: str, ddl: str) -> None:
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
if col not in cols:
conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}")
def ensure_schema(conn: sqlite3.Connection) -> None:
conn.executescript(
"""
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS raw_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
item_date TEXT NOT NULL,
published_at TEXT,
url TEXT,
title TEXT,
content TEXT,
lang TEXT,
status TEXT NOT NULL DEFAULT 'new',
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_raw_items_source_date ON raw_items(source, item_date);
CREATE INDEX IF NOT EXISTS idx_raw_items_created_at ON raw_items(created_at);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_item_id INTEGER NOT NULL,
model TEXT,
ok INTEGER NOT NULL,
event_json TEXT,
error TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY(raw_item_id) REFERENCES raw_items(id)
);
CREATE INDEX IF NOT EXISTS idx_events_raw_item_id ON events(raw_item_id);
CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);
"""
)
# Light migrations for V1: keep raw_items parse state for retry.
_ensure_column(conn, table="raw_items", col="parse_attempts", ddl="parse_attempts INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, table="raw_items", col="last_parse_at", ddl="last_parse_at INTEGER")
_ensure_column(conn, table="raw_items", col="last_error", ddl="last_error TEXT")
conn.commit()
def insert_raw_item(
conn: sqlite3.Connection,
*,
source: str,
item_date: str,
title: str | None,
content: str | None,
url: str | None = None,
published_at: str | None = None,
lang: str | None = None,
) -> int:
now = int(time.time())
cur = conn.execute(
"""
INSERT INTO raw_items (source, item_date, published_at, url, title, content, lang, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, 'new', ?)
""",
(source, item_date, published_at, url, title, content, lang, now),
)
conn.commit()
return int(cur.lastrowid)
def insert_event_result(
conn: sqlite3.Connection,
*,
raw_item_id: int,
model: str,
ok: bool,
event_json: str | None,
error: str | None,
) -> int:
now = int(time.time())
cur = conn.execute(
"""
INSERT INTO events (raw_item_id, model, ok, event_json, error, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(raw_item_id, model, 1 if ok else 0, event_json, error, now),
)
# Update raw item parse state (best-effort).
status = "parsed_ok" if ok else "parsed_err"
conn.execute(
"""
UPDATE raw_items
SET status = ?,
parse_attempts = COALESCE(parse_attempts, 0) + 1,
last_parse_at = ?,
last_error = ?
WHERE id = ?
""",
(status, now, error, raw_item_id),
)
conn.commit()
return int(cur.lastrowid)
def counts(conn: sqlite3.Connection) -> dict[str, Any]:
ensure_schema(conn)
row_raw = conn.execute("SELECT COUNT(1) AS n FROM raw_items").fetchone()
row_ev = conn.execute("SELECT COUNT(1) AS n FROM events").fetchone()
row_ok = conn.execute("SELECT COUNT(1) AS n FROM events WHERE ok=1").fetchone()
row_bad = conn.execute("SELECT COUNT(1) AS n FROM events WHERE ok=0").fetchone()
return {
"raw_items": int(row_raw["n"]),
"events": int(row_ev["n"]),
"events_ok": int(row_ok["n"]),
"events_err": int(row_bad["n"]),
}
def sources_today(conn: sqlite3.Connection, item_date: str) -> list[dict[str, Any]]:
ensure_schema(conn)
rows = conn.execute(
"""
SELECT source, COUNT(1) AS n
FROM raw_items
WHERE item_date = ?
GROUP BY source
ORDER BY source
""",
(item_date,),
).fetchall()
return [{"source": r["source"], "count": int(r["n"])} for r in rows]
def list_raw_items(conn: sqlite3.Connection, limit: int = 20) -> list[dict[str, Any]]:
ensure_schema(conn)
rows = conn.execute(
"""
SELECT id, source, item_date, published_at, url, title, lang, status, created_at
FROM raw_items
ORDER BY id DESC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(r) for r in rows]
def list_events(conn: sqlite3.Connection, limit: int = 20) -> list[dict[str, Any]]:
ensure_schema(conn)
rows = conn.execute(
"""
SELECT e.id, e.raw_item_id, e.model, e.ok, e.event_json, e.error, e.created_at,
r.source, r.item_date, r.title
FROM events e
JOIN raw_items r ON r.id = e.raw_item_id
ORDER BY e.id DESC
LIMIT ?
""",
(limit,),
).fetchall()
out: list[dict[str, Any]] = []
for r in rows:
d = dict(r)
# keep event_json as string; UI can parse if needed
out.append(d)
return out