from abc import ABCMeta, abstractmethod
from concurrent.futures import Executor
from concurrent import futures
import copy
import datetime
import logging
import os
import time
import threading
import warnings
from civis import APIClient
from civis.base import CivisAPIError, CivisJobFailure, DONE, _err_msg_with_job_run_ids
from civis.polling import PollableResult
# Module-level logger; used for debug messages about created and retried runs.
log = logging.getLogger(__name__)
class CivisFuture(PollableResult):
    """A class for tracking future results.

    This is a subclass of :class:`python:concurrent.futures.Future` from the
    Python standard library. See:
    https://docs.python.org/3/library/concurrent.futures.html

    Parameters
    ----------
    poller : func
        A function which returns an object that has a ``state`` attribute.
    poller_args : tuple
        The arguments with which to call the poller function. The first
        element is the job ID; the optional second element is the run ID
        (see :attr:`job_id` and :attr:`run_id`).
    polling_interval : int or float, optional
        The number of seconds between API requests to check whether a result
        is ready. If an integer or float is provided, this number will be used
        as the polling interval. If ``None`` (the default), the polling interval will
        start at 1 second and increase geometrically up to 15 seconds. The ratio of
        the increase is 1.2, resulting in polling intervals in seconds of
        1, 1.2, 1.44, 1.728, etc. This default behavior allows for a faster return for
        a short-running job and a capped polling interval for longer-running jobs.
    client : :class:`civis.APIClient`, optional
        The client used for API requests (polling, and fetching run logs and
        outputs). If not provided, a new :class:`civis.APIClient` is created.
    poll_on_creation : bool, optional
        If ``True`` (the default), it will poll upon calling ``result()`` the
        first time. If ``False``, it will wait the number of seconds specified
        in `polling_interval` from object creation before polling.

    Examples
    --------
    This example is provided as a function at :func:`~civis.io.query_civis`.

    >>> import civis
    >>> client = civis.APIClient()
    >>> database_id = client.get_database_id("my_database")
    >>> cred_id = client.default_database_credential_id
    >>> sql = "SELECT 1"
    >>> preview_rows = 10
    >>> response = client.queries.post(database_id, sql, preview_rows,
    ...                                credential=cred_id)
    >>> poller = client.queries.get_runs
    >>> poller_args = response.id, response.last_run_id
    >>> polling_interval = 10
    >>> future = CivisFuture(poller, poller_args, polling_interval)
    >>> future.job_id == response.id
    True
    >>> future.run_id == response.last_run_id
    True
    """

    def __init__(
        self,
        poller,
        poller_args,
        polling_interval=None,
        client=None,
        poll_on_creation=True,
    ):
        if client is None:
            client = APIClient()
        super().__init__(
            poller=poller,
            poller_args=poller_args,
            polling_interval=polling_interval,
            client=client,
            poll_on_creation=poll_on_creation,
        )
        # Guards `_set_job_exception` so a failure is only processed once.
        self._exception_handled = False
        self.add_done_callback(self._set_job_exception)

    @staticmethod
    def _set_job_exception(fut):
        """Done-callback: enrich a failed job's exception with recent logs.

        If ``fut`` failed with a :class:`~civis.base.CivisJobFailure` whose
        original error message is missing (``None``, ``""`` or ``"None"``),
        the exception is replaced by one built from the run's logs
        (see :meth:`_exception_from_logs`).

        Parameters
        ----------
        fut : CivisFuture
            The completed future.
        """
        # Prevent infinite recursion: this function calls `set_exception`,
        # which triggers callbacks (i.e. re-calls this function).
        if fut._exception_handled:
            return
        fut._exception_handled = True

        if not fut.failed():
            return
        exc = fut._exception
        # Some platform script types do not return the error message,
        # so we override with an exception we can pull from the logs.
        if isinstance(exc, CivisJobFailure) and exc._original_err_msg in (
            "None",
            "",
            None,
        ):
            exc.error_message = ""
            fut.set_exception(fut._exception_from_logs(exc))

    def _exception_from_logs(self, exc, nlog=15):
        """Create an exception if the log has a recognizable error.

        Searches the "error"-level messages among the last ``nlog`` log lines
        of this run. This function presently recognizes the following errors:

        - MemoryError

        Parameters
        ----------
        exc : Exception
            The exception the run failed with.
        nlog : int, optional
            Number of most recent log lines to fetch. Defaults to 15.

        Returns
        -------
        Exception
            A :class:`MemoryError` if an out-of-memory message is found.
            Otherwise, if ``exc`` is a :class:`~civis.base.CivisJobFailure`,
            ``exc`` itself with its ``error_message`` set to the log text
            followed by the previous message. For any other ``exc``, a new
            :class:`~civis.base.CivisJobFailure` carrying the log text.
        """
        # Traceback in platform logs may be delayed for a few seconds.
        # Note that this blocks the calling thread for 15 seconds.
        time.sleep(15)
        logs = self.client.jobs.list_runs_logs(self.job_id, self.run_id, limit=nlog)
        # Reverse order as logs come back in reverse chronological order.
        logs = logs[::-1]

        # Check for memory errors.
        error_msgs = [x["message"] for x in logs if x["level"] == "error"]
        mem_errs = [m for m in error_msgs if m.startswith("Process ran out of its")]
        if mem_errs:
            err_msg = _err_msg_with_job_run_ids(mem_errs[0], self.job_id, self.run_id)
            return MemoryError(err_msg)

        # Unknown error; return logs to the user as a sort of traceback.
        all_logs = "\n".join(x["message"] for x in logs)
        if isinstance(exc, CivisJobFailure):
            exc.error_message = _err_msg_with_job_run_ids(
                all_logs + "\n" + exc.error_message, self.job_id, self.run_id
            )
        else:
            err_msg = _err_msg_with_job_run_ids(all_logs, self.job_id, self.run_id)
            exc = CivisJobFailure("", job_id=self.job_id, run_id=self.run_id)
            exc.error_message = err_msg
        return exc

    @property
    def job_id(self):
        """The job ID for the Civis Platform job that this future is tracking.

        Returns
        -------
        int
        """
        return self.poller_args[0]

    @property
    def run_id(self):
        """The run ID for the Civis Platform job that this future is tracking.

        Returns
        -------
        int | None
            ``None`` when the poller was given only a job ID.
        """
        try:
            return self.poller_args[1]
        except IndexError:
            # when poller function has job_id only but not run_id
            return None

    @property
    def job_url(self):
        """The URL for the Civis Platform job that this future is tracking.

        Returns
        -------
        str
        """
        return f"https://platform.civisanalytics.com/spa/#/jobs/{self.job_id}"

    def _check_message(self, message):
        """Return whether ``message`` reports that the tracked run is done.

        The message must refer to this job (``message["object"]["id"]``),
        to this run if a run ID is known (``message["run"]["id"]``), and
        carry a done state (``message["run"]["state"]`` in ``DONE``).
        Messages missing any of these keys yield ``False``.
        """
        try:
            # poller_args can be (job_id,) or (job_id, run_id)
            if len(self.poller_args) == 1:
                match = (
                    message["object"]["id"] == self.poller_args[0]
                    and message["run"]["state"] in DONE
                )
            else:
                match = (
                    message["object"]["id"] == self.poller_args[0]
                    and message["run"]["id"] == self.poller_args[1]
                    and message["run"]["state"] in DONE
                )
        except KeyError:
            return False
        return match

    def outputs(self):
        """Block on job completion and return a list of run outputs.

        The method will only return run outputs for successful jobs.
        Failed jobs will raise an exception.

        Returns
        -------
        list[dict]
            List of run outputs from a successfully completed job.

        Raises
        ------
        civis.base.CivisJobFailure
            If the job fails.
        """
        # `result()` blocks until completion and raises if the job failed.
        self.result()
        outputs = self.client.jobs.list_runs_outputs(self.job_id, self.run_id)
        return outputs
