# 大约 3 分钟 (approx. 3 minutes)
import json
import re
import time
from os import environ
from os.path import exists, expanduser
import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
# ---------------------------------------------------------------------------
# 工具函数
# ---------------------------------------------------------------------------
def _parse_credentials_text(body):
    """Extract a ``(username, password)`` tuple from credentials-file text.

    Args:
        body: Stripped, non-empty contents of the credentials file.

    Returns:
        A 2-tuple when *body* matches one of the supported layouts,
        otherwise ``None``.
    """
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # Not JSON; fall through to the text layouts handled below.
        pass
    else:
        # Valid JSON: only a 2-item list or a dict holding both keys is
        # accepted. Any other JSON value yields nothing.
        if isinstance(parsed, (list, tuple)) and len(parsed) == 2:
            return tuple(parsed)
        if (
            isinstance(parsed, dict)
            and 'BRAIN_USERNAME' in parsed
            and 'BRAIN_PASSWORD' in parsed
        ):
            return (parsed['BRAIN_USERNAME'], parsed['BRAIN_PASSWORD'])
        return None

    # Key/value layout: BRAIN_USERNAME:'user', BRAIN_PASSWORD:'pass'
    username_match = re.search(
        r"BRAIN_USERNAME\s*[:=]\s*['\"]([^'\"]+)['\"]",
        body,
    )
    password_match = re.search(
        r"BRAIN_PASSWORD\s*[:=]\s*['\"]([^'\"]+)['\"]",
        body,
    )
    if username_match and password_match:
        return (username_match.group(1), password_match.group(1))

    # Plain-text layout: strip JSON-like punctuation, then split on
    # whitespace, commas, or semicolons. re.split() yields empty strings for
    # leading/trailing separators, so drop them before counting; otherwise
    # "user,pass;" or "[ user, pass ]" would be rejected.
    cleaned = re.sub(r'[{}\[\]"]', '', body)
    parts = [part for part in re.split(r'[\s,;]+', cleaned) if part]
    if len(parts) == 2:
        return tuple(parts)
    return None


def load_credentials(credentials_path='brain_credentials.txt') -> tuple:
    """Load the Brain username and password from a file or the environment.

    The credentials file may use any of these layouts:

    - JSON list: ``["username", "password"]``
    - JSON dict: ``{"BRAIN_USERNAME": "username", "BRAIN_PASSWORD": "password"}``
    - key/value text: ``BRAIN_USERNAME:'username', BRAIN_PASSWORD:'password'``
    - plain text: ``username password`` or ``username,password``

    When the file is missing, empty, or yields no credential pair, the
    ``BRAIN_USERNAME`` and ``BRAIN_PASSWORD`` environment variables are used
    instead.

    Args:
        credentials_path: Path of the credentials file; ``~`` is expanded.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        ValueError: If no complete, non-empty credential pair was found.
    """
    path = expanduser(credentials_path)
    credentials = None
    if exists(path):
        with open(path, 'r', encoding='utf-8') as f:
            body = f.read().strip()
        if body:
            credentials = _parse_credentials_text(body)

    if not credentials:
        # The file gave nothing usable; fall back to environment variables.
        credentials = (
            environ.get('BRAIN_USERNAME'),
            environ.get('BRAIN_PASSWORD'),
        )

    if not credentials or len(credentials) != 2 or not all(credentials):
        raise ValueError(
            '请提供有效的凭证,支持 JSON 列表 [username, password],\n'
            '或通过空白符/逗号分隔的纯文本,\n'
            '或 JSON 字典 BRAIN_USERNAME/BRAIN_PASSWORD,\n'
            '或环境变量 BRAIN_USERNAME 和 BRAIN_PASSWORD。'
        )
    return credentials
def create_session(username: str, password: str) -> requests.Session:
    """Open a requests session and authenticate it against the Brain API.

    Args:
        username: Account user name used for HTTP basic auth.
        password: Account password used for HTTP basic auth.

    Returns:
        The session, with basic-auth credentials attached, once the
        authentication request has succeeded.

    Raises:
        Whatever ``raise_for_status()`` raises when the authentication
        request comes back with an error status.
    """
    auth_url = 'https://api.worldquantbrain.com/authentication'

    session = requests.Session()
    session.auth = HTTPBasicAuth(username, password)

    # The credentials attached to the session are sent with this POST.
    auth_reply = session.post(auth_url)
    auth_reply.raise_for_status()

    print('Authenticated with status code:', auth_reply.status_code)
    return session
