示例代码

本文档提供了各种实际使用场景的示例代码。

目录

- 基础示例
- 任务管理示例
- 搜索示例
- 解析示例
- 异步示例
- 错误处理示例
- 更多示例

基础示例

简单初始化

from bohrium import Bohrium

# Use environment variables (no credentials passed explicitly)
client = Bohrium()

# Or pass the access key in directly
client = Bohrium(access_key="your_access_key")

检查连接

from bohrium import Bohrium

client = Bohrium(access_key="your_access_key")

try:
    # Request at most one job purely to verify that the connection works;
    # the returned list itself is not needed, so it is not bound to a name.
    client.job.list(limit=1)
except Exception as e:
    # Top-level boundary of the example: report any failure to the user.
    print(f"连接失败: {e}")
else:
    # Only reached when the request above raised nothing.
    print("连接成功!")

任务管理示例

完整任务生命周期

import time

from bohrium import Bohrium

# Polling configuration for step 3.
POLL_INTERVAL_SECONDS = 5
# Upper bound on the total wait so the script cannot poll forever when the
# job never reaches one of the terminal states below. Adjust to your workload.
MAX_WAIT_SECONDS = 3600
TERMINAL_STATES = ("completed", "failed")

client = Bohrium(access_key="your_access_key")

# 1. Create the job
job = client.job.create(
    # Job parameters
    name="我的任务",
    # ... other parameters
)
print(f"任务已创建: {job.id}")

# 2. Query the job status
status = client.job.retrieve(job_id=job.id)
print(f"任务状态: {status.status}")

# 3. Wait for the job to finish, giving up once the deadline has passed.
#    time.monotonic() is used so wall-clock adjustments cannot affect the wait.
deadline = time.monotonic() + MAX_WAIT_SECONDS
while status.status not in TERMINAL_STATES and time.monotonic() < deadline:
    time.sleep(POLL_INTERVAL_SECONDS)
    status = client.job.retrieve(job_id=job.id)
    print(f"当前状态: {status.status}")

# 4. Report the outcome
if status.status == "completed":
    print(f"任务完成!结果: {status.result}")
elif status.status == "failed":
    print(f"任务失败: {status.error}")
else:
    # The deadline expired before the job reached a terminal state.
    print(f"等待超时({MAX_WAIT_SECONDS} 秒), 当前状态: {status.status}")

# 5. Clean up (optional)
# client.job.delete(job_id=job.id)

批量处理任务

from bohrium import Bohrium

client = Bohrium(access_key="your_access_key")

# Definitions of the jobs to create in one batch
task_configs = [
    {"name": "任务1", "config": {...}},
    {"name": "任务2", "config": {...}},
    {"name": "任务3", "config": {...}},
]

# Create the jobs one by one, keeping each returned job for the status pass
jobs = []
for config in task_configs:
    job = client.job.create(**config)
    print(f"已创建任务: {job.id}")
    jobs.append(job)

# Query the status of every created job
for job in jobs:
    status = client.job.retrieve(job_id=job.id)
    print(f"任务 {job.id} 状态: {status.status}")

搜索示例

基本搜索

from bohrium import Bohrium

client = Bohrium(access_key="your_access_key")

# Run the search
results = client.sigma_search.search(query="机器学习算法", limit=20)

# Print a numbered list of results, each followed by its score and a blank line
for i, result in enumerate(results, start=1):
    print(f"{i}. {result.title}")
    print(f"   相关性: {result.score}")
    print()

高级搜索

from bohrium import Bohrium

client = Bohrium(access_key="your_access_key")

# Filter conditions passed to the search
search_filters = {"category": "research", "year": 2024}

# Search with filter conditions and an explicit sort order
results = client.sigma_search.search(
    query="深度学习",
    filters=search_filters,
    sort_by="relevance",
    limit=50,
)

流式搜索

from bohrium import Bohrium
from tqdm import tqdm

client = Bohrium(access_key="your_access_key")

# Stream a large number of search results chunk by chunk
all_results = []
with tqdm(desc="搜索中...") as pbar:
    stream = client.sigma_search.search_stream(
        query="大规模数据处理",
        limit=1000,
    )
    for chunk in stream:
        batch = chunk.results
        all_results += batch
        # Advance the progress bar by the number of results just received
        pbar.update(len(batch))

print(f"共找到 {len(all_results)} 个结果")

解析示例

解析PDF文件

from bohrium import Bohrium

client = Bohrium(access_key="your_access_key")

# Parse the PDF
result = client.uni_parser.parse(file_path="./document.pdf", parser_type="pdf")

# Show the title, the page count and a preview limited to content[:500]
print(f"文档标题: {result.title}")
print(f"页数: {result.page_count}")
print(f"内容预览:\n{result.content[:500]}")

批量解析

from bohrium import Bohrium
import os

client = Bohrium(access_key="your_access_key")

# Parse every PDF file found in a directory
pdf_dir = "./documents"
results = []

for filename in os.listdir(pdf_dir):
    # Skip entries whose name does not end with the .pdf suffix
    if not filename.endswith(".pdf"):
        continue

    filepath = os.path.join(pdf_dir, filename)
    print(f"正在解析: {filename}")

    try:
        result = client.uni_parser.parse(file_path=filepath, parser_type="pdf")
        summary = {
            "file": filename,
            "title": result.title,
            "pages": result.page_count,
        }
        results.append(summary)
    except Exception as e:
        # Best effort: report the failure and continue with the next file
        print(f"解析失败 {filename}: {e}")

print(f"\n成功解析 {len(results)} 个文件")

异步示例

并发获取多个任务

