Source code for civis.civis

from __future__ import absolute_import
import logging
import warnings

import civis
from civis.compat import lru_cache
from civis.resources import generate_classes_maybe_cached
from civis._utils import get_api_key
from civis._deprecation import deprecate_param


log = logging.getLogger(__name__)

RETRY_CODES = [429, 502, 503, 504]


[docs]def find(object_list, filter_func=None, **kwargs): """Filter :class:`civis.response.Response` objects. Parameters ---------- object_list : iterable An iterable of arbitrary objects, particularly those with attributes that can be targeted by the filters in `kwargs`. A major use case is an iterable of :class:`civis.response.Response` objects. filter_func : callable, optional A one-argument function. If specified, `kwargs` are ignored. An `object` from the input iterable is kept in the returned list if and only if ``bool(filter_func(object))`` is ``True``. **kwargs Key-value pairs for more fine-grained filtering; they cannot be used in conjunction with `filter_func`. All keys must be strings. For an `object` from the input iterable to be included in the returned list, all the `key`s must be attributes of `object`, plus any one of the following conditions for a given `key`: - `value` is a one-argument function and ``bool(value(getattr(object, key)))`` is ``True`` - `value` is ``True`` - ``getattr(object, key)`` is equal to ``value`` Returns ------- list Examples -------- >>> import civis >>> client = civis.APIClient() >>> # creds is a list of civis.response.Response objects >>> creds = client.credentials.list() >>> # target_creds contains civis.response.Response objects >>> # with the attribute 'name' == 'username' >>> target_creds = find(creds, name='username') See Also -------- civis.find_one """ _func = filter_func if not filter_func: def default_filter(o): for k, v in kwargs.items(): if not hasattr(o, k): return False elif callable(v): if not v(getattr(o, k, None)): return False elif isinstance(v, bool): if hasattr(o, k) != v: return False elif v != getattr(o, k, None): return False return True _func = default_filter return [o for o in object_list if _func(o)]
[docs]def find_one(object_list, filter_func=None, **kwargs): """Return one satisfying :class:`civis.response.Response` object. The arguments are the same as those for :func:`civis.find`. If more than one object satisfies the filtering criteria, the first one is returned. If no satisfying objects are found, ``None`` is returned. Returns ------- object or None See Also -------- civis.find """ results = find(object_list, filter_func, **kwargs) return results[0] if results else None
class MetaMixin(): @lru_cache(maxsize=128) def get_database_id(self, database): """Return the database ID for a given database name. Parameters ---------- database : str or int If an integer ID is given, passes through. If a str is given the database ID corresponding to that database name is returned. Returns ------- database_id : int The ID of the database. Raises ------ ValueError If the database can't be found. """ if isinstance(database, int): return database db = find_one(self.databases.list(), name=database) if not db: raise ValueError("Database {} not found.".format(database)) return db["id"] @lru_cache(maxsize=128) def get_database_credential_id(self, username, database_name): """Return the credential ID for a given username in a given database. Parameters ---------- username : str or int If an integer ID is given, this passes through directly. If a str is given, return the ID corresponding to the database credential with that username. database_name : str or int Return the ID of the database credential with username `username` for this database name or ID. Returns ------- database_credential_id : int The ID of the database credentials. Raises ------ ValueError If the credential can't be found. Examples -------- >>> import civis >>> client = civis.APIClient() >>> client.get_database_credential_id('jsmith', 'redshift-general') 1234 >>> client.get_database_credential_id(1111, 'redshift-general') 1111 """ if isinstance(username, int): return username else: creds = self.credentials.list(type="Database") filter_kwargs = {'username': username} if isinstance(database_name, int): filter_kwargs['remote_host_id'] = database_name else: filter_kwargs['remote_host_name'] = database_name my_creds = find_one(creds, **filter_kwargs) if my_creds is None: raise ValueError("Credential ID for {} on {} not " "found.".format(username, database_name)) return my_creds["id"] @lru_cache(maxsize=128) def get_aws_credential_id(self, cred_name, owner=None): """Find an AWS credential ID. Parameters ---------- cred_name : str or int If an integer ID is given, this passes through directly. If a str is given, return the ID corresponding to the AWS credential with that name. owner : str, optional Return the credential with this owner. If not provided, search for credentials under your username to disambiguate multiple credentials with the same name. Note that this function cannot return credentials which are not associated with an owner. Returns ------- aws_credential_id : int The ID number of the AWS credentials. Raises ------ ValueError If the AWS credential can't be found. Examples -------- >>> import civis >>> client = civis.APIClient() >>> client.get_aws_credential_id('jsmith') 1234 >>> client.get_aws_credential_id(1111) 1111 >>> client.get_aws_credential_id('shared-cred', ... owner='research-group') 99 """ if isinstance(cred_name, int): return cred_name else: creds = self.credentials.list(type="Amazon Web Services S3") my_creds = find(creds, name=cred_name) if owner is not None: my_creds = find(my_creds, owner=owner) if not my_creds: own_str = "" if owner is None else " owned by {}".format(owner) msg = "AWS credential ID for {}{} cannot be found" raise ValueError(msg.format(cred_name, own_str)) elif len(my_creds) > 1: if owner is None: # If the user didn't specify an owner, see if we can # narrow down to just credentials owned by this user. owner = self.username my_creds = find(my_creds, owner=owner) if len(my_creds) > 1: log.warning("Found %d AWS credentials with name %s and " "owner %s. Returning the first.", len(my_creds), cred_name, owner) my_creds = my_creds[0] return my_creds["id"] @lru_cache(maxsize=128) def get_table_id(self, table, database): """Return the table ID for a given database and table name. Parameters ---------- table : str The name of the table in format schema.table. database : str or int The name or ID of the database. Returns ------- table_id : int The ID of the table. Only returns exact match to specified table. Raises ------ ValueError If an exact table match can't be found. """ database_id = self.get_database_id(database) schema, name = civis.io.split_schema_tablename(table) tables = self.tables.list(database_id=database_id, schema=schema, name=name) if not tables: msg = "No tables found for {} in database {}" raise ValueError(msg.format(table, database)) found_table = ".".join((tables[0].schema, tables[0].name)) if table != found_table: msg = "Given table {} is not an exact match for returned table {}." raise ValueError(msg.format(table, found_table)) return tables[0].id @property @lru_cache(maxsize=128) def default_credential(self): """The current user's default credential.""" # NOTE: this should be optional to endpoints...so this could go away creds = self.credentials.list(default=True) return creds[0]['id'] if len(creds) > 0 else None @property @lru_cache(maxsize=128) def username(self): """The current user's username.""" return self.users.list_me().username
[docs]class APIClient(MetaMixin): """The Civis API client. Parameters ---------- api_key : str, optional Your API key obtained from the Civis Platform. If not given, the client will use the :envvar:`CIVIS_API_KEY` environment variable. return_type : str, optional The following types are implemented: - ``'raw'`` Returns the raw :class:`requests:requests.Response` object. - ``'snake'`` Returns a :class:`civis.response.Response` object for the json-encoded content of a response. This maps the top-level json keys to snake_case. - ``'pandas'`` Returns a :class:`pandas:pandas.DataFrame` for list-like responses and a :class:`pandas:pandas.Series` for single a json response. retry_total : int, optional A number indicating the maximum number of retries for 429, 502, 503, or 504 errors. api_version : string, optional The version of endpoints to call. May instantiate multiple client objects with different versions. Currently only "1.0" is supported. resources : string, optional When set to "base", only the default endpoints will be exposed in the client object. Set to "all" to include all endpoints available for a given user, including those that may be in development and subject to breaking changes at a later date. This will be removed in a future version of the API client. local_api_spec : collections.OrderedDict or string, optional The methods on this class are dynamically built from the Civis API specification, which can be retrieved from the /endpoints endpoint. When local_api_spec is None, the default, this specification is downloaded the first time APIClient is instantiated. Alternatively, a local cache of the specification may be passed as either an OrderedDict or a filename which points to a json file. """ @deprecate_param('v2.0.0', 'resources') def __init__(self, api_key=None, return_type='snake', retry_total=6, api_version="1.0", resources="all", local_api_spec=None): if return_type not in ['snake', 'raw', 'pandas']: raise ValueError("Return type must be one of 'snake', 'raw', " "'pandas'") self._feature_flags = () session_auth_key = get_api_key(api_key) self._session_kwargs = {'api_key': session_auth_key, 'max_retries': retry_total} # Catch deprecation warnings from generate_classes_maybe_cached and # the functions it calls until the `resources` argument is removed. with warnings.catch_warnings(): warnings.filterwarnings( "ignore", category=FutureWarning, module='civis') classes = generate_classes_maybe_cached(local_api_spec, session_auth_key, api_version, resources) for class_name, cls in classes.items(): setattr(self, class_name, cls(self._session_kwargs, return_type)) @property def feature_flags(self): if self._feature_flags: return self._feature_flags me = self.users.list_me() self._feature_flags = tuple(flag for flag, value in me['feature_flags'].items() if value) return self._feature_flags def __getstate__(self): raise RuntimeError("The APIClient object can't be pickled.")