HTTP 请求
任何获取数据的函数都需要发出出站 HTTP 请求。openbb-core
中的实用函数简化了同步和异步请求的过程。这些函数涵盖了大多数典型请求,应该导入使用而不是从头创建新的客户端。
使用这些辅助函数将通过消除重复流程来保持代码库更精简和易于维护。任何人都可以构建有效且高效的数据获取器,本指南概述了如何导入和实现任一类型的请求到任何获取器中。
生成查询字符串
要将参数传递给 URL,需要将它们格式化为查询字符串。辅助函数 get_querystring()
将参数字典转换为标准查询 URL 字符串。
from openbb_core.provider.utils.helpers import get_querystring
参数
----------
items: dict
要转换为查询字符串的字典。
exclude: List[str]
要从查询字符串中排除的键。
返回
-------
str
查询字符串。
在 Fetcher 的上下文中,"query" 对象是一个 Pydantic 模型。要将查询参数传递给辅助函数,对查询对象应用 model_dump()
。这会移除任何值为 None
的键值对。
可能有一些参数不打算包含在 URL 字符串的参数部分中。将这些参数作为 List
传递给 get_querystring()
的 exclude
参数。
query_string = get_querystring(query.model_dump(), ["interval"])
同步请求
对于同步 HTTP 请求,使用 make_request
函数:
from openbb_core.provider.utils.helpers import make_request
基本用法
# 简单 GET 请求
response = make_request(url="https://api.example.com/data")
# 带参数的 GET 请求
params = {"symbol": "AAPL", "period": "1d"}
response = make_request(
url="https://api.example.com/data",
params=params
)
# POST 请求
data = {"key": "value"}
response = make_request(
url="https://api.example.com/data",
method="POST",
data=data
)
错误处理
try:
response = make_request(url="https://api.example.com/data")
if response.status_code == 200:
data = response.json()
else:
raise Exception(f"请求失败: {response.status_code}")
except Exception as e:
print(f"HTTP 请求错误: {e}")
异步请求
对于异步 HTTP 请求,使用 amake_request
函数:
from openbb_core.provider.utils.helpers import amake_request
import asyncio
基本用法
async def fetch_data():
# 异步 GET 请求
response = await amake_request(url="https://api.example.com/data")
return response.json()
# 运行异步函数
data = asyncio.run(fetch_data())
并发请求
async def fetch_multiple_symbols(symbols):
"""
并发获取多个股票代码的数据
"""
tasks = []
for symbol in symbols:
url = f"https://api.example.com/data?symbol={symbol}"
task = amake_request(url=url)
tasks.append(task)
# 并发执行所有请求
responses = await asyncio.gather(*tasks)
# 处理响应
results = []
for response in responses:
if response.status_code == 200:
results.append(response.json())
return results
# 使用示例
symbols = ["AAPL", "MSFT", "GOOGL"]
data = asyncio.run(fetch_multiple_symbols(symbols))
请求配置
自定义头部
headers = {
"Authorization": "Bearer your_token",
"Content-Type": "application/json",
"User-Agent": "OpenBB/1.0"
}
response = make_request(
url="https://api.example.com/data",
headers=headers
)
超时设置
# 设置请求超时
response = make_request(
url="https://api.example.com/data",
timeout=30 # 30秒超时
)
重试机制
import time
from typing import Optional
def make_request_with_retry(
url: str,
max_retries: int = 3,
delay: float = 1.0,
**kwargs
) -> Optional[dict]:
"""
带重试机制的请求函数
"""
for attempt in range(max_retries):
try:
response = make_request(url=url, **kwargs)
if response.status_code == 200:
return response.json()
elif response.status_code == 429: # 速率限制
wait_time = delay * (2 ** attempt)
print(f"速率限制,等待 {wait_time} 秒...")
time.sleep(wait_time)
continue
else:
print(f"请求失败,状态码: {response.status_code}")
except Exception as e:
print(f"请求异常 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(delay)
return None
最佳实践
1. 错误处理
def safe_request(url: str, **kwargs) -> Optional[dict]:
"""
安全的 HTTP 请求函数
"""
try:
response = make_request(url=url, **kwargs)
# 检查状态码
if response.status_code != 200:
print(f"HTTP 错误: {response.status_code}")
return None
# 尝试解析 JSON
try:
return response.json()
except ValueError:
print("响应不是有效的 JSON")
return None
except Exception as e:
print(f"请求失败: {e}")
return None
2. 数据验证
def validate_response_data(data: dict, required_fields: list) -> bool:
"""
验证响应数据
"""
if not isinstance(data, dict):
return False
for field in required_fields:
if field not in data:
print(f"缺少必需字段: {field}")
return False
return True
# 使用示例
response_data = make_request("https://api.example.com/data").json()
if validate_response_data(response_data, ["symbol", "price", "timestamp"]):
# 处理有效数据
pass
3. 缓存机制
import time
from typing import Dict, Any
class SimpleCache:
def __init__(self, ttl: int = 300): # 5分钟缓存
self.cache: Dict[str, Dict[str, Any]] = {}
self.ttl = ttl
def get(self, key: str) -> Optional[Any]:
if key in self.cache:
if time.time() - self.cache[key]['timestamp'] < self.ttl:
return self.cache[key]['data']
else:
del self.cache[key]
return None
def set(self, key: str, data: Any):
self.cache[key] = {
'data': data,
'timestamp': time.time()
}
# 全局缓存实例
request_cache = SimpleCache()
def cached_request(url: str, **kwargs) -> Optional[dict]:
"""
带缓存的请求函数
"""
cache_key = f"{url}_{hash(str(kwargs))}"
# 检查缓存
cached_data = request_cache.get(cache_key)
if cached_data:
return cached_data
# 发起请求
response = make_request(url=url, **kwargs)
if response.status_code == 200:
data = response.json()
request_cache.set(cache_key, data)
return data
return None
在 Fetcher 中的实现
from openbb_core.provider.abstract.fetcher import Fetcher
from openbb_core.provider.utils.helpers import make_request, get_querystring
class MyDataFetcher(Fetcher):
"""
自定义数据获取器示例
"""
@staticmethod
def transform_query(params: dict) -> dict:
"""转换查询参数"""
return params
@staticmethod
def extract_data(query: dict, credentials: dict, **kwargs) -> dict:
"""提取数据"""
# 构建 URL
base_url = "https://api.example.com/data"
# 生成查询字符串
query_string = get_querystring(
query,
exclude=["internal_param"]
)
url = f"{base_url}?{query_string}"
# 设置请求头
headers = {
"Authorization": f"Bearer {credentials.get('api_key')}",
"Content-Type": "application/json"
}
# 发起请求
response = make_request(url=url, headers=headers)
if response.status_code != 200:
raise Exception(f"API 请求失败: {response.status_code}")
return response.json()
@staticmethod
def transform_data(query: dict, data: dict, **kwargs) -> list:
"""转换数据格式"""
# 处理和转换数据
return data.get('results', [])
这些辅助函数和最佳实践将帮助您在 OpenBB 平台中构建可靠和高效的数据获取器。