# ---------------------------------------------------------------------------
# 数据检索辅助函数
# ---------------------------------------------------------------------------
def get_datafields(
    sess: requests.Session,
    instrument_type: str = 'EQUITY',
    region: str = 'USA',
    delay: int = 1,
    universe: str = 'TOP3000',
    dataset_id: str = '',
    data_type: str = 'MATRIX',
    search: str = '',
) -> pd.DataFrame:
    """Fetch data-field metadata from the Brain API, one page at a time.

    Pages of 50 records are requested until a page comes back with fewer
    than 50 records, or until a response lacks a ``results`` key. The
    function sleeps 5 seconds between consecutive page requests.

    Args:
        sess: Session used to issue the GET requests.
        instrument_type: Value sent as the ``instrumentType`` query parameter.
        region: Value sent as the ``region`` query parameter.
        delay: Value sent as the ``delay`` query parameter.
        universe: Value sent as the ``universe`` query parameter.
        dataset_id: Value sent as the ``dataset.id`` query parameter.
        data_type: Value sent as the ``type`` query parameter.
        search: Optional search text; sent as ``search`` only when non-empty.

    Returns:
        A DataFrame built from all collected ``results`` records.

    Raises:
        Whatever ``raise_for_status()`` raises when a page request returns
        an error status.
    """
    url = 'https://api.worldquantbrain.com/data-fields'
    page_size = 50
    pause_sec = 5

    # Passing the query as ``params`` lets requests assemble and
    # percent-encode the query string. This guarantees the ``region`` filter
    # is transmitted under its proper name and that ``search`` text with
    # spaces or ``&`` cannot corrupt the URL.
    base_params = {
        'instrumentType': instrument_type,
        'region': region,
        'delay': delay,
        'universe': universe,
        'dataset.id': dataset_id,
        'limit': page_size,
        'type': data_type,
    }
    if search:
        base_params['search'] = search

    offset = 0
    datafields_list = []
    while True:
        # Fresh dict per request so earlier requests' params are not mutated.
        params = dict(base_params, offset=offset)
        resp = sess.get(url, params=params)
        resp.raise_for_status()
        results = resp.json()
        if 'results' not in results:
            print(f'Unexpected response: {results}')
            break

        batch = results['results']
        print(f'Fetched {len(batch)} data fields with offset {offset}.')
        datafields_list.extend(batch)

        # A short page means this was the last one.
        if len(batch) < page_size:
            print('Fetched the last batch of data fields.')
            break

        offset += page_size
        time.sleep(pause_sec)
    return pd.DataFrame(datafields_list)
# ---------------------------------------------------------------------------
# 仿真负载生成
# ---------------------------------------------------------------------------
def build_simulation_payloads(datafields_list: list) -> list:
    """Build one simulation payload per generated Alpha expression.

    For every data field, an expression of the form
    ``group_neutralize(<ts_op>(<field>, <days>), <group>)`` is produced for
    each combination of time-series operator (``ts_mean``, ``ts_rank``),
    window (63, 126) and group (``market``, ``sector``, ``industry``).
    Each expression is printed and wrapped together with a fixed settings
    dictionary.

    Args:
        datafields_list: Data field identifiers to embed in the expressions.

    Returns:
        A list of payload dicts with keys ``type``, ``settings`` and
        ``regular``, in generation order.
    """
    # Lazy generator: expressions are produced one by one, in the same
    # nesting order (field, group op, ts op, window, group).
    expressions = (
        f'{outer_op}({inner_op}({field}, {window}), {grouping})'
        for field in datafields_list
        for outer_op in ('group_neutralize',)
        for inner_op in ('ts_mean', 'ts_rank')
        for window in (63, 126)
        for grouping in ('market', 'sector', 'industry')
    )

    payloads = []
    for expression in expressions:
        print('正在将如下 Alpha 表达式与 setting 封装')
        print(expression)
        # A new settings dict is created for every payload so that payloads
        # never share mutable state.
        settings = {
            'instrumentType': 'EQUITY',
            'region': 'USA',
            'universe': 'TOP3000',
            'delay': 1,
            'decay': 0,
            'neutralization': 'MARKET',
            'truncation': 0.08,
            'pasteurization': 'ON',
            'unitHandling': 'VERIFY',
            'nanHandling': 'ON',
            'language': 'FASTEXPR',
            'visualization': False,
        }
        payloads.append(
            {'type': 'REGULAR', 'settings': settings, 'regular': expression}
        )
    return payloads
# ---------------------------------------------------------------------------
# 仿真执行
# ---------------------------------------------------------------------------
def _wait_for_simulation(sess, progress_url):
    """Poll *progress_url* until the reply carries no ``Retry-After`` delay.

    Returns the final progress response. Any error raised by
    ``raise_for_status()`` on a poll propagates to the caller.
    """
    while True:
        progress = sess.get(progress_url)
        progress.raise_for_status()
        wait_sec = float(progress.headers.get('Retry-After', 0))
        if wait_sec:
            # A non-zero Retry-After means keep polling after that delay.
            time.sleep(wait_sec)
            continue
        return progress


def run_simulations(sess: requests.Session, alpha_list: list) -> None:
    """Submit each alpha payload and poll its job until it finishes.

    A payload whose submission is rejected is reported and skipped. A
    submission whose response has no ``Location`` header is reported and
    skipped after a 10-second pause. For every completed job the ``alpha``
    field of the final progress response is printed.

    Args:
        sess: Session used for the POST and the polling GET requests.
        alpha_list: Simulation payloads, each sent as the JSON body of a POST.

    Raises:
        Whatever ``raise_for_status()`` raises when a polling request
        returns an error status.
    """
    submit_url = 'https://api.worldquantbrain.com/simulations'
    for payload in alpha_list:
        submitted = sess.post(submit_url, json=payload)
        if not submitted.ok:
            print('Simulation request failed:', submitted.status_code, submitted.text)
            continue

        progress_url = submitted.headers.get('Location')
        if not progress_url:
            print('Simulation response missing Location header:', submitted.text)
            time.sleep(10)
            continue

        final_reply = _wait_for_simulation(sess, progress_url)
        print('Alpha id:', final_reply.json().get('alpha'))
# ---------------------------------------------------------------------------
# 主程序入口
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the whole workflow from credentials to simulations.

    Loads the credentials, opens an authenticated session, fetches the
    MATRIX data fields of dataset ``pv13``, builds a simulation payload for
    each generated expression, and submits them all.
    """
    user, secret = load_credentials()
    session = create_session(user, secret)

    field_frame = get_datafields(session, dataset_id='pv13', data_type='MATRIX')
    field_ids = field_frame['id'].tolist()
    print('Fetched data field count:', len(field_ids))

    payloads = build_simulation_payloads(field_ids)
    run_simulations(session, payloads)


# Run the workflow only when this file is executed as a script.
if __name__ == '__main__':
    main()