Source code for ionworks.client

"""Main client module for the Ionworks API.

This module provides the :class:`Ionworks` client, which is the main entry point
for interacting with the Ionworks API. It handles authentication, request/response
processing, and provides access to all API resources through sub-clients.
"""

import gzip
import json as json_mod
import os
from typing import Any, cast

from dotenv import load_dotenv
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .cell_instance import CellInstanceClient
from .cell_measurement import CellMeasurementClient
from .cell_specification import CellSpecificationClient
from .errors import IonworksError
from .job import JobClient
from .pipeline import PipelineClient
from .simulation import SimulationClient
from .validators import set_dataframe_backend

#: Payloads larger than this (bytes) are gzip-compressed before sending.
_GZIP_THRESHOLD = 512 * 1024  # 512 KB


[docs] class Ionworks: """Client for interacting with the Ionworks API. Handles authentication, request/response processing, and provides access to all API resources through sub-clients. """
[docs] def __init__( self, api_key: str | None = None, api_url: str | None = None, dataframe_backend: str | None = None, timeout: int | None = None, max_retries: int | None = None, ) -> None: """Initialize Ionworks client. Parameters ---------- api_key : str | None API key. If not provided, will look for IONWORKS_API_KEY env var. api_url : str | None API URL. If not provided, will look for IONWORKS_API_URL env var. dataframe_backend : str | None DataFrame backend for returned data: "polars" or "pandas". If not provided, uses IONWORKS_DATAFRAME_BACKEND env var (defaults to "polars"). timeout : int | None Request timeout in seconds. Defaults to 10 seconds if not provided. max_retries : int | None Maximum number of retries for failed requests. Defaults to 5 if not provided. Retries occur on connection errors, timeouts, and 5xx server errors. """ load_dotenv() # Set DataFrame backend (explicit param > env var > default "polars") if dataframe_backend is not None: set_dataframe_backend(dataframe_backend) self.api_key = api_key or os.getenv("IONWORKS_API_KEY") if not self.api_key: raise ValueError( "API key must be provided either as argument or in IONWORKS_API_KEY " "environment variable" ) self.api_url = ( api_url or os.getenv("IONWORKS_API_URL") or "https://api.ionworks.com" ) # Configure timeout (default: 10 seconds) self.request_timeout = timeout if timeout is not None else 10 # Configure retry strategy (default: maximum 5 retries) # Retry on connection errors, timeouts, and 5xx server errors max_retries_value = max_retries if max_retries is not None else 5 retry_strategy = Retry( total=max_retries_value, # Maximum number of retries backoff_factor=0.3, # Wait 0.3, 0.6, 1.2, 2.4, 4.8 seconds between retries status_forcelist=[500, 502, 503, 504], # Retry on these HTTP status codes allowed_methods=[ "GET", "DELETE", ], # Only retry idempotent methods; don't retry PUT because it can cause side effects raise_on_status=False, # Don't raise exception, let it be handled below ) # Create HTTP adapter with retry strategy adapter = HTTPAdapter(max_retries=retry_strategy) self.session = requests.Session() self.session.headers.update( { "X-API-Key": self.api_key, "Content-Type": "application/json", "Accept": "application/json", } ) # Mount adapter for both HTTP and HTTPS self.session.mount("http://", adapter) self.session.mount("https://", adapter) # Initialize components self.job = JobClient(self) self.pipeline = PipelineClient(self) self.cell_spec = CellSpecificationClient(self) self.cell_instance = CellInstanceClient(self) self.cell_measurement = CellMeasurementClient(self) self.simulation = SimulationClient(self) # Backwards-compatible alias self.measurement = self.cell_measurement
[docs] def request( self, method: str, endpoint: str, json_payload: dict[str, Any] | None = None ) -> Any: """Make a request to the Ionworks API with standardized error handling. Requests use the configured timeout and will retry up to the configured maximum number of times on connection errors, timeouts, and 5xx server errors. """ url = f"{self.api_url}{endpoint}" try: # Gzip-compress large payloads to reduce upload time. extra_kwargs: dict[str, Any] = {} if json_payload is not None: raw = json_mod.dumps(json_payload).encode() if len(raw) >= _GZIP_THRESHOLD: extra_kwargs["data"] = gzip.compress(raw, compresslevel=1) del raw extra_kwargs["headers"] = { "Content-Encoding": "gzip", "Content-Type": "application/json", } else: extra_kwargs["data"] = raw response = self.session.request( method, url, timeout=self.request_timeout, **extra_kwargs ) response.raise_for_status() # For DELETE operations, don't try to parse JSON if response is empty if method.upper() == "DELETE": return None # Return JSON response if content type is JSON and response has content if ( response.headers.get("Content-Type") == "application/json" and response.text ): return response.json() return response except requests.exceptions.HTTPError as e: # Print x-correlation-id from response header if available # requests headers are case-insensitive, so this works regardless of case correlation_id = e.response.headers.get("x-correlation-id") if correlation_id: print(f"x-correlation-id: {correlation_id}") try: error_body = e.response.json() # New format: {"error_code": ..., "message": ..., "detail": ...} if "error_code" in error_body: error_detail = error_body # Old format: {"detail": ...} else: error_detail = error_body.get("detail", str(e)) except requests.exceptions.JSONDecodeError: error_detail = str(e) raise IonworksError( error_detail, status_code=e.response.status_code ) from None except requests.exceptions.RequestException as e: # Extract more detailed error information error_msg = f"Error during request to {url}" if hasattr(e, "response") and e.response is not None: error_msg += f" (status code: {e.response.status_code})" try: error_detail = e.response.json().get("detail", e.response.text) error_msg += f": {error_detail}" except requests.exceptions.JSONDecodeError: error_msg += f": {e.response.text}" else: error_msg += f": {e!s}" # Raise the custom error for general request issues raise IonworksError(error_msg) from None
[docs] def get(self, endpoint: str) -> Any: """Make a GET request using the request helper.""" return self.request("GET", endpoint)
[docs] def post(self, endpoint: str, json_payload: dict[str, Any]) -> Any: """Make a POST request using the request helper.""" return self.request("POST", endpoint, json_payload=json_payload)
[docs] def put(self, endpoint: str, json_payload: dict[str, Any]) -> Any: """Make a PUT request using the request helper.""" return self.request("PUT", endpoint, json_payload=json_payload)
[docs] def delete(self, endpoint: str) -> None: """Make a DELETE request using the request helper.""" self.request("DELETE", endpoint)
[docs] def health_check(self) -> dict[str, Any]: """Check the health of the Ionworks API. Returns ------- dict[str, Any] Health check response. """ response_data = self.get("/healthz") return cast(dict[str, Any], response_data)
[docs] def capabilities(self) -> dict[str, Any]: """Fetch platform capabilities and domain context. Returns domain knowledge (battery data hierarchy, key concepts), authentication info, and pointers to JSON Schema endpoints. Returns ------- dict[str, Any] Capabilities including ``domain_context``, ``schemas``, ``openapi_spec``, and ``authentication``. """ return cast(dict[str, Any], self.get("/discovery/capabilities"))
[docs] def schema(self, name: str) -> dict[str, Any]: """Fetch a discovery schema by name. Parameters ---------- name : str Schema to fetch. Supported values: - ``"data"`` — cell data hierarchy (specifications, instances, measurements, steps, time_series). - ``"protocol"`` — Universal Cycler Protocol (UCP) JSON Schema. Returns ------- dict[str, Any] The requested schema. Raises ------ ValueError If *name* is not a recognised schema. """ allowed = {"data", "protocol"} if name not in allowed: raise ValueError( f"Unknown schema {name!r}. Choose from: {', '.join(sorted(allowed))}" ) return cast(dict[str, Any], self.get(f"/discovery/schemas/{name}"))