from __future__ import absolute_import
from builtins import super
import os
from posixpath import join
import threading
from concurrent import futures
import six
import warnings
from requests.packages.urllib3.util import Retry
from civis.response import PaginatedResponse, convert_response_data_type
# Civis job state strings, grouped by outcome.
FINISHED = ['success', 'succeeded']
FAILED = ['failed']
NOT_FINISHED = ['queued', 'running']
CANCELLED = ['cancelled']
# Every state in which a job is no longer queued or running.
DONE = FINISHED + FAILED + CANCELLED
# Translate Civis state strings into `future` state strings.
# Note that both successful and failed jobs map to the future's FINISHED
# state; only cancelled jobs map to a cancelled future state.
STATE_TRANS = {}
for _civis_states, _future_state in (
        (FINISHED + FAILED, futures._base.FINISHED),
        (NOT_FINISHED, futures._base.RUNNING),
        (CANCELLED, futures._base.CANCELLED_AND_NOTIFIED)):
    for name in _civis_states:
        STATE_TRANS[name] = _future_state
# Drop the loop helpers so the module namespace keeps its original contents.
del _civis_states, _future_state
DEFAULT_API_ENDPOINT = 'https://api.civisanalytics.com/'
def tostr_urljoin(*x):
    """Join the given components into one ``/``-separated path.

    Every positional argument is converted with ``str`` and the pieces are
    combined with ``posixpath.join``. As with that function, a component
    that starts with ``/`` discards everything that precedes it.
    """
    pieces = [str(component) for component in x]
    return join(*pieces)
class CivisJobFailure(Exception):
    """Exception that carries an error message and an optional response.

    Parameters
    ----------
    err_msg : str
        Text returned by ``str()`` on the exception; stored as
        ``error_message``.
    response : object, optional
        Stored unchanged as ``response``; defaults to ``None``.
    """

    def __init__(self, err_msg, response=None):
        self.response = response
        self.error_message = err_msg

    def __str__(self):
        """Return the stored error message."""
        return self.error_message
class CivisAPIError(Exception):
    """Error built from an unsuccessful HTTP response.

    Parameters
    ----------
    response : object
        Response object exposing ``content``, ``json()``, ``reason`` and
        ``status_code`` (presumably a ``requests.Response`` -- confirm
        against callers).

    Attributes
    ----------
    error_message
        The ``errorDescription`` field of the JSON body when the body has
        one; otherwise the response's ``reason``.
    status_code
        Copied from ``response.status_code``.
    """

    def __init__(self, response):
        if response.content:  # the API itself gave an error response
            try:
                self.error_message = response.json()["errorDescription"]
            except (ValueError, KeyError, TypeError):
                # The body is not the expected JSON error document (for
                # example an HTML page from a proxy, or JSON without an
                # "errorDescription" key). Fall back to the reason phrase
                # so that building this error never hides the HTTP failure
                # behind a parsing exception.
                self.error_message = response.reason
        else:  # this was something like a 502
            self.error_message = response.reason
        self.status_code = response.status_code
        self._response = response

    def __str__(self):
        """Return the message, prefixed by the status code when it is truthy."""
        if self.status_code:
            return "({}) {}".format(self.status_code, self.error_message)
        else:
            return self.error_message
class EmptyResultError(Exception):
    """Exception type with no behaviour of its own.

    NOTE(review): presumably signals that an operation produced no result;
    it is not raised in the code visible here -- confirm at the call sites.
    """
class CivisAPIKeyError(Exception):
    """Raised by ``Endpoint._make_request`` for an HTTP 401 response."""
def get_base_url():
    """Return the API base URL, always ending in ``/``.

    The value is read from the ``CIVIS_API_ENDPOINT`` environment variable,
    falling back to ``DEFAULT_API_ENDPOINT`` when the variable is unset.
    """
    endpoint = os.environ.get('CIVIS_API_ENDPOINT', DEFAULT_API_ENDPOINT)
    return endpoint if endpoint.endswith('/') else endpoint + '/'
class AggressiveRetry(Retry):
    """``Retry`` variant that retries more requests than its parent.

    Any response carrying a Retry-After header is retried regardless of the
    HTTP verb, provided the conditions checked in `is_retry` hold.
    """

    def is_retry(self, method, status_code, has_retry_after=False):
        """Decide whether this method/status code combination is retryable.

        Returns ``True`` outright when retries remain (``self.total`` is
        truthy), the Retry-After header is respected and present, and
        ``status_code`` is one of ``RETRY_AFTER_STATUS_CODES``. In every
        other case the decision is delegated to the parent class.
        """
        retry_after_applies = (
            self.total
            and self.respect_retry_after_header
            and has_retry_after
            and status_code in self.RETRY_AFTER_STATUS_CODES
        )
        if retry_after_applies:
            return True
        return super().is_retry(method=method, status_code=status_code,
                                has_retry_after=has_retry_after)
