import logging
import os
import uuid
from pathlib import Path
import humps
from typing import Optional
from ..._resource import AsyncAPIResource, SyncAPIResource
from ..._response import APIResponse
from ...types.job.job import JobAddRequest
from ..tiefblue.tiefblue import Tiefblue
log = logging.getLogger(__name__)
USE_SANDBOX = os.getenv("BOHRIUM_USE_SANDBOX", "").lower() in ("1", "true")
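# Set BOHRIUM_USE_SANDBOX=1 (or "true") in the environment to route every job
# endpoint below to its /openapi/v1/sandbox/... variant; any other value uses
# the production endpoints.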
class Job(SyncAPIResource):
    def create(self, project_id, name="", group_id=0):
        data = {"projectId": project_id}
        if name:
            data["name"] = name
        if group_id:
            data["bohrGroupId"] = group_id
        url = "/openapi/v1/sandbox/job/create" if USE_SANDBOX else "/openapi/v1/job/create"
        response = self._client.post(url, json=data, params=self._client.params)
        return response.json().get("data", {})
    def detail(self, job_id):
        log.info(f"detail job {job_id}")
        url = f"/openapi/v1/sandbox/job/{job_id}" if USE_SANDBOX else f"/openapi/v1/job/{job_id}"
        response = self._client.get(url)
        log.debug(response.json())
        return APIResponse(response).json.get("data")
    def submit(
        self,
        project_id: int,
        job_name: str,
        machine_type: str,
        cmd: str,
        image_address: str,
        job_group_id: int = 0,
        work_dir: str = "",
        result: str = "",
        dataset_path: Optional[list] = None,
        log_files: Optional[list] = None,
        out_files: Optional[list] = None,
    ):
        log.info(f"submit job {job_name}, project_id: {project_id}")
        # Avoid mutable default arguments; normalize None to empty lists.
        dataset_path = dataset_path or []
        log_files = log_files or []
        out_files = out_files or []
data = self.create_job(project_id, job_name, job_group_id)
if work_dir != "":
if not os.path.exists(work_dir):
                raise FileNotFoundError(work_dir)
if os.path.isdir(work_dir):
self.uploadr(work_dir, data["storePath"], data["token"])
else:
file_name = os.path.basename(work_dir)
object_key = os.path.join(data["storePath"], file_name)
self.upload(work_dir, object_key, data["token"])
        ep = os.path.expanduser(result)
        p = Path(ep).resolve() / (str(uuid.uuid4()) + "_temp.zip")
job_add_request = JobAddRequest(
            download_path=str(p),
dataset_path=dataset_path,
job_name=job_name,
project_id=project_id,
job_id=data["jobId"],
oss_path=data["storePath"],
image_name=image_address,
scass_type=machine_type,
cmd=cmd,
log_files=log_files,
out_files=out_files,
)
        return self.insert(**job_add_request.to_dict())
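    # Minimal usage sketch (project, machine type, and image values below are
    # hypothetical; assumes an already-authenticated client):
    #
    #   job = Job(client)
    #   data = job.submit(
    #       project_id=12345,
    #       job_name="demo",
    #       machine_type="c2_m4_cpu",
    #       cmd="python run.py",
    #       image_address="registry.example.com/image:tag",
    #       work_dir="./inputs",
    #       result="./results",
    #   )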
    def insert(self, **kwargs):
        # The API expects camelCase keys; callers pass snake_case.
        camel_data = {humps.camelize(k): v for k, v in kwargs.items()}
        if not isinstance(camel_data["ossPath"], list):
            camel_data["ossPath"] = [camel_data["ossPath"]]
        # Accept the legacy singular "logFile" key as an alias for "logFiles".
        if "logFile" in camel_data:
            camel_data["logFiles"] = camel_data.pop("logFile")
        if "logFiles" in camel_data and not isinstance(camel_data["logFiles"], list):
            camel_data["logFiles"] = [camel_data["logFiles"]]
        url = "/openapi/v1/sandbox/job/add" if USE_SANDBOX else "/openapi/v2/job/add"
        response = self._client.post(url, json=camel_data)
        log.info(f"[insert] json={camel_data}, response={response.json()}")
        return response.json().get("data")
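    # Example of the normalization insert() performs:
    #   insert(project_id=1, oss_path="store/abc", log_file="STDOUTERR", ...)
    #   posts {"projectId": 1, "ossPath": ["store/abc"], "logFiles": ["STDOUTERR"], ...}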
    def delete(self, job_id):
        log.info(f"delete job {job_id}")
        url = f"/openapi/v1/sandbox/job/del/{job_id}" if USE_SANDBOX else f"/openapi/v1/job/del/{job_id}"
        self._client.post(url)
    def terminate(self, job_id):
        log.info(f"terminate job {job_id}")
        url = f"/openapi/v1/sandbox/job/terminate/{job_id}" if USE_SANDBOX else f"/openapi/v1/job/terminate/{job_id}"
        self._client.post(url)
    def kill(self, job_id):
        log.info(f"kill job {job_id}")
        url = f"/openapi/v1/sandbox/job/kill/{job_id}" if USE_SANDBOX else f"/openapi/v1/job/kill/{job_id}"
        self._client.post(url)
    def log(self, job_id, log_file="STDOUTERR", page=-1, page_size=8192):
        url = f"/openapi/v1/sandbox/job/{job_id}/log" if USE_SANDBOX else f"/openapi/v1/job/{job_id}/log"
        response = self._client.get(
            url,
            params={"logFile": log_file, "page": page, "pageSize": page_size},
        )
        data = response.json().get("data") or {}
        return data.get("log")
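    # Usage sketch (paging semantics assumed from the defaults above):
    #   text = job.log(job_id, log_file="STDOUTERR", page=-1, page_size=8192)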
def create_job(
self,
project_id: int,
name: Optional[str] = None,
        group_id: int = 0,
):
data = {
"projectId": project_id,
"name": name,
"bohrGroupId": group_id,
}
        url = "/openapi/v1/sandbox/job/create" if USE_SANDBOX else "/openapi/v1/job/create"
        response = self._client.post(url, json=data)
return response.json().get("data")
def create_job_group(self, project_id, job_group_name):
        log.info(f"create job group {job_group_name}")
response = self._client.post(
"/openapi/v1/job_group/add",
json={"name": job_group_name, "projectId": project_id},
)
return response.json().get("data")
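    # e.g. job.create_job_group(project_id, "my-batch")  # returns the new group's data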
def upload(
self,
file_path: str,
object_key: str,
token: str,
):
        # NOTE: the token argument is accepted for call-site symmetry but is
        # not used by this implementation.
        tiefblue = Tiefblue()
tiefblue.upload_From_file_multi_part(
object_key=object_key, file_path=file_path, progress_bar=True
)
    def uploadr(self, work_dir, store_path, token):
        # Recursively upload every file under work_dir, preserving its
        # relative layout under store_path.
        if not work_dir.endswith("/"):
            work_dir = work_dir + "/"
        if not store_path.endswith("/"):
            store_path = store_path + "/"
        for root, _, files in os.walk(work_dir):
            for file in files:
                full_path = os.path.join(root, file)
                object_key = full_path.replace(work_dir, store_path, 1)
                self.upload(full_path, object_key, token)
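    # e.g. with work_dir="/tmp/inputs" and store_path="store/xyz", the file
    # "/tmp/inputs/sub/a.txt" is uploaded under the object key "store/xyz/sub/a.txt".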
def download(self, job_id, save_path):
detail = self.detail(job_id)
tiefblue = Tiefblue()
tiefblue.download_from_url(detail["resultUrl"], save_path)
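    # e.g. job.download(job_id, "./result.zip") fetches the archive referenced
    # by the job detail's "resultUrl" field.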
class AsyncJob(AsyncAPIResource):
pass