import asyncio
from bohrium import AsyncBohrium

async def get_job_status(job_id: str, client=None):
    """Return the status of a single job.

    Args:
        job_id: Identifier of the job to look up.
        client: Optional ``AsyncBohrium`` client to reuse. When omitted, a
            new client is created for this call (the previous behavior).

    Returns:
        The ``status`` attribute of the retrieved job.
    """
    if client is None:
        client = AsyncBohrium(access_key="your_access_key")
    job = await client.job.retrieve(job_id=job_id)
    return job.status


async def main():
    """Fetch the status of several jobs concurrently and print each one."""
    job_ids = ["job1", "job2", "job3", "job4", "job5"]

    # One client is shared by all lookups instead of building one per job.
    client = AsyncBohrium(access_key="your_access_key")

    # Fetch all job statuses concurrently; gather() returns the results in
    # the same order as job_ids, so they can be paired with zip() below.
    statuses = await asyncio.gather(*(
        get_job_status(job_id, client) for job_id in job_ids
    ))

    for job_id, status in zip(job_ids, statuses):
        print(f"{job_id}: {status}")


asyncio.run(main())

异步流式搜索

import asyncio
from bohrium import AsyncBohrium

async def async_search():
    """Stream search results asynchronously and print every chunk's titles."""
    client = AsyncBohrium(access_key="your_access_key")
    stream = client.sigma_search.search_stream(query="异步处理")

    async for chunk in stream:
        print(f"收到 {len(chunk.results)} 个结果")
        for result in chunk.results:
            print(f"  - {result.title}")


asyncio.run(async_search())

错误处理示例

完整错误处理

from bohrium import Bohrium
from bohrium._exceptions import (
    AuthenticationError,
    PermissionDeniedError,
    NotFoundError,
    RateLimitError,
    BadRequestError,
    BohriumError,
)
import time

# Client used by the usage example that follows safe_operation
client = Bohrium(access_key="your_access_key")

def safe_operation(func, max_retries=3):
    """Run ``func`` and retry it automatically when it is rate limited.

    Args:
        func: Zero-argument callable that performs the SDK call.
        max_retries: Maximum number of attempts; must be at least 1.

    Returns:
        Whatever ``func`` returns on the first attempt that succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        RateLimitError: If the final attempt is still rate limited.
        Exception: Any other error raised by ``func`` is reported with
            ``print`` and re-raised unchanged, without retrying.
    """
    if max_retries < 1:
        # With zero attempts the loop below would never call ``func`` and the
        # function would silently return None.
        raise ValueError("max_retries 必须大于等于 1")

    default_wait = 60  # seconds to wait when the error carries no usable hint

    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError as e:
            if attempt >= max_retries - 1:
                # Out of attempts: let the caller see the rate-limit error.
                raise
            # NOTE(review): the type of ``retry_after`` is not visible here.
            # A missing, None, non-numeric or negative value would make
            # time.sleep() raise, so fall back to the default in those cases.
            wait_time = getattr(e, 'retry_after', None)
            if (
                isinstance(wait_time, bool)
                or not isinstance(wait_time, (int, float))
                or wait_time < 0
            ):
                wait_time = default_wait
            print(f"遇到限流,等待 {wait_time} 秒后重试...")
            time.sleep(wait_time)
        except AuthenticationError:
            print("认证失败,请检查API密钥")
            raise
        except PermissionDeniedError:
            print("权限不足")
            raise
        except NotFoundError:
            print("资源不存在")
            raise
        except BadRequestError as e:
            print(f"请求参数错误: {e}")
            raise
        except BohriumError as e:
            print(f"SDK错误: {e}")
            raise
        except Exception as e:
            print(f"未知错误: {e}")
            raise

# Usage example
def _fetch_job():
    """Retrieve the example job through the module-level client."""
    return client.job.retrieve(job_id="job_id")


try:
    job = safe_operation(_fetch_job)
    print(f"任务状态: {job.status}")
except Exception as e:
    print(f"操作失败: {e}")

重试装饰器

import time
from functools import wraps

from bohrium import Bohrium
from bohrium._exceptions import RateLimitError

def retry_on_rate_limit(max_retries=3, base_delay=1):
    """Build a decorator that retries a function when it is rate limited.

    The wait doubles after every rate-limited attempt: ``base_delay``,
    ``2 * base_delay``, ``4 * base_delay`` and so on.

    Args:
        max_retries: Maximum number of attempts; must be at least 1.
        base_delay: Seconds to wait after the first rate-limited attempt.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result, or re-raises ``RateLimitError`` once the final attempt is
        still rate limited. Other exceptions propagate immediately.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
    """
    if max_retries < 1:
        # With zero attempts the wrapper would never call the function and
        # would silently return None.
        raise ValueError("max_retries 必须大于等于 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except RateLimitError:
                    if attempt >= max_retries - 1:
                        # Out of attempts: surface the rate-limit error.
                        raise
                    # Exponential backoff based on the attempt index.
                    delay = base_delay * (2 ** attempt)
                    print(f"限流,等待 {delay} 秒后重试...")
                    time.sleep(delay)
        return wrapper
    return decorator

# Use the decorator.
# The client is created once here rather than inside get_job, so a retried
# call does not construct a new client on every attempt.
client = Bohrium(access_key="your_access_key")


@retry_on_rate_limit(max_retries=5)
def get_job(job_id):
    """Retrieve a job, retrying (up to 5 attempts) when rate limited."""
    return client.job.retrieve(job_id=job_id)


job = get_job("job_id")

更多示例

您可以在项目的 tests/ 目录中找到更多测试和使用示例。