bohrium._client 源代码

from __future__ import annotations

import os
from typing import Mapping, Optional, Union

from httpx import URL, Client, Response, Timeout
from typing_extensions import override

from . import _exceptions, resources
from ._base_client import DEFAULT_MAX_RETRIES, AsyncAPIClient, SyncAPIClient
from ._exceptions import APIStatusError, BohriumError
from ._version import __version__


[文档] class Bohrium(SyncAPIClient): job: resources.Job sigma_search: resources.SigmaSearch uni_parser: resources.UniParser knowledge_base: resources.KnowledgeBase paper: resources.Paper # client options access_key: str project_id: Optional[str] def __init__( self, access_key: Optional[str] = None, app_key: Optional[str] = None, base_url: Optional[Union[str, URL]] = None, project_id: Optional[str] = None, timeout: Optional[Union[float, Timeout]] = 30.0, max_retries: Optional[int] = DEFAULT_MAX_RETRIES, http_client: Optional[Client] = None, ) -> None: """Construct a new synchronous openai client instance.""" if access_key is None: access_key = os.environ.get("BOHRIUM_ACCESS_KEY") if access_key is None: raise BohriumError( "The api_key client option must be set either by passing api_key to the client or by setting the ACCESS_KEY environment variable" ) self.access_key = access_key self.app_key = app_key or os.environ.get("BOHRIUM_APP_KEY") self.params = {"accessKey": self.access_key} if project_id is None: project_id = os.environ.get("BOHRIUM_PROJECT_ID") self.project_id = project_id if base_url is None: base_url = os.environ.get("BOHRIUM_BASE_URL") if base_url is None: base_url = "https://openapi.dp.tech" super().__init__( _version=__version__, base_url=base_url, max_retries=max_retries, timeout=timeout, http_client=http_client, custom_headers=self.default_headers, ) self.job = resources.Job(self) self.sigma_search = resources.SigmaSearch(self) self.uni_parser = resources.UniParser(self) self.knowledge_base = resources.KnowledgeBase(self) self.paper = resources.Paper(self) @property @override def default_headers(self) -> dict[str, str]: return { "Accept": "application/json", "Content-Type": "application/json", "Authorization": f"Bearer {self.access_key}", "x-app-key": self.app_key, } def _make_status_error( self, err_msg: str, *, body: object, response: Response, ) -> APIStatusError: data = body.get("error", body) if isinstance(body, Mapping) else body if response.status_code == 400: return _exceptions.BadRequestError(err_msg, response=response, body=data) if response.status_code == 401: return _exceptions.AuthenticationError( err_msg, response=response, body=data ) if response.status_code == 403: return _exceptions.PermissionDeniedError( err_msg, response=response, body=data ) if response.status_code == 404: return _exceptions.NotFoundError(err_msg, response=response, body=data) if response.status_code == 409: return _exceptions.ConflictError(err_msg, response=response, body=data) if response.status_code == 422: return _exceptions.UnprocessableEntityError( err_msg, response=response, body=data ) if response.status_code == 429: return _exceptions.RateLimitError(err_msg, response=response, body=data) if response.status_code >= 500: return _exceptions.InternalServerError( err_msg, response=response, body=data ) return APIStatusError(err_msg, response=response, body=data)
[文档] class AsyncBohrium(AsyncAPIClient): pass