class ContainerFuture(CivisFuture):
    """Track (and optionally retry) one run of a Civis Container Script.

    On top of the :class:`CivisFuture` behavior, this future can cancel an
    in-progress run and can automatically start a fresh run of the same
    script when the current run fails. Only turn retries on for idempotent
    scripts whose failures are expected to be transient (for example,
    network problems).

    Parameters
    ----------
    job_id : int
        ID of the container script (job) to monitor.
    run_id : int
        ID of the run to monitor.
    max_n_retries : int, optional
        How many times a failed run is replaced by a new run before the
        failure is reported. Defaults to 0 (no retries).
    polling_interval : int or float, optional
        Seconds between API requests that check whether a result is ready.
        A number is used as-is. With ``None`` (the default) the interval
        starts at 1 second and grows by a factor of 1.2 per poll
        (1, 1.2, 1.44, 1.728, ...) up to a cap of 15 seconds, so short jobs
        return quickly while long jobs are not polled too often.
    client : :class:`civis.APIClient`, optional
        Client for API requests. When omitted, an :class:`civis.APIClient`
        is built from the :envvar:`CIVIS_API_KEY`.
    poll_on_creation : bool, optional
        ``True`` (the default) polls on the first call to ``result()``.
        ``False`` waits ``polling_interval`` seconds after object creation
        before the first poll.

    See Also
    --------
    civis.futures.CivisFuture
    """

    def __init__(
        self,
        job_id,
        run_id,
        max_n_retries=0,
        polling_interval=None,
        client=None,
        poll_on_creation=True,
    ):
        client = APIClient() if client is None else client
        # Set before the parent initializer runs, since retry handling in
        # `_set_api_exception` reads it.
        self._max_n_retries = max_n_retries
        poller = client.scripts.get_containers_runs
        # A list (not a tuple) so the run ID can be swapped in place on retry.
        poller_args = [int(job_id), int(run_id)]
        super().__init__(
            poller,
            poller_args,
            polling_interval=polling_interval,
            client=client,
            poll_on_creation=poll_on_creation,
        )

    def _set_api_exception(self, exc, result=None):
        # Intercept failures: while retries remain, launch a new run of the
        # script instead of recording the exception on this future.
        with self._condition:
            has_retries = self._max_n_retries > 0
            if not has_retries:
                super()._set_api_exception(exc=exc, result=result)
                return

            # Stop tracking the failed run and start a new one.
            self.cleanup()
            self._last_result = self.client.jobs.post_runs(self.job_id)
            failed_run_id = self.run_id
            new_run_id = self._last_result.id
            # The poller reads the run ID from poller_args; point it at the new run.
            self.poller_args[1] = new_run_id
            self._max_n_retries -= 1
            self._last_polled = time.time()

            # A thread can only be started once, and cleanup() stopped the
            # previous one, so start a brand-new polling thread.
            # A race condition is possible if the old thread is shut down
            # very soon after it was started; in practice this only shows up
            # when testing retries with extremely short polling intervals.
            self._begin_tracking(start_thread=True)

            log.debug(
                "Job ID %d / Run ID %d failed. Retrying with run %d. "
                "%d retries remaining.",
                self.job_id,
                failed_run_id,
                new_run_id,
                self._max_n_retries,
            )

    def cancel(self):
        """Ask Civis to cancel the container script run.

        Returns
        -------
        bool
            ``True`` if the future ends up in a cancelled state, ``False``
            when it had already finished or the cancel request failed.
        """
        with self._condition:
            if self.cancelled():
                return True
            if self.done():
                return False

            # The response of the cancel request becomes the future's
            # "finished result" (`_result`).
            try:
                self._result = self.client.scripts.post_cancel(self.job_id)
            except CivisAPIError as err:
                # A 404 most likely means the job already finished; anything
                # else is unexpected and worth a warning.
                if err.status_code != 404:
                    warnings.warn(
                        "Unexpected error when attempting to "
                        "cancel job ID %d / run ID %d:\n%s"
                        % (self.job_id, self.run_id, str(err))
                    )
                return False

            for waiter in self._waiters:
                waiter.add_cancelled(self)
            self._condition.notify_all()
            self.cleanup()
            self._invoke_callbacks()
            return self.cancelled()
