from __future__ import absolute_import
from builtins import super
from civis import APIClient
from civis.base import DONE
from civis.polling import PollableResult
try:
from pubnub.pubnub import PubNub
from pubnub.pnconfiguration import PNConfiguration, PNReconnectionPolicy
from pubnub.enums import PNStatusCategory
from pubnub.callbacks import SubscribeCallback
has_pubnub = True
except ImportError:
has_pubnub = False
# Pubnub connections can recover missed messages upon reconnecting for up to 10
# minutes from the disconnect. Polling on a 9.5 minute interval is used as a
# fallback in case the job complete message is missed in an outage.
_LONG_POLLING_INTERVAL = 9.5 * 60
if has_pubnub:
class JobCompleteListener(SubscribeCallback):
_disconnect_categories = [
PNStatusCategory.PNTimeoutCategory,
PNStatusCategory.PNNetworkIssuesCategory,
PNStatusCategory.PNUnexpectedDisconnectCategory,
]
def __init__(self, match_function, callback_function,
disconnect_function=None):
self.match_function = match_function
self.callback_function = callback_function
self.disconnect_function = disconnect_function
def message(self, pubnub, message):
if self.match_function(message.message):
self.callback_function()
def status(self, pubnub, status):
if status.category in self._disconnect_categories:
if self.disconnect_function:
self.disconnect_function()
def presence(self, pubnub, presence):
pass
[docs]class CivisFuture(PollableResult):
"""
A class for tracking future results.
This class will attempt to subscribe to a Pubnub channel to listen for
job completion events. If you don't have access to Pubnub channels, then
it will fallback to polling.
This is a subclass of :class:`python:concurrent.futures.Future` from the
Python standard library. See:
https://docs.python.org/3/library/concurrent.futures.html
Parameters
----------
poller : func
A function which returns an object that has a ``state`` attribute.
poller_args : tuple
The arguments with which to call the poller function.
polling_interval : int or float, optional
The number of seconds between API requests to check whether a result
is ready.
api_key : DEPRECATED str, optional
Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY`
environment variable will be used.
client : :class:`civis.APIClient`, optional
poll_on_creation : bool, optional
If ``True`` (the default), it will poll upon calling ``result()`` the
first time. If ``False``, it will wait the number of seconds specified
in `polling_interval` from object creation before polling.
Examples
--------
This example is provided as a function at :func:`~civis.io.query_civis`.
>>> client = civis.APIClient()
>>> database_id = client.get_database_id("my_database")
>>> cred_id = client.default_credential
>>> sql = "SELECT 1"
>>> preview_rows = 10
>>> response = client.queries.post(database_id, sql, preview_rows,
>>> credential=cred_id)
>>> job_id = response.id
>>>
>>> poller = client.queries.get
>>> poller_args = (job_id, ) # (job_id, run_id) if poller requires run_id
>>> polling_interval = 10
>>> future = CivisFuture(poller, poller_args, polling_interval)
"""
def __init__(self, poller, poller_args,
polling_interval=None, api_key=None, client=None,
poll_on_creation=True):
if client is None:
client = APIClient(api_key=api_key, resources='all')
if (polling_interval is None and
has_pubnub and
hasattr(client, 'channels')):
polling_interval = _LONG_POLLING_INTERVAL
super().__init__(poller=poller,
poller_args=poller_args,
polling_interval=polling_interval,
api_key=api_key,
client=client,
poll_on_creation=poll_on_creation)
if has_pubnub and hasattr(client, 'channels'):
config, channels = self._pubnub_config()
self._pubnub = self._subscribe(config, channels)
@property
def subscribed(self):
return (hasattr(self, '_pubnub') and
len(self._pubnub.get_subscribed_channels()) > 0)
def cleanup(self):
super().cleanup()
if hasattr(self, '_pubnub'):
self._pubnub.unsubscribe_all()
def _subscribe(self, pnconfig, channels):
listener = JobCompleteListener(self._check_message,
self._poll_and_set_api_result,
self._reset_polling_thread)
pubnub = PubNub(pnconfig)
pubnub.add_listener(listener)
pubnub.subscribe().channels(channels).execute()
return pubnub
def _pubnub_config(self):
channel_config = self.client.channels.list()
channels = [channel['name'] for channel in channel_config['channels']]
pnconfig = PNConfiguration()
pnconfig.subscribe_key = channel_config['subscribe_key']
pnconfig.cipher_key = channel_config['cipher_key']
pnconfig.auth_key = channel_config['auth_key']
pnconfig.ssl = True
pnconfig.reconnect_policy = PNReconnectionPolicy.LINEAR
return pnconfig, channels
def _check_message(self, message):
try:
# poller_args can be (job_id,) or (job_id, run_id)
if len(self.poller_args) == 1:
match = (message['object']['id'] == self.poller_args[0] and
message['run']['state'] in DONE)
else:
match = (message['object']['id'] == self.poller_args[0] and
message['run']['id'] == self.poller_args[1] and
message['run']['state'] in DONE)
except KeyError:
return False
return match
def _poll_and_set_api_result(self):
with self._condition:
try:
result = self.poller(*self.poller_args)
self._set_api_result(result)
except Exception as e:
self._set_api_exception(exc=e)