class Endpoint(object):
    """Base object that issues HTTP requests against the API base URL.

    Parameters
    ----------
    session : object
        Object whose ``request(method, url, json=..., params=..., **kwargs)``
        method performs the HTTP call (presumably a ``requests.Session`` --
        confirm against callers).
    return_type : str, optional
        Passed to ``convert_response_data_type`` as ``return_type`` for
        non-paginated calls. Defaults to ``'civis'``.
    """

    # Class attribute: the same lock guards the session call of every
    # instance that does not override it.
    _lock = threading.Lock()

    def __init__(self, session, return_type='civis'):
        self._session = session
        self._return_type = return_type
        self._base_url = get_base_url()

    def _build_path(self, path):
        """Return the full URL for ``path``; the base URL if ``path`` is falsy."""
        if not path:
            return self._base_url
        return tostr_urljoin(self._base_url, path.strip("/"))

    def _make_request(self, method, path=None, params=None, data=None,
                      **kwargs):
        """Send one request and return the response object.

        ``data`` is sent as the JSON body and ``params`` as the query
        parameters; remaining keyword arguments are forwarded to
        ``session.request``.

        Raises
        ------
        CivisAPIKeyError
            For a 401 response; chained from a ``CivisAPIError``.
        CivisAPIError
            For any other response whose ``ok`` attribute is falsy.
        """
        url = self._build_path(path)
        with self._lock:
            response = self._session.request(method, url, json=data,
                                             params=params, **kwargs)
        if response.status_code == 401:
            # A 401 without a WWW-Authenticate header must still surface as
            # CivisAPIKeyError rather than a KeyError from the header lookup,
            # so fall back to the reason phrase.
            auth_error = response.headers.get("www-authenticate",
                                              response.reason)
            six.raise_from(CivisAPIKeyError(auth_error),
                           CivisAPIError(response))
        if not response.ok:
            raise CivisAPIError(response)
        return response

    def _call_api(self, method, path=None, params=None, data=None, **kwargs):
        """Call the API and return converted data or a paginated iterator.

        When the ``iterator`` keyword is truthy, a ``PaginatedResponse``
        built from ``path``, ``params`` and this endpoint is returned;
        ``method``, ``data`` and the other keyword arguments are not passed
        to it. Otherwise the request is made immediately and its response
        is converted with ``convert_response_data_type``.
        """
        iterator = kwargs.pop('iterator', False)
        if iterator:
            return PaginatedResponse(path, params, self)
        resp = self._make_request(method, path, params, data, **kwargs)
        return convert_response_data_type(resp,
                                          return_type=self._return_type)
class CivisAsyncResultBase(futures.Future):
    """Base class for futures that track an asynchronous result.

    A subclass should call `set_result` once the work finishes, or
    `set_exception` when it ends in an error. The reported state can also
    be changed by assigning to `_result` an object that has a ``state``
    attribute, or by overriding `_check_result` to change how that object
    is obtained.

    Parameters
    ----------
    poller : func
        A function which returns an object that has a ``state`` attribute.
    poller_args : tuple
        The arguments with which to call the poller function.
    polling_interval : int or float
        The number of seconds between API requests to check whether a result
        is ready.
    api_key : DEPRECATED str, optional
        Ignored by this class apart from emitting a ``FutureWarning`` when
        a value is given.
    client : :class:`civis.APIClient`, optional
        Stored unchanged as ``self.client``.
        NOTE(review): earlier documentation said a client is created from
        :envvar:`CIVIS_API_KEY` when this is omitted; this class does not
        do so itself -- confirm where that happens.
    poll_on_creation : bool, optional
        If ``True`` (the default), it will poll upon calling ``result()`` the
        first time. If ``False``, it will wait the number of seconds specified
        in `polling_interval` from object creation before polling.
    """

    def __init__(self, poller, poller_args,
                 polling_interval=None, api_key=None, client=None,
                 poll_on_creation=True):
        super().__init__()

        if api_key is not None:
            warnings.warn(
                'The "api_key" parameter is deprecated and will be removed '
                'in v2. Please use the `client` parameter instead.',
                FutureWarning)

        self.client = client
        self.poller = poller
        self.poller_args = poller_args
        self.polling_interval = polling_interval
        self.poll_on_creation = poll_on_creation

    def __repr__(self):
        # Mirrors the superclass's __repr__ but reports the Civis state
        # (`_civis_state`) instead of the future's `_state`.
        with self._condition:
            cls_name = self.__class__.__name__
            if self._civis_state not in FINISHED + FAILED:
                return '<%s at %#x state=%s>' % (
                    cls_name, id(self), self._civis_state)
            if self.exception():
                return '<%s at %#x state=%s raised %s>' % (
                    cls_name,
                    id(self),
                    self._civis_state,
                    self._exception.__class__.__name__)
            return '<%s at %#x state=%s returned %s>' % (
                cls_name,
                id(self),
                self._civis_state,
                self.result().__class__.__name__)

    def cancel(self):
        """Not currently implemented."""
        raise NotImplementedError("Running jobs cannot currently be cancelled")

    def succeeded(self):
        """Return ``True`` if the job completed in Civis with no error."""
        with self._condition:
            state = self._civis_state
            return state in FINISHED

    def failed(self):
        """Return ``True`` if the Civis job failed."""
        with self._condition:
            state = self._civis_state
            return state in FAILED

    def _check_result(self):
        """Return the stored ``_result``, which is ``None`` until one is set."""
        with self._condition:
            return self._result

    @property
    def _civis_state(self):
        """State as returned from Civis; ``'running'`` while no result exists."""
        with self._condition:
            if not self._check_result():
                return 'running'
            return self._check_result().state

    @property
    def _state(self):
        """State of the CivisAsyncResultBase in `future` language."""
        with self._condition:
            civis_state = self._civis_state
            return STATE_TRANS[civis_state]

    @_state.setter
    def _state(self, value):
        # The `Future` superclass assigns to `_state`; those writes are
        # deliberately discarded because the state is derived from
        # `_civis_state` instead.
        pass