Source code for civis.utils._jobs

from __future__ import annotations

from collections.abc import Iterator
import logging
import operator
import time
from datetime import datetime
import warnings

from civis import APIClient
from civis.futures import CivisFuture
from civis._deprecation import DeprecatedKwargDefault

log = logging.getLogger(__name__)

# Seconds job_logs sleeps between polls once it has caught up with Platform.
_FOLLOW_POLL_INTERVAL_SEC = 5
# Age (in seconds) of the newest log entry below which job_logs re-requests
# recent entries, so late-arriving messages with lower IDs aren't skipped.
_LOG_REFETCH_CUTOFF_SECONDS = 300
# How far back (counted in entries) job_logs moves its query window when
# re-requesting recent log entries.
_LOG_REFETCH_COUNT = 100
# Page size (``limit``) for each job-logs API request.
_LOGS_PER_QUERY = 250
# Accepted values of run_template's ``return_as`` argument.
_RETURN_AS_OPTIONS = frozenset({"files", "JSONValue", "future"})


def _warn_or_raise_for_JSONValue(JSONValue, return_as):
    """Reconcile the deprecated ``JSONValue`` kwarg with ``return_as``.

    Used by ``run_template``. When it's time to remove JSONValue at
    civis-python v3.0.0, remove this helper and all usage of JSONValue.

    Parameters
    ----------
    JSONValue : bool or DeprecatedKwargDefault
        The value the caller passed for ``JSONValue``. A
        ``DeprecatedKwargDefault`` instance means the caller did not set it;
        then ``return_as`` is returned unchanged and nothing is warned.
    return_as : str
        The caller's ``return_as`` value.

    Returns
    -------
    str
        The effective ``return_as``. ``JSONValue=True`` combined with
        ``return_as="files"`` (the default) is mapped to ``"JSONValue"`` so
        that legacy ``JSONValue=True`` calls keep returning JSON output.

    Raises
    ------
    ValueError
        If ``JSONValue`` is truthy and ``return_as`` is ``"future"``, or if
        ``JSONValue`` is falsy and ``return_as`` is ``"JSONValue"``.

    Warns
    -----
    FutureWarning
        Whenever ``JSONValue`` was explicitly set and no conflict is raised.
    """
    if isinstance(JSONValue, DeprecatedKwargDefault):
        # The caller never set JSONValue: nothing to reconcile or warn about.
        return return_as

    warn_msg = (
        "As of civis-python v2.8.0, civis.utils.run_template can return three "
        "types of values, so 'JSONValue' is deprecated "
        "and will be removed in civis-python v3.0.0 "
        "(no release timeline yet). "
        "While the arg 'JSONValue' still works for now, you're strongly encouraged "
        "to update your code to use the new keyword argument 'return_as' instead "
        "and stop setting 'JSONValue'. "
    )
    if JSONValue:
        warn_msg += (
            "To return the JSONValue output of the custom script run, "
            'set return_as="JSONValue".'
        )
    conflict_msg = (
        "Update your code so that the 'JSONValue' argument is no longer set, "
        "and set 'return_as' to one of {'files', 'JSONValue', 'future'}. "
        "Note that the default return_as value is 'files' as of "
        "civis-python v2.8.0, but will be 'future' in civis-python v3.0.0."
    )

    if JSONValue and return_as == "files":
        # "files" is return_as's default, so it can't be told apart from
        # "return_as not given". Honor the legacy JSONValue=True request, as
        # the deprecation warning promises that JSONValue still works.
        return_as = "JSONValue"
    elif (JSONValue and return_as != "JSONValue") or (
        not JSONValue and return_as == "JSONValue"
    ):
        raise ValueError(
            "Conflicting argument values: "
            f"JSONValue={bool(JSONValue)} but return_as={return_as!r}. "
            + conflict_msg
        )

    # stacklevel=3 points past this helper and run_template at the user's call.
    warnings.warn(warn_msg.strip(), FutureWarning, stacklevel=3)
    return return_as


def run_job(
    job_id: int,
    client: APIClient | None = None,
    polling_interval: int | float | None = None,
) -> CivisFuture:
    """Start a new run of an existing job.

    Parameters
    ----------
    job_id: int
        The ID of the job.
    client: :class:`civis.APIClient`, optional
        If not provided, an :class:`civis.APIClient` object will be
        created from the :envvar:`CIVIS_API_KEY`.
    polling_interval : int or float, optional
        The number of seconds between API requests to check whether a
        result is ready.

    Returns
    -------
    results: :class:`~civis.futures.CivisFuture`
        A `CivisFuture` object tracking the newly created run.
    """
    api = APIClient() if client is None else client
    new_run = api.jobs.post_runs(job_id)
    # The future polls jobs.get_runs(job_id, run_id); it does not poll
    # immediately on construction.
    future = CivisFuture(
        api.jobs.get_runs,
        (job_id, new_run["id"]),
        client=api,
        polling_interval=polling_interval,
        poll_on_creation=False,
    )
    return future
