Source code for ionworks.job

"""Job client for managing asynchronous jobs.

This module provides the :class:`JobClient` for submitting, monitoring, and
managing background jobs in the Ionworks platform.
"""

from typing import Any

from pydantic import BaseModel, ValidationError


class JobCreationPayload(BaseModel):
    """Request body used to submit a new job.

    Attributes
    ----------
    job_type : str
        Identifier of the kind of job to run.
    params : dict[str, Any]
        Job parameters, keyed by string.
    priority : int
        Job priority; ``5`` when not supplied.
    callback_url : str | None
        Optional callback URL; ``None`` when not supplied.
        ``JobClient.create`` serialises the payload with
        ``exclude_none=True``, so an unset value is left out of the request.
    """

    job_type: str
    params: dict[str, Any]
    # Default priority applied when the caller does not choose one.
    priority: int = 5
    # Optional callback; None means no callback was requested.
    callback_url: str | None = None
class JobResponse(BaseModel):
    """Job details as returned by the jobs API.

    Attributes
    ----------
    job_id : str
        Identifier of the job.
    status : str
        Current job status, kept as a plain string.
    job_type : str
        Identifier of the kind of job.
    params : dict[str, Any]
        Parameters the job was submitted with.
    priority : int
        Job priority.
    created_at : str
        Creation timestamp, kept as a string (no parsing is done here).
    updated_at : str
        Last-update timestamp, kept as a string (no parsing is done here).
    metadata : dict[str, Any] | None
        Additional job metadata, or ``None``.
    error : str | None
        Error description, or ``None``.
    result : dict[str, Any] | None
        Job result, or ``None``.
    """

    job_id: str
    status: str
    job_type: str
    params: dict[str, Any]
    priority: int
    created_at: str
    updated_at: str
    # NOTE(review): the three nullable fields below declare no default. Under
    # pydantic v2 (this module uses the v2 ``model_dump`` API) that makes the
    # keys required even though their value may be None -- confirm the API
    # always sends them.
    metadata: dict[str, Any] | None
    error: str | None
    result: dict[str, Any] | None
class JobClient:
    """Client for managing asynchronous jobs.

    This class provides methods to create, retrieve, list, and cancel jobs
    in the Ionworks platform. Each method delegates the HTTP call to the
    wrapped client and validates the returned data as :class:`JobResponse`.
    Exceptions raised by the wrapped client itself are not caught here and
    propagate to the caller unchanged.
    """

    def __init__(self, client: Any) -> None:
        """Initialize the JobClient.

        Parameters
        ----------
        client : Any
            The HTTP client instance used for API requests. This class calls
            ``client.get(endpoint)`` and
            ``client.post(endpoint, json_payload=...)`` on it.
        """
        self.client = client

    @staticmethod
    def _parse_job(
        data: Any, endpoint: str, description: str = "response format"
    ) -> JobResponse:
        """Validate raw response data as a :class:`JobResponse`.

        Parameters
        ----------
        data : Any
            Decoded response data for a single job.
        endpoint : str
            Endpoint the data came from; used only in the error message.
        description : str
            Noun phrase naming what was invalid, inserted into the error
            message.

        Returns
        -------
        JobResponse
            The validated job model.

        Raises
        ------
        ValueError
            If ``data`` does not validate as a job, including when it is
            not a mapping at all.
        """
        try:
            # model_validate (rather than ``JobResponse(**data)``) reports
            # non-mapping input as a ValidationError instead of letting the
            # ``**`` unpacking raise an uncaught TypeError.
            return JobResponse.model_validate(data)
        except ValidationError as e:
            msg = f"Invalid {description} received from {endpoint}: {e}"
            raise ValueError(msg) from e

    def create(self, payload: JobCreationPayload) -> JobResponse:
        """Submit a job using the provided payload.

        Parameters
        ----------
        payload : JobCreationPayload
            The configuration for the job to be created. Fields set to
            ``None`` are omitted from the request body.

        Returns
        -------
        JobResponse
            The validated job details returned by the API.

        Raises
        ------
        ValueError
            If the response parsing fails.
        """
        endpoint = "/jobs/"
        # Errors raised by the HTTP client are deliberately not caught here;
        # only response validation is converted to ValueError.
        response_data = self.client.post(
            endpoint, json_payload=payload.model_dump(exclude_none=True)
        )
        return self._parse_job(response_data, endpoint)

    def get(self, job_id: str) -> JobResponse:
        """Get the status and details of a specific job.

        Parameters
        ----------
        job_id : str
            The ID of the job to retrieve.

        Returns
        -------
        JobResponse
            Job details and current status.

        Raises
        ------
        ValueError
            If the response parsing fails.
        """
        endpoint = f"/jobs/{job_id}"
        response_data = self.client.get(endpoint)
        return self._parse_job(response_data, endpoint)

    # NOTE: this method shadows the builtin ``list`` inside the class body, so
    # annotations of methods defined after it must not use ``list[...]``.
    def list(self) -> list[JobResponse]:
        """List all jobs.

        Returns
        -------
        list[JobResponse]
            List of all jobs with their details.

        Raises
        ------
        ValueError
            If the response is not a list or job data format is invalid.
        """
        endpoint = "/jobs/"
        response_data = self.client.get(endpoint)

        # Reject anything but a list before iterating, so that a dict or
        # string response is not silently iterated element by element.
        if not isinstance(response_data, list):
            msg = (
                f"Unexpected response format from {endpoint}: expected a list, "
                f"got {type(response_data).__name__}"
            )
            raise ValueError(msg)

        # The first invalid entry aborts the whole listing with ValueError.
        return [
            self._parse_job(job, endpoint, "job data format")
            for job in response_data
        ]

    def cancel(self, job_id: str) -> JobResponse:
        """Cancel a job.

        Parameters
        ----------
        job_id : str
            The ID of the job to cancel.

        Returns
        -------
        JobResponse
            Job details as returned by the cancel endpoint.

        Raises
        ------
        ValueError
            If the response parsing fails.
        """
        endpoint = f"/jobs/{job_id}/cancel"
        # The cancel endpoint is called with an empty JSON body.
        response_data = self.client.post(endpoint, json_payload={})
        return self._parse_job(response_data, endpoint)