Files
quant-factor-research/scripts/expand_etf_universe.py

149 lines
4.5 KiB
Python
Raw Normal View History

2026-03-13 17:10:49 +08:00
from __future__ import annotations
import argparse
import json
import math
from collections import defaultdict
from datetime import date, timedelta
from pathlib import Path
import pandas as pd
from qfr.data.tushare_client import load_tushare_config, pro_api
def median_amount(cfg, ts_code: str, start: str, end: str) -> float:
api = pro_api(cfg)
df = api.fund_daily(ts_code=ts_code, start_date=start, end_date=end, fields="trade_date,amount")
if df is None or df.empty or "amount" not in df.columns:
return 0.0
amt = pd.to_numeric(df["amount"], errors="coerce").dropna()
if amt.empty:
return 0.0
return float(amt.median())
def classify_by_keyword(kw: str) -> str:
# very rough tagging for universe constraints / reporting
equity_kws = {
"半导体",
"芯片",
"通信",
"5G",
"通信设备",
"军工",
"机器人",
"工业母机",
"智能制造",
"消费电子",
"AI",
"算力",
"软件",
"创新药",
"医药",
"新能源",
"光伏",
"锂电",
"电池",
"新材料",
"稀土",
}
commodity_kws = {"黄金", "白银", "有色", "稀土", "矿业", "原油", "", "", "化工", "豆粕", "农业"}
rates_kws = {"国债", "政金债", "", "短债", "中债"}
if kw in rates_kws:
return "rates_cn"
if kw in commodity_kws:
return "commodity_cn"
if kw in equity_kws:
return "equity_cn_sector"
return "equity_cn_sector"
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--config", default="configs/etf_universe.json")
ap.add_argument("--out", default=None)
ap.add_argument("--per_keyword", type=int, default=2)
ap.add_argument("--min_median_amount", type=float, default=0.0)
ap.add_argument(
"--keywords",
default=(
"半导体,芯片,通信,5G,通信设备,军工,机器人,工业母机,智能制造,消费电子,AI,算力,软件,创新药,医药,新能源,光伏,锂电,电池,"
"矿业,有色,稀土,新材料,黄金,白银,原油,煤,化工,豆粕,农业,国债,政金债"
),
)
args = ap.parse_args()
cfg = load_tushare_config()
api = pro_api(cfg)
conf_path = Path(args.config)
conf = json.loads(conf_path.read_text(encoding="utf-8"))
assets = conf.get("assets", [])
have = {a["ts_code"] for a in assets}
kw_list = [k.strip() for k in str(args.keywords).split(",") if k.strip()]
fb = api.fund_basic(market="E", status="L", fields="ts_code,name")
if fb is None or fb.empty:
raise RuntimeError("fund_basic returned empty")
fb = fb.dropna(subset=["ts_code", "name"]).copy()
end = date.today().strftime("%Y%m%d")
start = (date.today() - timedelta(days=180)).strftime("%Y%m%d")
buckets: dict[str, list[tuple[str, str]]] = defaultdict(list)
for _, r in fb.iterrows():
ts_code = str(r["ts_code"]).strip()
name = str(r["name"]).strip()
for kw in kw_list:
if kw in name:
buckets[kw].append((ts_code, name))
break
chosen: list[tuple[str, str, str, float, str]] = []
for kw in kw_list:
cands = buckets.get(kw, [])
if not cands:
continue
scored: list[tuple[float, str, str]] = []
for ts_code, name in cands:
if ts_code in have:
continue
try:
m = median_amount(cfg, ts_code, start, end)
except Exception:
m = 0.0
if not math.isfinite(m) or m <= 0:
continue
if m < float(args.min_median_amount):
continue
scored.append((m, ts_code, name))
scored.sort(reverse=True)
for m, ts_code, name in scored[: int(args.per_keyword)]:
cls = classify_by_keyword(kw)
chosen.append((kw, ts_code, name, m, cls))
for kw, ts_code, name, m, cls in chosen:
assets.append({"ts_code": ts_code, "asset_class": cls, "name": name})
have.add(ts_code)
conf["assets"] = assets
out_path = Path(args.out) if args.out else conf_path
out_path.write_text(json.dumps(conf, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
print(f"added {len(chosen)} ETFs")
for kw, ts_code, name, m, cls in chosen[:80]:
print(f"{kw}\t{ts_code}\t{m:.0f}\t{cls}\t{name}")
if __name__ == "__main__":
main()