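"""LLM-backed extraction of structured finance events from news text.

Talks to an OpenAI-compatible gateway through either the SDK's
chat.completions call or a direct HTTP POST to ``/responses``, with an
optional local Codex CLI fallback for the latter.
"""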

import json
import os
import subprocess
import time
import urllib.error
import urllib.request
from typing import Any
from openai import OpenAI
def _client() -> OpenAI:
    """Build an OpenAI SDK client from the ``LLM_*`` environment variables.

    Environment:
        LLM_API_KEY: required API key.
        LLM_BASE_URL: optional base URL of an OpenAI-compatible gateway.
            Unset or empty leaves the choice of endpoint to the SDK.
        LLM_TIMEOUT_S: per-request timeout in seconds (default ``30``).

    Returns:
        A configured ``OpenAI`` client.

    Raises:
        RuntimeError: if ``LLM_API_KEY`` is unset or empty.
        ValueError: if ``LLM_TIMEOUT_S`` is not a valid float.
    """
    api_key = os.environ.get("LLM_API_KEY")
    if not api_key:
        raise RuntimeError("LLM_API_KEY is not set")

    # An exported-but-empty variable (e.g. `LLM_BASE_URL=` in an env file)
    # must count as "not configured"; passing "" through would be used as
    # the literal base URL. This matches how _responses_create_http reads it.
    base_url = os.environ.get("LLM_BASE_URL") or None

    # Keep LLM calls bounded; some gateways hang on upstream issues.
    timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30"))

    # OpenAI SDK uses base_url for OpenAI-compatible gateways.
    return OpenAI(api_key=api_key, base_url=base_url, timeout=timeout_s)
# Instruction text sent with every extraction request: it pins the model to
# a single event, a conservative default, and JSON-only output.
SYSTEM_PROMPT = "".join(
    (
        "You are a finance event extraction engine. Extract a single structured event. ",
        "Be conservative: if uncertain, set impact_direction=0 and confidence low. ",
        "Output MUST be valid JSON only (no markdown, no commentary).",
    )
)
def _coerce_result(obj: dict[str, Any]) -> dict[str, Any]:
# Keep the contract stable for downstream DB writes.
out: dict[str, Any] = {}
out["event_type"] = str(obj.get("event_type") or "unknown")
region = obj.get("region")
if region is not None and str(region).strip() != "":
out["region"] = str(region)
entities = obj.get("entities")
if isinstance(entities, list):
out["entities"] = [str(x) for x in entities if str(x).strip()]
else:
out["entities"] = []
out["summary_zh"] = str(obj.get("summary_zh") or "")
out["summary_en"] = str(obj.get("summary_en") or "")
# -1/0/1 only
try:
impact_direction = int(obj.get("impact_direction"))
except Exception:
impact_direction = 0
if impact_direction not in (-1, 0, 1):
impact_direction = 0
out["impact_direction"] = impact_direction
# 0..1
try:
confidence = float(obj.get("confidence"))
except Exception:
confidence = 0.0
if confidence < 0.0:
confidence = 0.0
if confidence > 1.0:
confidence = 1.0
out["confidence"] = confidence
evidence = obj.get("evidence")
if isinstance(evidence, list):
out["evidence"] = [str(x) for x in evidence if str(x).strip()]
else:
out["evidence"] = []
return out
def _responses_output_text(resp_obj: dict[str, Any]) -> str:
txt = resp_obj.get("output_text")
if isinstance(txt, str) and txt.strip():
return txt
# Fallback for gateways that only return structured `output` blocks.
out = resp_obj.get("output")
if isinstance(out, list):
chunks: list[str] = []
for item in out:
if not isinstance(item, dict):
continue
content = item.get("content")
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
if block.get("type") in ("output_text", "text"):
val = block.get("text")
if isinstance(val, str) and val.strip():
chunks.append(val)
if chunks:
return "\n".join(chunks)
return "{}"
def _find_json_object(text: str) -> str | None:
start = text.find("{")
while start != -1:
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
ch = text[i]
if in_str:
if esc:
esc = False
elif ch == "\\":
esc = True
elif ch == '"':
in_str = False
continue
if ch == '"':
in_str = True
elif ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
candidate = text[start : i + 1]
try:
json.loads(candidate)
return candidate
except Exception:
break
start = text.find("{", start + 1)
return None
def _responses_create_http(model: str, user_payload: dict[str, Any]) -> dict[str, Any]:
base_url = (os.environ.get("LLM_BASE_URL") or "").rstrip("/")
api_key = os.environ.get("LLM_API_KEY")
if not base_url:
raise RuntimeError("LLM_BASE_URL is not set")
if not api_key:
raise RuntimeError("LLM_API_KEY is not set")
timeout_s = float(os.environ.get("LLM_TIMEOUT_S", "30"))
body = {
"model": model,
"stream": False,
"max_output_tokens": int(os.environ.get("LLM_MAX_OUTPUT_TOKENS", "512")),
"input": [
{
"role": "user",
"content": [
{
"type": "text",
"text": (
SYSTEM_PROMPT
+ "\n\nReturn valid JSON object only.\n\nInput:\n"
+ json.dumps(user_payload, ensure_ascii=True)
),
}
],
}
],
}
req = urllib.request.Request(
url=f"{base_url}/responses",
data=json.dumps(body).encode("utf-8"),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
detail = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"http_{e.code}: {detail[:5000]}") from e
try:
return json.loads(raw)
except json.JSONDecodeError as e:
raise RuntimeError(f"invalid_gateway_json: {raw[:1000]}") from e
def _codex_exec_fallback(model: str, user_payload: dict[str, Any]) -> dict[str, Any]:
codex_bin = os.environ.get("CODEX_BIN", os.path.expanduser("~/.local/npm-global/bin/codex"))
codex_timeout_s = float(os.environ.get("CODEX_FALLBACK_TIMEOUT_S", "90"))
codex_cwd = os.environ.get("CODEX_FALLBACK_CWD", "/home/openclaw")
prompt = (
"Return ONLY one valid JSON object with keys: "
"event_type,region,entities,summary_zh,summary_en,impact_direction,confidence,evidence.\n"
"No markdown, no extra text.\n"
f"Input:\n{json.dumps(user_payload, ensure_ascii=True)}"
)
proc = subprocess.run(
[
codex_bin,
"exec",
"-m",
model,
"-s",
"read-only",
"-C",
codex_cwd,
"--skip-git-repo-check",
prompt,
],
capture_output=True,
text=True,
timeout=codex_timeout_s,
check=False,
)
combined = (proc.stdout or "") + "\n" + (proc.stderr or "")
candidate = _find_json_object(combined)
if not candidate:
raise RuntimeError(f"codex_no_json: {combined[:1000]}")
return {"output_text": candidate}
def extract_event(title: str, content: str, lang_hint: str | None = None) -> dict[str, Any]:
    """Extract one structured finance event from a news item via the configured LLM.

    The transport is chosen by ``LLM_WIRE_API`` (or ``WIRE_API``; default
    ``chat``):

    - ``chat``: OpenAI SDK ``chat.completions`` with
      ``response_format={"type": "json_object"}``.
    - ``responses`` / ``response``: direct HTTP POST to ``/responses``; JSON-only
      output is requested through the prompt. When that call fails and
      ``LLM_FALLBACK_CODEX`` is ``1``/``true``/``yes`` (the default), the
      local Codex CLI is tried before the next retry.

    Failed calls are retried up to ``LLM_MAX_ATTEMPTS`` times (default 3,
    never fewer than 1), sleeping ``LLM_RETRY_BACKOFF_S * 2**(attempt-1)``
    seconds between attempts. The model name comes from ``LLM_MODEL``.

    Args:
        title: Headline of the news item.
        content: Body text of the news item.
        lang_hint: Optional language hint forwarded to the model as-is.

    Returns:
        A dict that always contains ``"model"`` and ``"ok"``:

        - success: ``{"model", "ok": True, "event": <coerced event dict>}``
        - call failed or response had no usable choice:
          ``{"model", "ok": False, "error": "llm_failed", "exc", "msg"}``
        - no JSON object could be found in the output:
          ``{"model", "ok": False, "error": "invalid_json", "raw"}``
        - output was valid JSON but not an object:
          ``{"model", "ok": False, "error": "non_object_json", "raw"}``

    Raises:
        RuntimeError: in chat mode when ``LLM_API_KEY`` is not set (raised
            while building the client, before any request is attempted).
        ValueError: if a numeric ``LLM_*`` variable cannot be parsed.
    """
    model = os.environ.get("LLM_MODEL", "gpt-5.2")
    wire_api = (os.environ.get("LLM_WIRE_API") or os.environ.get("WIRE_API") or "chat").strip().lower()
    use_responses = wire_api in ("responses", "response")

    user = {
        "title": title,
        "content": content,
        "lang_hint": lang_hint,
        "required_output": {
            "event_type": "string (macro/policy/war/sanctions/earnings/commodity/supply_shock/rate/inflation/...) ",
            "region": "string optional (CN/US/EU/JP/Global/...)",
            "entities": "array of strings (companies/countries/commodities/indices)",
            "summary_zh": "string",
            "summary_en": "string",
            "impact_direction": "-1|0|1 (market risk sentiment for affected assets)",
            "confidence": "float 0..1",
            "evidence": "array of strings (short quotes or key facts; URLs if present)",
        },
    }

    # The SDK client is only needed for the chat transport.
    c = None if use_responses else _client()

    # Clamp to one attempt: with 0 or a negative value the request would
    # never be sent and the caller would get a misleading "llm_failed".
    max_attempts = max(1, int(os.environ.get("LLM_MAX_ATTEMPTS", "3")))
    backoff_s = float(os.environ.get("LLM_RETRY_BACKOFF_S", "1.0"))

    # Optional escape hatch: fallback to local Codex CLI when CCH responses are unstable.
    use_codex_fallback = use_responses and os.environ.get("LLM_FALLBACK_CODEX", "1").lower() in (
        "1",
        "true",
        "yes",
    )

    last_exc: Exception | None = None
    resp = None
    for attempt in range(1, max_attempts + 1):
        try:
            if use_responses:
                # CCH is more reliable with direct /v1/responses HTTP + stream=false.
                resp = _responses_create_http(model=model, user_payload=user)
            else:
                resp = c.chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": json.dumps(user, ensure_ascii=True)},
                    ],
                    temperature=0.2,
                    response_format={"type": "json_object"},
                )
            last_exc = None
            break
        except Exception as e:
            # Gateways can intermittently throw 5xx/connection errors. Retry a few times.
            last_exc = e

        if use_codex_fallback:
            try:
                resp = _codex_exec_fallback(model=model, user_payload=user)
                last_exc = None
                break
            except Exception as fallback_exc:
                # Note: from here on the envelope reports the fallback's
                # error rather than the gateway's.
                last_exc = fallback_exc

        if attempt < max_attempts:
            time.sleep(backoff_s * (2 ** (attempt - 1)))

    if resp is None:
        return {
            "model": model,
            "ok": False,
            "error": "llm_failed",
            "exc": type(last_exc).__name__ if last_exc else "Unknown",
            "msg": str(last_exc)[:5000] if last_exc else "",
        }

    if use_responses:
        txt = _responses_output_text(resp if isinstance(resp, dict) else {})
    else:
        try:
            txt = resp.choices[0].message.content or "{}"
        except (AttributeError, IndexError, TypeError) as shape_exc:
            # A gateway may answer 200 with no choices (or a different
            # shape); report it in the envelope instead of crashing.
            return {
                "model": model,
                "ok": False,
                "error": "llm_failed",
                "exc": type(shape_exc).__name__,
                "msg": str(shape_exc)[:5000],
            }

    try:
        obj = json.loads(txt)
    except json.JSONDecodeError:
        # Models sometimes wrap the object in markdown fences or prose despite
        # the prompt; salvage it with the same scanner the Codex path uses.
        embedded = _find_json_object(txt)
        if embedded is None:
            # Return raw text for debugging, but still keep a predictable envelope.
            return {
                "model": model,
                "ok": False,
                "error": "invalid_json",
                "raw": txt,
            }
        obj = json.loads(embedded)

    if not isinstance(obj, dict):
        return {
            "model": model,
            "ok": False,
            "error": "non_object_json",
            "raw": txt,
        }

    return {
        "model": model,
        "ok": True,
        "event": _coerce_result(obj),
    }