From 0edc8f7477e594a7fefdbb8f0236a296e2adf65a Mon Sep 17 00:00:00 2001 From: openclaw Date: Fri, 13 Mar 2026 17:05:53 +0800 Subject: [PATCH] eventflow: stabilize CCH responses and add codex fallback --- .../fastapi_app/services/llm_extract.py | 339 ++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 eventflow/backend/fastapi_app/services/llm_extract.py diff --git a/eventflow/backend/fastapi_app/services/llm_extract.py b/eventflow/backend/fastapi_app/services/llm_extract.py new file mode 100644 index 0000000..89f81be --- /dev/null +++ b/eventflow/backend/fastapi_app/services/llm_extract.py @@ -0,0 +1,339 @@ +import json +import os +import subprocess +import time +import urllib.error +import urllib.request +from typing import Any + +from openai import OpenAI + + +def _client() -> OpenAI: + base_url = os.environ.get("LLM_BASE_URL") + api_key = os.environ.get("LLM_API_KEY") + if not api_key: + raise RuntimeError("LLM_API_KEY is not set") + + # Keep LLM calls bounded; some gateways hang on upstream issues. + timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30")) + + # OpenAI SDK uses base_url for OpenAI-compatible gateways. + return OpenAI(api_key=api_key, base_url=base_url, timeout=timeout_s) + + +SYSTEM_PROMPT = ( + "You are a finance event extraction engine. Extract a single structured event. " + "Be conservative: if uncertain, set impact_direction=0 and confidence low. " + "Output MUST be valid JSON only (no markdown, no commentary)." +) + + +def _coerce_result(obj: dict[str, Any]) -> dict[str, Any]: + # Keep the contract stable for downstream DB writes. + out: dict[str, Any] = {} + out["event_type"] = str(obj.get("event_type") or "unknown") + + region = obj.get("region") + if region is not None and str(region).strip() != "": + out["region"] = str(region) + + entities = obj.get("entities") + if isinstance(entities, list): + out["entities"] = [str(x) for x in entities if str(x).strip()] + else: + out["entities"] = [] + + out["summary_zh"] = str(obj.get("summary_zh") or "") + out["summary_en"] = str(obj.get("summary_en") or "") + + # -1/0/1 only + try: + impact_direction = int(obj.get("impact_direction")) + except Exception: + impact_direction = 0 + if impact_direction not in (-1, 0, 1): + impact_direction = 0 + out["impact_direction"] = impact_direction + + # 0..1 + try: + confidence = float(obj.get("confidence")) + except Exception: + confidence = 0.0 + if confidence < 0.0: + confidence = 0.0 + if confidence > 1.0: + confidence = 1.0 + out["confidence"] = confidence + + evidence = obj.get("evidence") + if isinstance(evidence, list): + out["evidence"] = [str(x) for x in evidence if str(x).strip()] + else: + out["evidence"] = [] + + return out + + +def _responses_output_text(resp_obj: dict[str, Any]) -> str: + txt = resp_obj.get("output_text") + if isinstance(txt, str) and txt.strip(): + return txt + + # Fallback for gateways that only return structured `output` blocks. + out = resp_obj.get("output") + if isinstance(out, list): + chunks: list[str] = [] + for item in out: + if not isinstance(item, dict): + continue + content = item.get("content") + if not isinstance(content, list): + continue + for block in content: + if not isinstance(block, dict): + continue + if block.get("type") in ("output_text", "text"): + val = block.get("text") + if isinstance(val, str) and val.strip(): + chunks.append(val) + if chunks: + return "\n".join(chunks) + + return "{}" + + +def _find_json_object(text: str) -> str | None: + start = text.find("{") + while start != -1: + depth = 0 + in_str = False + esc = False + for i in range(start, len(text)): + ch = text[i] + if in_str: + if esc: + esc = False + elif ch == "\\": + esc = True + elif ch == '"': + in_str = False + continue + if ch == '"': + in_str = True + elif ch == "{": + depth += 1 + elif ch == "}": + depth -= 1 + if depth == 0: + candidate = text[start : i + 1] + try: + json.loads(candidate) + return candidate + except Exception: + break + start = text.find("{", start + 1) + return None + + +def _responses_create_http(model: str, user_payload: dict[str, Any]) -> dict[str, Any]: + base_url = (os.environ.get("LLM_BASE_URL") or "").rstrip("/") + api_key = os.environ.get("LLM_API_KEY") + if not base_url: + raise RuntimeError("LLM_BASE_URL is not set") + if not api_key: + raise RuntimeError("LLM_API_KEY is not set") + + timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30")) + + body = { + "model": model, + "stream": False, + "max_output_tokens": int(os.environ.get("LLM_MAX_OUTPUT_TOKENS", "512")), + "input": [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": ( + SYSTEM_PROMPT + + "\n\nReturn valid JSON object only.\n\nInput:\n" + + json.dumps(user_payload, ensure_ascii=True) + ), + } + ], + } + ], + } + + req = urllib.request.Request( + url=f"{base_url}/responses", + data=json.dumps(body).encode("utf-8"), + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + raw = resp.read().decode("utf-8", errors="replace") + except urllib.error.HTTPError as e: + detail = e.read().decode("utf-8", errors="replace") + raise RuntimeError(f"http_{e.code}: {detail[:5000]}") from e + + try: + return json.loads(raw) + except json.JSONDecodeError as e: + raise RuntimeError(f"invalid_gateway_json: {raw[:1000]}") from e + + +def _codex_exec_fallback(model: str, user_payload: dict[str, Any]) -> dict[str, Any]: + codex_bin = os.environ.get("CODEX_BIN", os.path.expanduser("~/.local/npm-global/bin/codex")) + codex_timeout_s = float(os.environ.get("CODEX_FALLBACK_TIMEOUT_S", "90")) + codex_cwd = os.environ.get("CODEX_FALLBACK_CWD", "/home/openclaw") + + prompt = ( + "Return ONLY one valid JSON object with keys: " + "event_type,region,entities,summary_zh,summary_en,impact_direction,confidence,evidence.\n" + "No markdown, no extra text.\n" + f"Input:\n{json.dumps(user_payload, ensure_ascii=True)}" + ) + + proc = subprocess.run( + [ + codex_bin, + "exec", + "-m", + model, + "-s", + "read-only", + "-C", + codex_cwd, + "--skip-git-repo-check", + prompt, + ], + capture_output=True, + text=True, + timeout=codex_timeout_s, + check=False, + ) + + combined = (proc.stdout or "") + "\n" + (proc.stderr or "") + candidate = _find_json_object(combined) + if not candidate: + raise RuntimeError(f"codex_no_json: {combined[:1000]}") + return {"output_text": candidate} + + +def extract_event(title: str, content: str, lang_hint: str | None = None) -> dict[str, Any]: + """Return a strict JSON object matching our event schema. + + Supports both OpenAI-compatible APIs: + - chat.completions (response_format=json_object) + - responses (text.format=json_object) + """ + + model = os.environ.get("LLM_MODEL", "gpt-5.2") + wire_api = (os.environ.get("LLM_WIRE_API") or os.environ.get("WIRE_API") or "chat").strip().lower() + + user = { + "title": title, + "content": content, + "lang_hint": lang_hint, + "required_output": { + "event_type": "string (macro/policy/war/sanctions/earnings/commodity/supply_shock/rate/inflation/...) ", + "region": "string optional (CN/US/EU/JP/Global/...)", + "entities": "array of strings (companies/countries/commodities/indices)", + "summary_zh": "string", + "summary_en": "string", + "impact_direction": "-1|0|1 (market risk sentiment for affected assets)", + "confidence": "float 0..1", + "evidence": "array of strings (short quotes or key facts; URLs if present)", + }, + } + + c = _client() if wire_api not in ("responses", "response") else None + + max_attempts = int(os.environ.get("LLM_MAX_ATTEMPTS", "3")) + backoff_s = float(os.environ.get("LLM_RETRY_BACKOFF_S", "1.0")) + + last_exc: Exception | None = None + resp = None + + for attempt in range(1, max_attempts + 1): + try: + if wire_api in ("responses", "response"): + # CCH is more reliable with direct /v1/responses HTTP + stream=false. + resp = _responses_create_http(model=model, user_payload=user) + else: + resp = c.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": json.dumps(user, ensure_ascii=True)}, + ], + temperature=0.2, + response_format={"type": "json_object"}, + ) + last_exc = None + break + except Exception as e: + # Gateways can intermittently throw 5xx/connection errors. Retry a few times. + last_exc = e + + # Optional escape hatch: fallback to local Codex CLI when CCH responses are unstable. + use_codex_fallback = os.environ.get("LLM_FALLBACK_CODEX", "1").lower() in ("1", "true", "yes") + if use_codex_fallback and wire_api in ("responses", "response"): + try: + resp = _codex_exec_fallback(model=model, user_payload=user) + last_exc = None + break + except Exception as fallback_exc: + last_exc = fallback_exc + + if attempt < max_attempts: + time.sleep(backoff_s * (2 ** (attempt - 1))) + continue + break + + if resp is None: + return { + "model": model, + "ok": False, + "error": "llm_failed", + "exc": type(last_exc).__name__ if last_exc else "Unknown", + "msg": str(last_exc)[:5000] if last_exc else "", + } + + if wire_api in ("responses", "response"): + txt = _responses_output_text(resp if isinstance(resp, dict) else {}) + else: + txt = resp.choices[0].message.content or "{}" + try: + obj = json.loads(txt) + except json.JSONDecodeError: + # Return raw text for debugging, but still keep a predictable envelope. + return { + "model": model, + "ok": False, + "error": "invalid_json", + "raw": txt, + } + + if not isinstance(obj, dict): + return { + "model": model, + "ok": False, + "error": "non_object_json", + "raw": txt, + } + + return { + "model": model, + "ok": True, + "event": _coerce_result(obj), + }