def run_template(
    id: int,
    arguments: dict,
    JSONValue=DeprecatedKwargDefault(),
    client: APIClient | None = None,
    return_as: str = "files",
    **kwargs,
) -> CivisFuture | dict | None:
    """Run a template and return the results.

    Parameters
    ----------
    id: int
        The template id to be run.
    arguments: dict
        Dictionary of arguments to be passed to the template.
    JSONValue: bool, optional
        If True, will return the JSON output of the template.
        If False, will return the file ids associated with the
        output results.

        .. deprecated:: 2.8.0
            ``JSONValue`` will be removed at civis-python v3.0.0.
            Please use ``return_as`` instead.
    return_as: str, optional
        Determines the return type. Options:

        - "files": returns file ids associated with output results
          (default for <v3.0.0)
        - "JSONValue": returns the JSON output of the template
        - "future": returns the CivisFuture object for the run

        At civis-python v3.0.0, the default will change to "future".
    client: :class:`civis.APIClient`, optional
        If not provided, an :class:`civis.APIClient` object will be
        created from the :envvar:`CIVIS_API_KEY`.
    **kwargs: dict
        Additional keyword arguments to be passed to post_custom.

    Returns
    -------
    output: dict, CivisFuture, or None
        If return_as = "files", dictionary mapping each run output's name
        to its object id.
        If return_as = "JSONValue", the value of the run's JSONValue output.
        A single JSON output is expected: if there is none, a warning is
        logged and None is returned; if there is more than one, a warning
        is logged and only the first one is returned.
        If return_as = "future", the CivisFuture object for the run,
        returned without waiting for the run to finish.

    Raises
    ------
    ValueError
        If ``return_as`` is not one of "files", "JSONValue", "future", or
        if it conflicts with the deprecated ``JSONValue`` argument.

    Examples
    --------
    >>> # Run template to return file_ids
    >>> run_template(my_template_id, arguments=my_dict_of_args)
    {'output': 1234567}
    >>> # Run template to return JSON output
    >>> run_template(my_template_id, arguments=my_dict_of_args, return_as="JSONValue")
    {'result1': 'aaa', 'result2': 123}
    >>> # Run template to return CivisFuture
    >>> run_template(my_template_id, arguments=my_dict_of_args, return_as="future")
    <CivisFuture object>
    >>> # Run template with kwargs
    >>> run_template(my_template_id, arguments=my_dict_of_args, remote_host_id=1, credential_id=2)
    {'output': 1234567}
    """
    if return_as not in _RETURN_AS_OPTIONS:
        raise ValueError(f"unsupported return_as option: {return_as}")

    # Check if JSONValue conflicts with return_as, warn or raise accordingly
    return_as = _warn_or_raise_for_JSONValue(JSONValue, return_as)

    if client is None:
        client = APIClient()
    job = client.scripts.post_custom(id, arguments=arguments, **kwargs)
    run = client.scripts.post_custom_runs(job.id)
    fut = CivisFuture(client.scripts.get_custom_runs, (job.id, run.id), client=client)
    if return_as == "future":
        return fut

    # Block until the run finishes before reading its outputs.
    fut.result()
    outputs = client.scripts.list_custom_runs_outputs(job.id, run.id)
    if return_as == "JSONValue":
        json_output = [o.value for o in outputs if o.object_type == "JSONValue"]
        if not json_output:
            log.warning("No JSON output for template %s", id)
            return None
        if len(json_output) > 1:
            log.warning(
                "More than 1 JSON output for template %s"
                " -- returning only the first one.",
                id,
            )
        return json_output[0]  # type: ignore[return-value]

    # Expecting return_as == "files"
    file_ids = {o.name: o.object_id for o in outputs}
    return file_ids
def _timestamp_from_iso_str(s):
    """Return a POSIX timestamp (a float, in seconds) for an ISO date string.

    Note: Until Python 3.11, datetime.fromisoformat doesn't work with the
    format returned by Civis Platform, so two explicit ``strptime`` formats
    are tried as fallbacks.

    Raises
    ------
    ValueError
        If ``s`` matches none of the supported formats.
    """
    try:
        return datetime.fromisoformat(s).timestamp()
    except ValueError:
        try:
            # This is the format that Civis Platform returns.
            return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S.%f%z").timestamp()
        except ValueError:
            # Another format, just in case.
            return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z").timestamp()


