# Source code for ionworks.job
"""Job client for managing asynchronous jobs.
This module provides the :class:`JobClient` for submitting, monitoring, and
managing background jobs in the Ionworks platform.
"""
from typing import Any
from pydantic import BaseModel, ValidationError
[docs]
class JobCreationPayload(BaseModel):
    """Request body describing a job to be created.

    Attributes
    ----------
    job_type : str
        Type of the job to create.
    params : dict[str, Any]
        Parameters sent along with the job.
    priority : int
        Job priority; defaults to 5 when not supplied.
    callback_url : str | None
        Optional callback URL; defaults to ``None``.
    """

    job_type: str
    params: dict[str, Any]
    priority: int = 5
    callback_url: str | None = None
[docs]
class JobResponse(BaseModel):
    """Job details as returned by the jobs API.

    Attributes
    ----------
    job_id : str
        Identifier of the job.
    status : str
        Current status of the job.
    job_type : str
        Type of the job.
    params : dict[str, Any]
        Parameters the job was created with.
    priority : int
        Priority of the job.
    created_at : str
        Creation timestamp, kept as a string.
    updated_at : str
        Last-update timestamp, kept as a string.
    metadata : dict[str, Any] | None
        Additional metadata, or ``None``.
    error : str | None
        Error description, or ``None``.
    result : dict[str, Any] | None
        Job result, or ``None``.
    """

    # NOTE(review): none of the fields below declares a default. Under
    # Pydantic v2 the ``| None`` fields therefore accept ``None`` as a value
    # but must still be present in the response -- confirm the API always
    # sends them.
    job_id: str
    status: str
    job_type: str
    params: dict[str, Any]
    priority: int
    created_at: str
    updated_at: str
    metadata: dict[str, Any] | None
    error: str | None
    result: dict[str, Any] | None
[docs]
class JobClient:
    """Client for managing asynchronous jobs.

    Provides methods to create, retrieve, list, and cancel jobs in the
    Ionworks platform. Each method issues one request through the wrapped
    HTTP client and validates the reply into :class:`JobResponse`.
    """

    def __init__(self, client: Any) -> None:
        """Initialize the JobClient.

        Parameters
        ----------
        client : Any
            The HTTP client instance used for API requests. The methods of
            this class call ``client.get(endpoint)`` and
            ``client.post(endpoint, json_payload=...)`` on it.
        """
        self.client = client

    @staticmethod
    def _parse_job(data: Any, endpoint: str, label: str = "response") -> JobResponse:
        """Validate raw response data into a :class:`JobResponse`.

        Parameters
        ----------
        data : Any
            Raw data returned by the HTTP client for a single job.
        endpoint : str
            Endpoint the data came from; only used in the error message.
        label : str
            Noun used in the error message (``"response"`` or ``"job data"``).

        Returns
        -------
        JobResponse
            The validated job details.

        Raises
        ------
        ValueError
            If ``data`` does not match the :class:`JobResponse` schema.
        """
        try:
            # model_validate (rather than JobResponse(**data)) reports
            # non-mapping input such as None or a list as a ValidationError
            # instead of leaking a TypeError from the ``**`` unpacking.
            return JobResponse.model_validate(data)
        except ValidationError as e:
            msg = f"Invalid {label} format received from {endpoint}: {e}"
            raise ValueError(msg) from e

    def create(self, payload: JobCreationPayload) -> JobResponse:
        """Submit a job using the provided payload.

        Parameters
        ----------
        payload : JobCreationPayload
            The configuration for the job to be created. Fields whose value
            is ``None`` are omitted from the request body.

        Returns
        -------
        JobResponse
            Job details parsed from the API response.

        Raises
        ------
        requests.exceptions.RequestException
            If the API request fails; raised by the underlying client and not
            caught here (assumes a ``requests``-based client -- confirm).
        ValueError
            If the response parsing fails.
        """
        endpoint = "/jobs/"
        response_data = self.client.post(
            endpoint, json_payload=payload.model_dump(exclude_none=True)
        )
        return self._parse_job(response_data, endpoint)

    def get(self, job_id: str) -> JobResponse:
        """Get the status and details of a specific job.

        Parameters
        ----------
        job_id : str
            The ID of the job to retrieve.

        Returns
        -------
        JobResponse
            Job details parsed from the API response.

        Raises
        ------
        ValueError
            If the response parsing fails.
        """
        endpoint = f"/jobs/{job_id}"
        return self._parse_job(self.client.get(endpoint), endpoint)

    def list(self) -> list[JobResponse]:
        """List all jobs.

        Returns
        -------
        list[JobResponse]
            One entry per job returned by the API, in response order.

        Raises
        ------
        ValueError
            If the response is not a list or job data format is invalid.
        """
        endpoint = "/jobs/"
        response_data = self.client.get(endpoint)
        # Reject non-list payloads up front so the caller gets a clear
        # message instead of an iteration error.
        if not isinstance(response_data, list):
            msg = (
                f"Unexpected response format from {endpoint}: expected a list, "
                f"got {type(response_data).__name__}"
            )
            raise ValueError(msg)
        return [
            self._parse_job(job, endpoint, "job data") for job in response_data
        ]

    def cancel(self, job_id: str) -> JobResponse:
        """Cancel a job.

        Parameters
        ----------
        job_id : str
            The ID of the job to cancel.

        Returns
        -------
        JobResponse
            Job details parsed from the cancel endpoint's response.

        Raises
        ------
        ValueError
            If the response parsing fails.
        """
        endpoint = f"/jobs/{job_id}/cancel"
        # The cancel endpoint is called with an empty JSON body.
        response_data = self.client.post(endpoint, json_payload={})
        return self._parse_job(response_data, endpoint)