eventflow: stabilize CCH responses and add codex fallback
This commit is contained in:
339
eventflow/backend/fastapi_app/services/llm_extract.py
Normal file
339
eventflow/backend/fastapi_app/services/llm_extract.py
Normal file
@@ -0,0 +1,339 @@
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import Any
|
||||
|
||||
from openai import OpenAI
|
||||
|
||||
|
||||
def _client() -> OpenAI:
|
||||
base_url = os.environ.get("LLM_BASE_URL")
|
||||
api_key = os.environ.get("LLM_API_KEY")
|
||||
if not api_key:
|
||||
raise RuntimeError("LLM_API_KEY is not set")
|
||||
|
||||
# Keep LLM calls bounded; some gateways hang on upstream issues.
|
||||
timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30"))
|
||||
|
||||
# OpenAI SDK uses base_url for OpenAI-compatible gateways.
|
||||
return OpenAI(api_key=api_key, base_url=base_url, timeout=timeout_s)
|
||||
|
||||
|
||||
SYSTEM_PROMPT = (
|
||||
"You are a finance event extraction engine. Extract a single structured event. "
|
||||
"Be conservative: if uncertain, set impact_direction=0 and confidence low. "
|
||||
"Output MUST be valid JSON only (no markdown, no commentary)."
|
||||
)
|
||||
|
||||
|
||||
def _coerce_result(obj: dict[str, Any]) -> dict[str, Any]:
|
||||
# Keep the contract stable for downstream DB writes.
|
||||
out: dict[str, Any] = {}
|
||||
out["event_type"] = str(obj.get("event_type") or "unknown")
|
||||
|
||||
region = obj.get("region")
|
||||
if region is not None and str(region).strip() != "":
|
||||
out["region"] = str(region)
|
||||
|
||||
entities = obj.get("entities")
|
||||
if isinstance(entities, list):
|
||||
out["entities"] = [str(x) for x in entities if str(x).strip()]
|
||||
else:
|
||||
out["entities"] = []
|
||||
|
||||
out["summary_zh"] = str(obj.get("summary_zh") or "")
|
||||
out["summary_en"] = str(obj.get("summary_en") or "")
|
||||
|
||||
# -1/0/1 only
|
||||
try:
|
||||
impact_direction = int(obj.get("impact_direction"))
|
||||
except Exception:
|
||||
impact_direction = 0
|
||||
if impact_direction not in (-1, 0, 1):
|
||||
impact_direction = 0
|
||||
out["impact_direction"] = impact_direction
|
||||
|
||||
# 0..1
|
||||
try:
|
||||
confidence = float(obj.get("confidence"))
|
||||
except Exception:
|
||||
confidence = 0.0
|
||||
if confidence < 0.0:
|
||||
confidence = 0.0
|
||||
if confidence > 1.0:
|
||||
confidence = 1.0
|
||||
out["confidence"] = confidence
|
||||
|
||||
evidence = obj.get("evidence")
|
||||
if isinstance(evidence, list):
|
||||
out["evidence"] = [str(x) for x in evidence if str(x).strip()]
|
||||
else:
|
||||
out["evidence"] = []
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _responses_output_text(resp_obj: dict[str, Any]) -> str:
|
||||
txt = resp_obj.get("output_text")
|
||||
if isinstance(txt, str) and txt.strip():
|
||||
return txt
|
||||
|
||||
# Fallback for gateways that only return structured `output` blocks.
|
||||
out = resp_obj.get("output")
|
||||
if isinstance(out, list):
|
||||
chunks: list[str] = []
|
||||
for item in out:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
content = item.get("content")
|
||||
if not isinstance(content, list):
|
||||
continue
|
||||
for block in content:
|
||||
if not isinstance(block, dict):
|
||||
continue
|
||||
if block.get("type") in ("output_text", "text"):
|
||||
val = block.get("text")
|
||||
if isinstance(val, str) and val.strip():
|
||||
chunks.append(val)
|
||||
if chunks:
|
||||
return "\n".join(chunks)
|
||||
|
||||
return "{}"
|
||||
|
||||
|
||||
def _find_json_object(text: str) -> str | None:
|
||||
start = text.find("{")
|
||||
while start != -1:
|
||||
depth = 0
|
||||
in_str = False
|
||||
esc = False
|
||||
for i in range(start, len(text)):
|
||||
ch = text[i]
|
||||
if in_str:
|
||||
if esc:
|
||||
esc = False
|
||||
elif ch == "\\":
|
||||
esc = True
|
||||
elif ch == '"':
|
||||
in_str = False
|
||||
continue
|
||||
if ch == '"':
|
||||
in_str = True
|
||||
elif ch == "{":
|
||||
depth += 1
|
||||
elif ch == "}":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
candidate = text[start : i + 1]
|
||||
try:
|
||||
json.loads(candidate)
|
||||
return candidate
|
||||
except Exception:
|
||||
break
|
||||
start = text.find("{", start + 1)
|
||||
return None
|
||||
|
||||
|
||||
def _responses_create_http(model: str, user_payload: dict[str, Any]) -> dict[str, Any]:
|
||||
base_url = (os.environ.get("LLM_BASE_URL") or "").rstrip("/")
|
||||
api_key = os.environ.get("LLM_API_KEY")
|
||||
if not base_url:
|
||||
raise RuntimeError("LLM_BASE_URL is not set")
|
||||
if not api_key:
|
||||
raise RuntimeError("LLM_API_KEY is not set")
|
||||
|
||||
timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30"))
|
||||
|
||||
body = {
|
||||
"model": model,
|
||||
"stream": False,
|
||||
"max_output_tokens": int(os.environ.get("LLM_MAX_OUTPUT_TOKENS", "512")),
|
||||
"input": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": (
|
||||
SYSTEM_PROMPT
|
||||
+ "\n\nReturn valid JSON object only.\n\nInput:\n"
|
||||
+ json.dumps(user_payload, ensure_ascii=True)
|
||||
),
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
req = urllib.request.Request(
|
||||
url=f"{base_url}/responses",
|
||||
data=json.dumps(body).encode("utf-8"),
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {api_key}",
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
|
||||
raw = resp.read().decode("utf-8", errors="replace")
|
||||
except urllib.error.HTTPError as e:
|
||||
detail = e.read().decode("utf-8", errors="replace")
|
||||
raise RuntimeError(f"http_{e.code}: {detail[:5000]}") from e
|
||||
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except json.JSONDecodeError as e:
|
||||
raise RuntimeError(f"invalid_gateway_json: {raw[:1000]}") from e
|
||||
|
||||
|
||||
def _codex_exec_fallback(model: str, user_payload: dict[str, Any]) -> dict[str, Any]:
|
||||
codex_bin = os.environ.get("CODEX_BIN", os.path.expanduser("~/.local/npm-global/bin/codex"))
|
||||
codex_timeout_s = float(os.environ.get("CODEX_FALLBACK_TIMEOUT_S", "90"))
|
||||
codex_cwd = os.environ.get("CODEX_FALLBACK_CWD", "/home/openclaw")
|
||||
|
||||
prompt = (
|
||||
"Return ONLY one valid JSON object with keys: "
|
||||
"event_type,region,entities,summary_zh,summary_en,impact_direction,confidence,evidence.\n"
|
||||
"No markdown, no extra text.\n"
|
||||
f"Input:\n{json.dumps(user_payload, ensure_ascii=True)}"
|
||||
)
|
||||
|
||||
proc = subprocess.run(
|
||||
[
|
||||
codex_bin,
|
||||
"exec",
|
||||
"-m",
|
||||
model,
|
||||
"-s",
|
||||
"read-only",
|
||||
"-C",
|
||||
codex_cwd,
|
||||
"--skip-git-repo-check",
|
||||
prompt,
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=codex_timeout_s,
|
||||
check=False,
|
||||
)
|
||||
|
||||
combined = (proc.stdout or "") + "\n" + (proc.stderr or "")
|
||||
candidate = _find_json_object(combined)
|
||||
if not candidate:
|
||||
raise RuntimeError(f"codex_no_json: {combined[:1000]}")
|
||||
return {"output_text": candidate}
|
||||
|
||||
|
||||
def extract_event(title: str, content: str, lang_hint: str | None = None) -> dict[str, Any]:
|
||||
"""Return a strict JSON object matching our event schema.
|
||||
|
||||
Supports both OpenAI-compatible APIs:
|
||||
- chat.completions (response_format=json_object)
|
||||
- responses (text.format=json_object)
|
||||
"""
|
||||
|
||||
model = os.environ.get("LLM_MODEL", "gpt-5.2")
|
||||
wire_api = (os.environ.get("LLM_WIRE_API") or os.environ.get("WIRE_API") or "chat").strip().lower()
|
||||
|
||||
user = {
|
||||
"title": title,
|
||||
"content": content,
|
||||
"lang_hint": lang_hint,
|
||||
"required_output": {
|
||||
"event_type": "string (macro/policy/war/sanctions/earnings/commodity/supply_shock/rate/inflation/...) ",
|
||||
"region": "string optional (CN/US/EU/JP/Global/...)",
|
||||
"entities": "array of strings (companies/countries/commodities/indices)",
|
||||
"summary_zh": "string",
|
||||
"summary_en": "string",
|
||||
"impact_direction": "-1|0|1 (market risk sentiment for affected assets)",
|
||||
"confidence": "float 0..1",
|
||||
"evidence": "array of strings (short quotes or key facts; URLs if present)",
|
||||
},
|
||||
}
|
||||
|
||||
c = _client() if wire_api not in ("responses", "response") else None
|
||||
|
||||
max_attempts = int(os.environ.get("LLM_MAX_ATTEMPTS", "3"))
|
||||
backoff_s = float(os.environ.get("LLM_RETRY_BACKOFF_S", "1.0"))
|
||||
|
||||
last_exc: Exception | None = None
|
||||
resp = None
|
||||
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
if wire_api in ("responses", "response"):
|
||||
# CCH is more reliable with direct /v1/responses HTTP + stream=false.
|
||||
resp = _responses_create_http(model=model, user_payload=user)
|
||||
else:
|
||||
resp = c.chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": json.dumps(user, ensure_ascii=True)},
|
||||
],
|
||||
temperature=0.2,
|
||||
response_format={"type": "json_object"},
|
||||
)
|
||||
last_exc = None
|
||||
break
|
||||
except Exception as e:
|
||||
# Gateways can intermittently throw 5xx/connection errors. Retry a few times.
|
||||
last_exc = e
|
||||
|
||||
# Optional escape hatch: fallback to local Codex CLI when CCH responses are unstable.
|
||||
use_codex_fallback = os.environ.get("LLM_FALLBACK_CODEX", "1").lower() in ("1", "true", "yes")
|
||||
if use_codex_fallback and wire_api in ("responses", "response"):
|
||||
try:
|
||||
resp = _codex_exec_fallback(model=model, user_payload=user)
|
||||
last_exc = None
|
||||
break
|
||||
except Exception as fallback_exc:
|
||||
last_exc = fallback_exc
|
||||
|
||||
if attempt < max_attempts:
|
||||
time.sleep(backoff_s * (2 ** (attempt - 1)))
|
||||
continue
|
||||
break
|
||||
|
||||
if resp is None:
|
||||
return {
|
||||
"model": model,
|
||||
"ok": False,
|
||||
"error": "llm_failed",
|
||||
"exc": type(last_exc).__name__ if last_exc else "Unknown",
|
||||
"msg": str(last_exc)[:5000] if last_exc else "",
|
||||
}
|
||||
|
||||
if wire_api in ("responses", "response"):
|
||||
txt = _responses_output_text(resp if isinstance(resp, dict) else {})
|
||||
else:
|
||||
txt = resp.choices[0].message.content or "{}"
|
||||
try:
|
||||
obj = json.loads(txt)
|
||||
except json.JSONDecodeError:
|
||||
# Return raw text for debugging, but still keep a predictable envelope.
|
||||
return {
|
||||
"model": model,
|
||||
"ok": False,
|
||||
"error": "invalid_json",
|
||||
"raw": txt,
|
||||
}
|
||||
|
||||
if not isinstance(obj, dict):
|
||||
return {
|
||||
"model": model,
|
||||
"ok": False,
|
||||
"error": "non_object_json",
|
||||
"raw": txt,
|
||||
}
|
||||
|
||||
return {
|
||||
"model": model,
|
||||
"ok": True,
|
||||
"event": _coerce_result(obj),
|
||||
}
|
||||
Reference in New Issue
Block a user