def _compute_effective_max_log_id(logs):
    """Pick the log ID to resume polling from without missing late messages.

    The order of log IDs may not be consistent with "created at" times since
    log entries are created by Civis Platform as well as the code for the job
    itself. Rather than resuming from the highest ID seen, this moves the
    query window back while the latest entries are still recent:

    - no entries: return 0 (query from the beginning);
    - the highest-ID entry was created more than
      ``_LOG_REFETCH_CUTOFF_SECONDS`` ago: return its ID;
    - otherwise, with at least ``_LOG_REFETCH_COUNT`` entries: return the ID
      of the ``_LOG_REFETCH_COUNT``-th highest-ID entry, so the most recent
      entries are requested again;
    - otherwise: return 0.

    With this, it is still theoretically possible but extremely unlikely for
    some late log messages to be skipped in the job_logs function.
    """
    if not logs:
        return 0

    sorted_logs = sorted(logs, key=operator.itemgetter("id"))

    max_created_at_timestamp = _timestamp_from_iso_str(sorted_logs[-1]["created_at"])
    cutoff = time.time() - _LOG_REFETCH_CUTOFF_SECONDS
    if max_created_at_timestamp < cutoff:
        return sorted_logs[-1]["id"]
    elif len(sorted_logs) >= _LOG_REFETCH_COUNT:
        return sorted_logs[-_LOG_REFETCH_COUNT]["id"]

    return 0


def _job_finished_past_timeout(job_id, run_id, finished_timeout, client):
    """Return True if the run finished more than ``finished_timeout`` seconds ago.

    Returns False without calling the API when ``finished_timeout`` is None,
    and False when the run has no ``finished_at`` value yet.
    """
    if finished_timeout is None:
        return False
    run = client.jobs.get_runs(job_id, run_id)
    finished_at = run.json()["finished_at"]
    if finished_at is None:
        return False
    finished_at_ts = _timestamp_from_iso_str(finished_at)
    return finished_at_ts < time.time() - finished_timeout
def job_logs(
    job_id: int,
    run_id: int | None = None,
    finished_timeout: int | None = None,
    client: APIClient | None = None,
) -> Iterator[dict]:
    """Return a generator of log message dictionaries for a given run.

    Parameters
    ----------
    job_id : int
        The ID of the job to retrieve log message for.
    run_id : int or None
        The ID of the run to retrieve log messages for.
        If None, the ID for the most recent run will be used.
    finished_timeout: int or None
        If not None, then this function will return once the run has
        been finished for the specified number of seconds.
        If None, then this function will wait until the API says there
        will be no more new log messages, which may take a few minutes.
        A timeout of 30-60 seconds is usually enough to retrieve all
        log messages.
    client : :class:`civis.APIClient`, optional
        If not provided, an :class:`civis.APIClient` object will be
        created from the :envvar:`CIVIS_API_KEY`.

    Yields
    ------
    dict
        A log message dictionary with "message", "created_at" and other
        attributes provided by the job logs endpoint. Note that this will
        block execution until the job has stopped and all log messages are
        retrieved. This also holds for a run for which Platform reports no
        log messages at all: polling stops once the endpoint reports that
        no more messages will appear (or ``finished_timeout`` has elapsed).

    Examples
    --------
    >>> # Print all log messages from a job's most recent run
    >>> for log in job_logs(job_id=123456):
    ...     print(f"{log['created_at']}: {log['message']}")
    ...
    >>> # Get logs from a specific run with a 30 second timeout
    >>> for log in job_logs(job_id=123456, run_id=789, finished_timeout=30):
    ...     print(log['message'])
    """
    client = client or APIClient()
    if run_id is None:
        # Most recent run = highest run ID.
        run_id = client.jobs.list_runs(
            job_id, limit=1, order="id", order_dir="desc"
        ).json()[0]["id"]

    local_max_log_id = 0
    # IDs already yielded. The query window can move backwards (see
    # _compute_effective_max_log_id), so the same entry may be returned again.
    known_log_ids = set()

    while True:
        # This call gets a limited number of log messages since last_id,
        # ordered by log ID.
        response = client.jobs.list_runs_logs(
            job_id,
            run_id,
            last_id=local_max_log_id,
            limit=_LOGS_PER_QUERY,
        )
        if response.headers is None:
            raise RuntimeError("No headers in response from job logs endpoint")
        if "civis-max-id" in response.headers:
            remote_max_log_id = int(response.headers["civis-max-id"])
        else:
            # Platform hasn't seen any logs at all yet
            remote_max_log_id = None

        logs = response.json()
        if logs:
            local_max_log_id = max(entry["id"] for entry in logs)
            # Yield in chronological order; ID order can differ from it.
            logs.sort(key=operator.itemgetter("created_at", "id"))
        for entry in logs:
            if entry["id"] in known_log_ids:
                continue
            known_log_ids.add(entry["id"])
            yield entry

        # Any cache-control value other than "no-store" is treated as
        # "no new log messages will appear".
        log_finished = response.headers["civis-cache-control"] != "no-store"

        if remote_max_log_id is not None:
            if local_max_log_id != remote_max_log_id:
                # Platform has logs we haven't fetched yet: get them now,
                # without sleeping.
                continue
            # Caught up. Move the query window back so late-arriving
            # messages with lower IDs are not skipped on the next poll.
            local_max_log_id = _compute_effective_max_log_id(logs)

        # Also reached when Platform reports no logs at all; previously that
        # case never checked for completion and polled forever.
        if log_finished or _job_finished_past_timeout(
            job_id, run_id, finished_timeout, client
        ):
            return
        time.sleep(_FOLLOW_POLL_INTERVAL_SEC)