def _create_docker_command(*args, **kwargs):
"""
Returns a string with the ordered arguments args in order,
followed by the keyword arguments kwargs (in sorted order, for
consistency), separated by spaces.
For example,
``_create_docker_command('./myprogram', 5, 6, wibble=7, wobble=8)``
returns ``"./myprogram 5 6 --wibble 7 --wobble 8"``.
"""
return " ".join(
[str(x) for x in args]
+ ["--{} {}".format(k, v) for k, v in sorted(kwargs.items())]
)
class _CivisExecutor(Executor, metaclass=ABCMeta):
    """Base class for executors that run each submission as a Civis job.

    Subclasses implement :meth:`_create_job`. :meth:`submit` then starts a
    run of the created job and returns a
    :class:`~civis.futures.ContainerFuture` that tracks that run.

    Parameters
    ----------
    name : str, optional
        Name for created jobs (see ``inc_script_names``).
    hidden : bool, optional
        Stored as ``self.hidden`` for use by :meth:`_create_job`.
        Defaults to ``True``.
    max_n_retries : int, optional
        Passed to every :class:`~civis.futures.ContainerFuture` created.
    client : APIClient, optional
        Client for API requests. If not given, a new one is instantiated.
    polling_interval : int or float, optional
        Passed to every :class:`~civis.futures.ContainerFuture` created.
    inc_script_names : bool, optional
        If ``True``, a counter is appended to ``name`` for each submission.
    """

    def __init__(
        self,
        name=None,
        hidden=True,
        max_n_retries=0,
        client=None,
        polling_interval=None,
        inc_script_names=False,
    ):
        self.max_n_retries = max_n_retries
        self.hidden = hidden
        self.name = name
        self.polling_interval = polling_interval
        self.inc_script_names = inc_script_names
        self._script_name_counter = 0

        # Guards `_shutdown_thread` and serializes submissions against shutdown.
        self._shutdown_lock = threading.Lock()
        self._shutdown_thread = False

        if client is None:
            client = APIClient()
        self.client = client

        # A list of ContainerFuture objects for submitted jobs.
        self._futures = []

    def _make_future(self, job_id, run_id):
        """Instantiates a :class:`~civis.futures.ContainerFuture`,
        adds it to the internal list of futures, and returns it.

        This is a helper method for :func:`submit`.
        """
        future = ContainerFuture(
            job_id,
            run_id,
            polling_interval=self.polling_interval,
            max_n_retries=self.max_n_retries,
            client=self.client,
            poll_on_creation=False,
        )
        self._futures.append(future)

        # Return a ContainerFuture object with the job ID.
        return future

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        This creates a Civis job (via :meth:`_create_job`) whose command is
        determined by the arguments, starts a run of it, and returns a future
        tracking that run.

        Parameters
        ----------
        fn : str, callable, or None
            If a ``str``, it is used directly as the command to run in docker.
            If a callable, ``fn(*args, **kwargs)`` must return the command as
            a ``str``. If ``None``, ``_create_docker_command`` is used as the
            callable.
        *args : args
            Additional arguments passed to ``fn``.
        arguments : dict, optional
            Keyword-only, and not passed to ``fn``. If provided, the created
            script will use the `arguments` dictionary from the class
            initialization updated with this dictionary. The caller's
            dictionary is not modified. ``CIVIS_PARENT_JOB_ID`` and
            ``CIVIS_PARENT_RUN_ID`` are always set from this process's
            ``CIVIS_JOB_ID`` and ``CIVIS_RUN_ID`` environment variables.
        **kwargs : kwargs
            Additional keyword arguments passed to ``fn``.

        Returns
        -------
        :class:`~civis.futures.ContainerFuture`
            Note that the ``Future`` returned by ``submit`` will
            provide the final status of your Container Script as
            its ``.result()``. The user is responsible for downloading
            outputs produced by the script, if any.

        Raises
        ------
        RuntimeError
            If :meth:`shutdown` has already been called.
        """
        # Copy so a caller-supplied dict is never mutated; treat an explicit
        # ``arguments=None`` the same as omitting it.
        arguments = dict(kwargs.pop("arguments", None) or {})
        arguments.update(
            {
                "CIVIS_PARENT_JOB_ID": os.getenv("CIVIS_JOB_ID"),
                "CIVIS_PARENT_RUN_ID": os.getenv("CIVIS_RUN_ID"),
            }
        )

        # Hold the lock for the whole submission so that shutdown() (which
        # takes the same lock) cannot interleave with job creation.
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError("cannot schedule new futures after shutdown")

            if isinstance(fn, str):
                cmd = fn
            else:
                if fn is None:
                    fn = _create_docker_command
                cmd = fn(*args, **kwargs)

            name = self.name
            if self.inc_script_names:
                name = "{} {}".format(name, self._script_name_counter)
                self._script_name_counter += 1

            job = self._create_job(name=name, arguments=arguments, cmd=cmd)
            run = self.client.jobs.post_runs(job.id)
            log.debug(
                'Container "%s" created with script ID %s and run ID %s',
                name,
                job.id,
                run.id,
            )

            return self._make_future(job.id, run.id)

    @abstractmethod
    def _create_job(self, name, arguments=None, cmd=None):
        """Create the Civis job for one submission and return it.

        The returned object must have an ``id`` attribute.
        """
        raise NotImplementedError("Implement in the child class")

    def shutdown(self, wait=True):
        """Stop accepting submissions and optionally wait for all Civis jobs.

        Parameters
        ----------
        wait : bool
            If ``True``, then this will wait until all jobs are in a
            done (i.e., finished or cancelled) state.
        """
        with self._shutdown_lock:
            self._shutdown_thread = True
        if wait:
            futures.wait(self._futures)

    def cancel_all(self):
        """Send cancel requests for all running Civis jobs"""
        for f in self._futures:
            # The ContainerFuture is smart enough to only cancel the run
            # if the run is still in progress.
            f.cancel()
class _ContainerShellExecutor(_CivisExecutor):
    """Parallel computation with Container Scripts in the Civis Platform

    Create and run new Container Scripts in the Civis Platform.
    A Container Script is a command which runs inside a Docker container
    on the Civis Platform. Containers launched by the Executor may
    have either different shell commands or different arguments.

    This class follows the implementations in :ref:`concurrent.futures`,
    with necessary changes for parallelizing over different Container
    Script inputs rather than over functions.

    Jobs created through this executor will have environment variables
    ``CIVIS_PARENT_JOB_ID`` and ``CIVIS_PARENT_RUN_ID`` with the contents
    of the ``CIVIS_JOB_ID`` and ``CIVIS_RUN_ID`` of the environment which
    created them. If the code doesn't have ``CIVIS_JOB_ID`` and ``CIVIS_RUN_ID``
    environment variables available, the child will not have
    ``CIVIS_PARENT_JOB_ID`` and ``CIVIS_PARENT_RUN_ID`` environment variables.

    .. note:: If you expect to run a large number of jobs, you may
              wish to set automatic retries of failed jobs
              (via ``max_n_retries``) to protect against network and
              infrastructure failures. Be careful with this if your
              jobs cause side effects other than returning a result;
              retries may cause any operations executed by your jobs
              to be run twice.

    Parameters
    ----------
    docker_image_name: str, optional
        The name of the Docker image to be used by Civis. You may also
        wish to specify a ``docker_image_tag`` in the keyword arguments.
        Defaults to ``"civisanalytics/datascience-python"``.
    name: str, optional
        The name for containers in Civis.
        Defaults to "ContainerShellExecutorScript" followed by the date.
    required_resources: dict, optional
        A dictionary specifying what resources the job needs.
        See :func:`~APIClient.scripts.post_containers` for details.
        Defaults to 1 CPU and 1 GiB of RAM.
    hidden: bool, optional
        The hidden status of the object. Setting this to ``True`` hides it
        from most API endpoints. The object can still be queried
        directly by ID. Defaults to ``True``.
    max_n_retries: int, optional
        Retry failed jobs this many times before giving up.
        Retried jobs will be restarted with exactly the same parameters
        as they used the first time; only use this if you expect
        that your code is functional and errors would come from
        e.g. network problems.
    client: APIClient, optional
        The :class:`~civis.APIClient` object to use for interacting with the
        API. If not specified, a new one will be instantiated.
    polling_interval: int or float, optional
        The number of seconds between API requests to check whether a result
        is ready. This will be passed to the
        :class:`~ContainerFuture` objects that are created.
    inc_script_names: bool, optional
        If ``True``, a counter will be added to the ``name`` to create
        the script names for each submission.
    **kwargs:
        Additional keyword arguments will be passed directly to
        :func:`civis.APIClient.scripts.post_containers<civis.resources._resources.Scripts.post_containers>`.
        An ``arguments`` dictionary given here is merged with (and
        overridden by) the arguments of each submission.
    """  # noqa: E501

    def __init__(
        self,
        docker_image_name="civisanalytics/datascience-python",
        name=None,
        required_resources=None,
        hidden=True,
        max_n_retries=0,
        client=None,
        polling_interval=None,
        inc_script_names=False,
        **kwargs,
    ):
        self.docker_image_name = docker_image_name
        self.container_kwargs = kwargs

        # Add params for parent job info.
        # Overwrite them if they already exist to avoid duplicates, which would
        # lead to job failure.
        params = [
            p
            for p in self.container_kwargs.get("params", [])
            if p["name"].upper() not in ("CIVIS_PARENT_JOB_ID", "CIVIS_PARENT_RUN_ID")
        ]
        params.extend(
            [
                {
                    "name": "CIVIS_PARENT_JOB_ID",
                    "type": "integer",
                    "value": os.getenv("CIVIS_JOB_ID"),
                },
                {
                    "name": "CIVIS_PARENT_RUN_ID",
                    "type": "integer",
                    "value": os.getenv("CIVIS_RUN_ID"),
                },
            ]
        )
        self.container_kwargs["params"] = params

        if required_resources is None:
            required_resources = {"cpu": 1024, "memory": 1024}
        self.required_resources = required_resources

        if name is None:
            date_str = datetime.datetime.today().strftime("%Y-%m-%d")
            name = "ContainerShellExecutorScript {}".format(date_str)

        super().__init__(
            name=name,
            hidden=hidden,
            client=client,
            max_n_retries=max_n_retries,
            polling_interval=polling_interval,
            inc_script_names=inc_script_names,
        )

    def _create_job(self, name, arguments=None, cmd=None):
        """Create one container script for a submission and return it.

        ``arguments`` are merged over any ``arguments`` given at
        initialization; the other initialization keyword arguments are
        passed through unchanged to ``post_containers``.
        """
        # Combine instance and input arguments into one dictionary.
        # Deep-copy so per-submission arguments never leak into
        # ``self.container_kwargs`` (and thus into later submissions).
        kwargs = copy.deepcopy(self.container_kwargs)
        kwargs.setdefault("arguments", {}).update(arguments or {})
        # Omit ``arguments`` entirely rather than sending an empty dict.
        if not kwargs["arguments"]:
            del kwargs["arguments"]

        # Submit a request to Civis to make the container script object.
        # Pass the merged ``kwargs`` (not ``self.container_kwargs``) so the
        # per-submission arguments actually reach the new script.
        job = self.client.scripts.post_containers(
            name=name,
            required_resources=self.required_resources,
            docker_command=cmd,
            docker_image_name=self.docker_image_name,
            hidden=self.hidden,
            **kwargs,
        )
        return job
class CustomScriptExecutor(_CivisExecutor):
    """Manage a pool of Custom Scripts in the Civis Platform

    Each Custom Script will be created from the same template, but may
    use different arguments. This class follows the implementations in
    :ref:`concurrent.futures`.

    If your template has settable parameters ``CIVIS_PARENT_JOB_ID`` and
    ``CIVIS_PARENT_RUN_ID``, then this executor will fill them with the contents
    of the ``CIVIS_JOB_ID`` and ``CIVIS_RUN_ID`` of the environment which
    created them. If the code doesn't have ``CIVIS_JOB_ID`` and ``CIVIS_RUN_ID``
    environment variables available, the child will not have
    ``CIVIS_PARENT_JOB_ID`` and ``CIVIS_PARENT_RUN_ID`` environment variables.

    .. note:: If you expect to run a large number of jobs, you may
              wish to set automatic retries of failed jobs
              (via ``max_n_retries``) to protect against network and
              infrastructure failures. Be careful with this if your
              jobs cause side effects other than returning a result;
              retries may cause any operations executed by your jobs
              to be run twice.

    Parameters
    ----------
    from_template_id: int
        Create jobs as Custom Scripts from the given template ID.
    name: str, optional
        The name for the Custom Scripts created in Civis.
        Defaults to "CustomScriptExecutorScript" followed by the date.
    hidden: bool, optional
        The hidden status of the object. Setting this to ``True`` hides it
        from most API endpoints. The object can still be queried
        directly by ID. Defaults to ``True``.
    arguments: dict, optional
        Default arguments for every Custom Script created by this executor.
        Arguments given to :meth:`submit` take precedence over these.
        See :func:`~civis.APIClient.scripts.post_custom` for details.
    max_n_retries: int, optional
        Retry failed jobs this many times before giving up.
        Retried jobs will be restarted with exactly the same parameters
        as they used the first time; only use this if you expect
        that your code is functional and errors would come from
        e.g. network problems.
    client: APIClient, optional
        The :class:`~civis.APIClient` object to use for interacting with the
        API. If not specified, a new one will be instantiated.
    polling_interval: int or float, optional
        The number of seconds between API requests to check whether a result
        is ready. This will be passed to the
        :class:`~ContainerFuture` objects that are created.
    inc_script_names: bool, optional
        If ``True``, a counter will be added to the ``name`` to create
        the script names for each submission.
    **kwargs:
        Additional keyword arguments will be passed directly to
        :func:`civis.APIClient.scripts.post_custom<civis.resources._resources.Scripts.post_custom>`.
    """  # noqa: E501

    def __init__(
        self,
        from_template_id,
        name=None,
        hidden=True,
        arguments=None,
        max_n_retries=0,
        client=None,
        polling_interval=None,
        inc_script_names=False,
        **kwargs,
    ):
        self.from_template_id = from_template_id
        self.arguments = arguments
        self.kwargs = kwargs

        if name is None:
            date_str = datetime.datetime.today().strftime("%Y-%m-%d")
            name = "CustomScriptExecutorScript {}".format(date_str)

        super().__init__(
            name=name,
            hidden=hidden,
            client=client,
            max_n_retries=max_n_retries,
            polling_interval=polling_interval,
            inc_script_names=inc_script_names,
        )

    def submit(self, **arguments):
        """Submit a Custom Script with the given arguments

        Parameters
        ----------
        **arguments
            Keyword arguments for the created script. They update (and take
            precedence over) the `arguments` dictionary from the class
            initialization.

        Returns
        -------
        :class:`~civis.futures.ContainerFuture`
        """
        return super().submit(fn=None, arguments=arguments)

    def _create_job(self, name, arguments=None, cmd=None):
        """Create one Custom Script from the template and return it.

        ``cmd`` is ignored: the command comes from the template.
        """
        # Combine instance and input arguments into one dictionary;
        # per-submission arguments override instance arguments.
        # Use `None` instead of an empty dictionary.
        combined_args = (self.arguments or {}).copy()
        combined_args.update(arguments or {})
        if not combined_args:
            combined_args = None

        job = self.client.scripts.post_custom(
            self.from_template_id,
            name=name,
            arguments=combined_args,
            hidden=self.hidden,
            **self.kwargs,
        )
        return job