"""Run CivisML jobs and retrieve the results
"""
import builtins
import collections
from functools import lru_cache
import io
import json
import logging
import os
import re
import shutil
import tempfile
import threading
import warnings
from concurrent import futures
from functools import wraps
import joblib
try:
from sklearn.base import BaseEstimator
HAS_SKLEARN = True
except ImportError:
HAS_SKLEARN = False
from civis import APIClient, find, find_one
from civis._utils import camel_to_snake
from civis.base import CivisAPIError, CivisJobFailure
import civis.io as cio
from civis.futures import ContainerFuture
from civis.response import Response
__all__ = ['ModelFuture', 'ModelError', 'ModelPipeline']
log = logging.getLogger(__name__)
# sentinel value for default primary key value
SENTINEL = collections.namedtuple('Sentinel', [])()
class ModelError(RuntimeError):
def __init__(self, msg, estimator=None, metadata=None):
self.metadata = metadata
self.estimator = estimator
super().__init__(msg)
def _check_fit_initiated(method):
"""Makes sure that the ModelPipeline's been trained"""
@wraps(method)
def wrapper(*args, **kwargs):
self = args[0]
if not self.train_result_:
raise ValueError("This model hasn't been trained yet.")
return method(*args, **kwargs)
return wrapper
def _block_and_handle_missing(method):
"""For ModelFuture file-retrieving property methods.
Block until completion and attempt to retrieve result.
Raise exception only if the result isn't found.
"""
@wraps(method)
def wrapper(self):
futures.wait((self,)) # Block until done
try:
return method(self)
except FileNotFoundError:
# We get here if the modeling job failed to produce
# any output and we don't have metadata.
if self.exception():
raise self.exception() from None
else:
raise
return wrapper
def _stash_local_dataframe(df, template_id, client=None):
"""Store data in a temporary Civis File and return the file ID"""
# Standard dataframe indexes do not have a "levels" attribute,
# but multiindexes do. Checking for this attribute means we don't
# need to import pandas to do error handling here.
if getattr(getattr(df, "index", None), "levels", None) is not None:
raise TypeError("CivisML does not support multi-indexed data frames. "
"Try calling `.reset_index` on your data to convert "
"it into a CivisML-friendly format.")
try:
        # Templates newer than the v2.0 series (9968 train / 9969
        # predict) accept Feather input.
        if template_id > 9969:
return _stash_dataframe_as_feather(df, client)
else:
return _stash_dataframe_as_csv(df, client)
except (ImportError, AttributeError) as exc:
if (df.dtypes == 'category').any():
# The original exception should tell users if they need
# to upgrade pandas (an AttributeError)
            # or if they need to install "feather-format" (ImportError).
raise ValueError(
'Categorical columns can only be handled with pandas '
'version >= 0.20 and `feather-format` installed.') from exc
return _stash_dataframe_as_csv(df, client)
def _stash_dataframe_as_feather(df, client):
civis_fname = 'modelpipeline_data.feather'
with tempfile.TemporaryDirectory() as tdir:
path = os.path.join(tdir, civis_fname)
df.to_feather(path)
file_id = cio.file_to_civis(path, name=civis_fname, client=client)
return file_id
def _stash_dataframe_as_csv(df, client):
civis_fname = 'modelpipeline_data.csv'
txt = io.StringIO()
df.to_csv(txt, encoding='utf-8', index=False)
txt.flush()
txt.seek(0)
file_id = cio.file_to_civis(txt, name=civis_fname, client=client)
return file_id
def _stash_local_file(csv_path, client=None):
"""Store data in a temporary Civis File and return the file ID"""
civis_fname = 'modelpipeline_data.csv'
with open(csv_path) as _fin:
file_id = cio.file_to_civis(_fin, name=civis_fname, client=client)
return file_id
def _decode_train_run(train_job_id, train_run_id, client):
"""Determine correct run ID for use for a given training job ID"""
try:
return int(train_run_id)
except ValueError:
container = client.scripts.get_containers(int(train_job_id))
if train_run_id == 'active':
train_run_id = container.arguments.get('ACTIVE_BUILD', find_one(
container.params, name='ACTIVE_BUILD'))['default']
if train_run_id == 'latest':
return container.last_run.id
try:
return int(train_run_id)
except Exception as exc:
            msg = ('Please provide a valid train_run_id! It must be an '
                   'integer corresponding to a training run ID, or one '
                   'of "active" or "latest".')
raise ValueError(msg) from exc
def _retrieve_file(fname, job_id, run_id, local_dir, client=None):
"""Download a Civis file using a reference on a previous run"""
file_id = cio.file_id_from_run_output(fname, job_id, run_id, client=client)
fpath = os.path.join(local_dir, fname)
# fname may contain a path
output_dir = os.path.dirname(fpath)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
with open(fpath, 'wb') as down_file:
cio.civis_to_file(file_id, down_file, client=client)
return fpath
def _load_table_from_outputs(job_id, run_id, filename, client=None,
**table_kwargs):
"""Load a table from a run output directly into a ``DataFrame``"""
client = APIClient() if client is None else client
file_id = cio.file_id_from_run_output(filename, job_id, run_id,
client=client, regex=True)
return cio.file_to_dataframe(file_id, client=client, **table_kwargs)
def _load_estimator(job_id, run_id, filename='estimator.pkl', client=None):
    """Load a joblib-serialized Estimator from run outputs"""
    # Use a context-managed temporary directory so cleanup happens
    # even if the download or deserialization raises.
    with tempfile.TemporaryDirectory() as tempdir:
        path = _retrieve_file(filename, job_id, run_id, tempdir,
                              client=client)
        obj = joblib.load(path)
    return obj
def _exception_from_logs(exc, job_id, run_id, client, nlog=15):
"""Create an exception if the log has a recognizable error
Search "error" emits in the last ``n_log`` lines.
This function presently recognizes the following errors:
- MemoryError
"""
logs = client.scripts.list_containers_runs_logs(job_id, run_id, limit=nlog)
# Check for memory errors
msgs = [x['message'] for x in logs if x['level'] == 'error']
mem_err = [m for m in msgs if m.startswith('Process ran out of its')]
if mem_err:
exc = MemoryError(mem_err[0])
else:
# Unknown error; return logs to the user as a sort of traceback
all_logs = '\n'.join([x['message'] for x in logs])
if isinstance(exc, CivisJobFailure):
exc.error_message = all_logs + '\n' + exc.error_message
else:
exc = CivisJobFailure(all_logs)
return exc
def _parse_warning(warn_str):
"""Reverse-engineer a warning string
Parameters
----------
warn_str : string
Returns
-------
(str, Warning, str, int)
message, category, filename, lineno
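    Examples
    --------
    A sketch with a made-up warning string:

    >>> _parse_warning("model.py:10: RuntimeWarning: Some features are null")
    ('Some features are null', <class 'RuntimeWarning'>, 'model.py', 10)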
"""
tokens = warn_str.rstrip('\n').split(" ")
# The first token is
# "[filename]:[lineno]:"
filename, lineno, _ = tokens[0].split(':')
# The second token is
# "[category name]:"
category = getattr(builtins, tokens[1][:-1], RuntimeWarning)
message = " ".join(tokens[2:])
return message, category, filename, int(lineno)
def _show_civisml_warnings(warn_list):
"""Re-raise warnings recorded during a CivisML run
Parameters
----------
warn_list : list of str
A list of warnings generated during a CivisML run
"""
for warn_str in warn_list:
try:
warnings.warn_explicit(*_parse_warning(warn_str))
except Exception: # NOQA
warn_str = "Remote warning from CivisML:\n" + warn_str
warnings.warn(warn_str, RuntimeWarning)
def _get_job_type_version(alias):
"""Derive the job type and version from the given alias.
Parameters
----------
alias : str
CivisML alias
Returns
-------
str
Job type, one of {training, prediction, registration}.
str
CivisML version, e.g., "v2.2".
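    Examples
    --------
    A sketch of the expected parsing (these aliases are hypothetical):

    >>> _get_job_type_version('civis-civisml-training')
    ('training', None)
    >>> _get_job_type_version('civis-civisml-prediction-v2-3')
    ('prediction', 'v2.3')
    >>> _get_job_type_version('civis-civisml-training-dev')
    ('training', 'dev')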
"""
# A version-less alias for production, e.g., "civis-civisml-training"
match_production = re.search(r'\Acivis-civisml-(\w+)\Z', alias)
# A versioned alias, e.g., "civis-civisml-training-v2-3"
match_v = re.search(r'\Acivis-civisml-(\w+)-v(\d+)-(\d+)\Z', alias)
# A special-version alias, e.g., "civis-civisml-training-dev"
match_special = re.search(r'\Acivis-civisml-(\w+)-(\S+[^-])\Z', alias)
if match_production:
job_type = match_production.group(1)
version = None
elif match_v:
job_type = match_v.group(1)
version = 'v%s.%s' % match_v.group(2, 3)
elif match_special:
job_type = match_special.group(1)
version = match_special.group(2)
else:
msg = ('Unable to parse the job type and version '
               'from the CivisML alias %r')
raise ValueError(msg % alias)
return job_type, version
@lru_cache()
def _get_template_ids_all_versions(client):
"""Get templates IDs for all accessible CivisML versions.
Parameters
----------
client : APIClient
Civis API client object
Returns
-------
Dict[str, Dict[str, int]]
Mapping between versions (e.g., "v2.2") and template IDs for the given
version (e.g., {'training': 1, 'prediction': 2, 'registration': 3}).
"""
template_alias_objects = client.aliases.list(
object_type='template_script', iterator=True
)
civisml_template_alias_objects = find(
template_alias_objects,
alias=lambda alias: alias.startswith('civis-civisml-')
)
ids = collections.defaultdict(
lambda: {'training': None, 'prediction': None, 'registration': None}
)
for alias_obj in civisml_template_alias_objects:
try:
job_type, version = _get_job_type_version(alias_obj.alias)
except ValueError:
            msg = (
                '%r looks like a CivisML alias with the prefix '
                '"civis-civisml-", but its job type and version '
                'cannot be parsed'
            )
            log.debug(msg, alias_obj.alias)
continue
ids[version][job_type] = alias_obj.object_id
if not ids:
r = Response({'status_code': 404,
'reason': 'No CivisML template IDs are accessible.',
'content': None})
raise CivisAPIError(r)
    # Disallow a defaultdict in the output so that looking up a
    # non-existent CivisML version triggers a KeyError.
ids = dict(ids)
return ids
def _get_template_ids(civisml_version, client):
"""Get template IDs for the specified CivisML version.
Parameters
----------
civisml_version : str
CivisML version
client : APIClient
Civis API client object
Returns
-------
int
Template ID for training
int
Template ID for prediction
int
Template ID for pre-trained model registration
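    Examples
    --------
    A sketch, assuming access to a hypothetical "v2.3" release:

    >>> client = APIClient()
    >>> train_id, predict_id, register_id = _get_template_ids('v2.3', client)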
"""
template_ids_all_versions = _get_template_ids_all_versions(client)
try:
ids = template_ids_all_versions[civisml_version]
except KeyError:
msg = (
'"{civisml_version}" is an invalid CivisML version. '
'Either this version does not exist, or you do not have access '
'to this version. '
'Versions accessible to you are {{{accessible_versions}}}, '
'as well as `None` for the latest production version.'
).format(
civisml_version=civisml_version,
accessible_versions=', '.join(
'"%s"' % v
# Don't include None, or else it would crash sorted()
for v in sorted(
v for v in template_ids_all_versions.keys() if v
)
)
)
raise ValueError(msg)
return ids['training'], ids['prediction'], ids['registration']
class ModelFuture(ContainerFuture):
"""Encapsulates asynchronous execution of a CivisML job
This object knows where to find modeling outputs
from CivisML jobs. All data attributes are
lazily retrieved and block on job completion.
This object can be pickled, but it does not store the state
of the attached :class:`~civis.APIClient` object. An unpickled
ModelFuture will use the API key from the user's environment.
Parameters
----------
job_id : int
ID of the modeling job
run_id : int
ID of the modeling run
train_job_id : int, optional
If not provided, this object is assumed to encapsulate a training
job, and ``train_job_id`` will equal ``job_id``.
train_run_id : int, optional
If not provided, this object is assumed to encapsulate a training
run, and ``train_run_id`` will equal ``run_id``.
polling_interval : int or float, optional
The number of seconds between API requests to check whether a result
        is ready. The default uses a short interval if the ``pubnub``
        notifications library is not installed and a longer backup
        interval if it is.
client : :class:`civis.APIClient`, optional
If not provided, an :class:`civis.APIClient` object will be
created from the :envvar:`CIVIS_API_KEY`.
poll_on_creation : bool, optional
If ``True`` (the default), it will poll upon calling ``result()`` the
first time. If ``False``, it will wait the number of seconds specified
in `polling_interval` from object creation before polling.
Attributes
----------
metadata : dict, blocking
The metadata associated with this modeling job
metrics : dict, blocking
Validation metrics from this job's training
validation_metadata : dict, blocking
Metadata from this modeling job's validation run
train_metadata : dict, blocking
Metadata from this modeling job's training run
(will be identical to `metadata` if this is a training run)
estimator : :class:`sklearn.pipeline.Pipeline`, blocking
The fitted scikit-learn Pipeline resulting from this model run
table : :class:`pandas.DataFrame`, blocking
The table output from this modeling job: out-of-sample
predictions on the training set for a training job, or
a table of predictions for a prediction job.
If the prediction job was split into multiple files
(this happens automatically for large tables),
this attribute will provide only predictions for the first file.
state : str
The current state of the Civis Platform run
job_id : int
run_id : int
train_job_id : int
Container ID for the training job -- identical to ``job_id``
if this is a training job.
train_run_id : int
        Run ID for the training run -- identical to ``run_id``
        if this is a training run.
is_training : bool
True if this ``ModelFuture`` corresponds to a train-validate job.
Methods
-------
cancel()
Cancels the corresponding Platform job before completion
succeeded()
(Non-blocking) Is the job a success?
failed()
(Non-blocking) Did the job fail?
cancelled()
(Non-blocking) Was the job cancelled?
running()
(Non-blocking) Is the job still running?
done()
(Non-blocking) Is the job finished?
result()
(Blocking) Return the final status of the Civis Platform job.
See Also
--------
civis.futures.CivisFuture
civis.futures.ContainerFuture
concurrent.futures.Future
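    Examples
    --------
    A sketch, assuming hypothetical job and run IDs for a completed
    training job:

    >>> from civis.ml import ModelFuture
    >>> fut = ModelFuture(job_id=13291, run_id=112715)
    >>> fut.done()
    True
    >>> fut.metrics['roc_auc']  # Blocks until results are available
    0.88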
"""
def __init__(self, job_id, run_id, train_job_id=None, train_run_id=None,
polling_interval=None, client=None, poll_on_creation=True):
super().__init__(job_id, run_id,
polling_interval=polling_interval,
client=client,
poll_on_creation=poll_on_creation)
if train_job_id and train_run_id:
self.is_training = False
self.train_job_id = train_job_id
self.train_run_id = train_run_id
else:
self.is_training = True
self.train_job_id = self.job_id
self.train_run_id = self.run_id
self._metadata, self._val_metadata = None, None
self._train_data, self._train_data_fname = None, None
self._train_metadata = None
self._table, self._estimator = None, None
self._exception_handled = False
self.add_done_callback(self._set_model_exception)
@staticmethod
def _set_model_exception(fut):
"""Callback: On job completion, check the metadata.
If it indicates an exception, replace the generic
``CivisJobFailure`` by a more informative ``ModelError``.
"""
# Prevent infinite recursion: this function calls `set_exception`,
# which triggers callbacks (i.e. re-calls this function).
if fut._exception_handled:
return
else:
fut._exception_handled = True
try:
meta = fut.metadata
if fut.is_training and meta['run']['status'] == 'succeeded':
# if training job and job succeeded, check validation job
meta = fut.validation_metadata
if meta is not None and meta['run']['status'] == 'exception':
try:
                    # Don't let a failure to retrieve the estimator
                    # (e.g., a missing dependency during unpickling)
                    # mask the model error below.
est = fut.estimator
except Exception: # NOQA
est = None
fut.set_exception(
ModelError('Model run failed with stack trace:\n'
'{}'.format(meta['run']['stack_trace']),
est, meta))
except (FileNotFoundError, CivisJobFailure) as exc:
# If there's no metadata file
# (we get FileNotFound or CivisJobFailure),
# check the tail of the log for a clearer exception.
exc = _exception_from_logs(exc, fut.job_id, fut.run_id, fut.client)
fut.set_exception(exc)
except futures.CancelledError:
# We don't need to change the exception if the run was cancelled
pass
except KeyError:
# KeyErrors always represent a bug in the modeling code,
# but showing the resulting KeyError can be confusing and
# mask the real error.
warnings.warn("Received malformed metadata from Civis Platform. "
"Something went wrong with job execution.")
def __getstate__(self):
state = self.__dict__.copy()
del state['_polling_thread']
del state['client']
del state['poller']
del state['_condition']
if '_pubnub' in state:
state['_pubnub'] = True # Replace with a boolean flag
state['_done_callbacks'] = []
state['_self_polling_executor'] = None
return state
def __setstate__(self, state):
self.__dict__ = state
self._condition = threading.Condition()
self.client = APIClient()
self.poller = self.client.scripts.get_containers_runs
self._begin_tracking()
self.add_done_callback(self._set_model_exception)
@property
def state(self):
state = self._civis_state
if state == 'succeeded':
state = self.metadata['run']['status']
return state
@property
def table_fname(self):
return 'predictions.csv'
@property
def metadata_fname(self):
return 'model_info.json'
@property
def train_data_fname(self):
if self._train_data_fname is None:
self._train_data_fname = os.path.basename(self.training_metadata
.get('run')
.get('configuration')
.get('data')
.get('location'))
return self._train_data_fname
@property
def train_data(self):
if self._train_data is None:
try:
self._train_data = _load_table_from_outputs(
self.train_job_id,
self.train_run_id,
self.train_data_fname,
client=self.client)
except CivisAPIError as err:
if err.status_code == 404:
msg = 'There is no training data stored for this job!'
raise ValueError(msg) from err
else:
raise
return self._train_data
def _table_primary_key(self):
# metadata path to input parameters is different
# for training and prediction
if self.is_training:
pkey = self.metadata[
'run']['configuration']['data']['primary_key']
else:
pkey = self.metadata[
'jobs'][0]['run']['configuration']['data']['primary_key']
return pkey
@property
def table(self):
self.result() # Block and raise errors if any
if self._table is None:
            # An index column will only be present if a primary key was set
if self._table_primary_key() is None:
index_col = False
else:
index_col = 0
if self.is_training:
try:
# Training jobs only have one output table, the OOS scores
self._table = _load_table_from_outputs(self.job_id,
self.run_id,
self.table_fname,
index_col=index_col,
client=self.client)
except FileNotFoundError:
# Just pass here, because we want the table to stay None
# if it does not exist
pass
else:
# Prediction jobs may have many output tables.
output_ids = self.metadata['output_file_ids']
if len(output_ids) > 1:
print('This job output {} files. Retrieving only the '
'first. Find the full list at `metadata'
'["output_file_ids"]`.'.format(len(output_ids)))
self._table = cio.file_to_dataframe(output_ids[0],
client=self.client,
index_col=index_col)
return self._table
@property
@_block_and_handle_missing
def metadata(self):
if self._metadata is None:
            fid = cio.file_id_from_run_output(self.metadata_fname,
                                              self.job_id, self.run_id,
                                              client=self.client)
self._metadata = cio.file_to_json(fid, client=self.client)
_show_civisml_warnings(self._metadata.get('warnings', []))
return self._metadata
@property
@_block_and_handle_missing
def estimator(self):
if self._estimator is None:
self._estimator = _load_estimator(self.train_job_id,
self.train_run_id,
client=self.client)
return self._estimator
@property
@_block_and_handle_missing
def validation_metadata(self):
if self._val_metadata is None:
try:
fid = cio.file_id_from_run_output('metrics.json',
self.train_job_id,
self.train_run_id,
client=self.client)
except FileNotFoundError:
# Use an empty dictionary to indicate that
# we've already checked for metadata.
self._val_metadata = {}
else:
self._val_metadata = cio.file_to_json(fid, client=self.client)
if not self._val_metadata:
# Convert an empty dictionary to None
return None
else:
return self._val_metadata
@property
def metrics(self):
if self.validation_metadata:
return self.validation_metadata['metrics']
else:
return None
@property
@_block_and_handle_missing
def training_metadata(self):
if self._train_metadata is None:
            fid = cio.file_id_from_run_output(self.metadata_fname,
                                              self.train_job_id,
                                              self.train_run_id,
                                              client=self.client)
self._train_metadata = cio.file_to_json(fid, client=self.client)
return self._train_metadata
class ModelPipeline:
"""Interface for scikit-learn modeling in the Civis Platform
Each ModelPipeline corresponds to a scikit-learn
:class:`~sklearn.pipeline.Pipeline` which will run in Civis Platform.
Note that this object can be safely pickled and unpickled, but it
does not store the state of any attached :class:`~civis.APIClient` object.
An unpickled ModelPipeline will use the API key from the user's
environment.
Parameters
----------
model : string or Estimator
Either the name of a pre-defined model
(e.g. "sparse_logistic" or "gradient_boosting_classifier")
or else a pre-existing Estimator object.
dependent_variable : string or List[str]
The dependent variable of the training dataset.
For a multi-target problem, this should be a list of
column names of dependent variables. Nulls in a single
dependent variable will automatically be dropped.
primary_key : string, optional
The unique ID (primary key) of the training dataset.
This will be used to index the out-of-sample scores.
parameters : dict, optional
Specify parameters for the final stage estimator in a
predefined model, e.g. ``{'C': 2}`` for a "sparse_logistic"
model.
cross_validation_parameters : dict or string, optional
Options for cross validation. For grid search, supply a
parameter grid as a dictionary, e.g.,
        ``{'n_estimators': [100, 200, 500], 'learning_rate': [0.01, 0.1],
        'max_depth': [2, 3]}``. For hyperband, pass the string "hyperband".
model_name : string, optional
The prefix of the Platform modeling jobs. It will have
" Train" or " Predict" added to become the Script title.
calibration : {None, "sigmoid", "isotonic"}
If not None, calibrate output probabilities with the selected method.
Valid only with classification models.
excluded_columns : array, optional
A list of columns which will be considered ineligible to be
independent variables.
client : :class:`~civis.APIClient`, optional
If not provided, an :class:`~civis.APIClient` object will be
created from the :envvar:`CIVIS_API_KEY`.
cpu_requested : int, optional
Number of CPU shares requested in the Civis Platform for
training jobs. 1024 shares = 1 CPU.
memory_requested : int, optional
Memory requested from Civis Platform for training jobs, in MiB
disk_requested : float, optional
Disk space requested on Civis Platform for training jobs, in GB
notifications : dict
See :func:`~civis.resources._resources.Scripts.post_custom` for
further documentation about email and URL notification.
dependencies : array, optional
List of packages to install from PyPI or git repository (e.g., Github
or Bitbucket). If a private repo is specified, please include a
``git_token_name`` argument as well (see below). Make sure to pin
dependencies to a specific version, since dependencies will be
reinstalled during every training and predict job.
git_token_name : str, optional
Name of remote git API token stored in Civis Platform as the password
field in a custom platform credential. Used only when installing
private git repositories.
verbose : bool, optional
If True, supply debug outputs in Platform logs and make
prediction child jobs visible.
etl : Estimator, optional
Custom ETL estimator which overrides the default ETL, and
is run before training and validation.
civisml_version : str, optional
CivisML version to use for training and prediction.
If not provided, the latest version in production is used.
Methods
-------
train()
Train the model on data in Civis Platform; outputs
:class:`~civis.ml.ModelFuture`
predict()
Make predictions on new data; outputs :class:`~civis.ml.ModelFuture`
from_existing()
Class method; use to create a :class:`~civis.ml.ModelPipeline`
from an existing model training run
Attributes
----------
estimator : :class:`~sklearn.pipeline.Pipeline`
The trained scikit-learn Pipeline
train_result_ : :class:`~civis.ml.ModelFuture`
:class:`~civis.ml.ModelFuture` encapsulating this model's training run
state : str
Status of the training job (non-blocking)
Examples
--------
>>> from civis.ml import ModelPipeline
>>> model = ModelPipeline('gradient_boosting_classifier', 'depvar',
... primary_key='voterbase_id')
>>> train = model.train(table_name='schema.survey_data',
... fit_params={'sample_weight': 'survey_weight'},
... database_name='My Redshift Cluster',
... oos_scores='scratch.survey_depvar_oos_scores')
>>> train
<ModelFuture at 0x11be7ae10 state=queued>
>>> train.running()
True
>>> train.done()
False
>>> df = train.table # Read OOS scores from its Civis File. Blocking.
>>> meta = train.metadata # Metadata from training run
>>> train.metrics['roc_auc']
0.88425
    >>> pred = model.predict(table_name='schema.demographics_table',
... database_name='My Redshift Cluster',
... output_table='schema.predicted_survey_response',
... if_exists='drop')
>>> df_pred = pred.table # Blocks until finished
# Modify the parameters of the base estimator in a default model:
>>> model = ModelPipeline('sparse_logistic', 'depvar',
... primary_key='voterbase_id',
... parameters={'C': 2})
# Grid search over hyperparameters in the base estimator:
>>> model = ModelPipeline('sparse_logistic', 'depvar',
... primary_key='voterbase_id',
... cross_validation_parameters={'C': [0.1, 1, 10]})
See Also
--------
civis.ml.ModelFuture
"""
def __init__(self, model, dependent_variable,
primary_key=None, parameters=None,
cross_validation_parameters=None, model_name=None,
calibration=None, excluded_columns=None, client=None,
cpu_requested=None, memory_requested=None,
disk_requested=None, notifications=None,
dependencies=None, git_token_name=None, verbose=False,
etl=None, civisml_version=None):
self.model = model
self._input_model = model # In case we need to modify the input
if isinstance(dependent_variable, str):
# Standardize the dependent variable as a list.
dependent_variable = [dependent_variable]
self.dependent_variable = dependent_variable
# optional but common parameters
self.primary_key = primary_key
self.parameters = parameters or {}
self.cv_params = cross_validation_parameters or {}
self.model_name = model_name # None lets Platform use template name
self.excluded_columns = excluded_columns
self.calibration = calibration
self.job_resources = {'REQUIRED_CPU': cpu_requested,
'REQUIRED_MEMORY': memory_requested,
'REQUIRED_DISK_SPACE': disk_requested}
self.notifications = notifications or {}
self.dependencies = dependencies
self.git_token_name = git_token_name
self.verbose = verbose
if client is None:
client = APIClient()
self._client = client
self.train_result_ = None
template_ids = _get_template_ids(civisml_version, self._client)
self.train_template_id, self.predict_template_id, _ = template_ids
self.etl = etl
if self.train_template_id < 9968 and self.etl is not None:
# This is a pre-v2.0 CivisML template
raise NotImplementedError("The etl argument is not implemented"
" in this version of CivisML.")
def __getstate__(self):
state = self.__dict__.copy()
del state['_client']
return state
def __setstate__(self, state):
self.__dict__ = state
self._client = APIClient()
    @classmethod
def register_pretrained_model(cls, model, dependent_variable=None,
features=None, primary_key=None,
model_name=None, dependencies=None,
git_token_name=None,
skip_model_check=False, verbose=False,
client=None, civisml_version=None):
"""Use a fitted scikit-learn model with CivisML scoring
Use this function to set up your own fitted scikit-learn-compatible
Estimator object for scoring with CivisML. This function will
upload your model to Civis Platform and store enough metadata
about it that you can subsequently use it with a CivisML scoring job.
        The only required input is the model itself, but it is strongly
        recommended that you also provide a list of feature names. Without
        a list of feature names, CivisML will have to assume that your
        scoring table contains only the features needed for scoring
        (perhaps also with a primary key column), all in the correct order.
Parameters
----------
model : sklearn.base.BaseEstimator or int
The model object. This must be a fitted scikit-learn compatible
Estimator object, or else the integer Civis File ID of a
pickle or joblib-serialized file which stores such an object.
If an Estimator object is provided, it will be uploaded to the
Civis Files endpoint and set to be available indefinitely.
dependent_variable : string or List[str], optional
The dependent variable of the training dataset.
For a multi-target problem, this should be a list of
column names of dependent variables.
features : string or List[str], optional
A list of column names of features which were used for training.
These will be used to ensure that tables input for prediction
have the correct features in the correct order.
primary_key : string, optional
The unique ID (primary key) of the scoring dataset
model_name : string, optional
The name of the Platform registration job. It will have
" Predict" added to become the Script title for predictions.
dependencies : array, optional
List of packages to install from PyPI or git repository (e.g.,
GitHub or Bitbucket). If a private repo is specified, please
include a ``git_token_name`` argument as well (see below).
Make sure to pin dependencies to a specific version, since
dependencies will be reinstalled during every predict job.
git_token_name : str, optional
Name of remote git API token stored in Civis Platform as
the password field in a custom platform credential.
Used only when installing private git repositories.
skip_model_check : bool, optional
If you're sure that your model will work with CivisML, but it
will fail the comprehensive verification, set this to True.
verbose : bool, optional
If True, supply debug outputs in Platform logs and make
prediction child jobs visible.
client : :class:`~civis.APIClient`, optional
If not provided, an :class:`~civis.APIClient` object will be
created from the :envvar:`CIVIS_API_KEY`.
civisml_version : str, optional
CivisML version to use.
If not provided, the latest version in production is used.
Returns
-------
:class:`~civis.ml.ModelPipeline`
Examples
--------
This example assumes that you already have training data
``X`` and ``y``, where ``X`` is a :class:`~pandas.DataFrame`.
>>> from civis.ml import ModelPipeline
>>> from sklearn.linear_model import Lasso
>>> est = Lasso().fit(X, y)
>>> model = ModelPipeline.register_pretrained_model(
... est, 'concrete', features=X.columns)
>>> model.predict(table_name='my.table', database_name='my-db')
"""
client = client or APIClient()
if isinstance(dependent_variable, str):
dependent_variable = [dependent_variable]
if isinstance(features, str):
features = [features]
if isinstance(dependencies, str):
dependencies = [dependencies]
if not model_name:
model_name = ("Pretrained {} model for "
"CivisML".format(model.__class__.__name__))
model_name = model_name[:255] # Max size is 255 characters
if isinstance(model, (int, float, str)):
model_file_id = int(model)
else:
            # Use a context-managed temporary directory so cleanup
            # happens even if serialization or upload raises.
            with tempfile.TemporaryDirectory() as tempdir:
                fout = os.path.join(tempdir, 'model_for_civisml.pkl')
                joblib.dump(model, fout, compress=3)
                with open(fout, 'rb') as _fout:
                    # NB: Using the name "estimator.pkl" means that
                    # CivisML doesn't need to copy this input to a file
                    # with a different name.
                    model_file_id = cio.file_to_civis(
                        _fout, 'estimator.pkl', expires_at=None,
                        client=client)
args = {'MODEL_FILE_ID': str(model_file_id),
'SKIP_MODEL_CHECK': skip_model_check,
'DEBUG': verbose}
if dependent_variable is not None:
args['TARGET_COLUMN'] = ' '.join(dependent_variable)
if features is not None:
args['FEATURE_COLUMNS'] = ' '.join(features)
if dependencies is not None:
args['DEPENDENCIES'] = ' '.join(dependencies)
if git_token_name:
creds = find(client.credentials.list(),
name=git_token_name,
type='Custom')
            if len(creds) != 1:
raise ValueError("Unique credential with name '{}' for "
"remote git hosting service not found!"
.format(git_token_name))
args['GIT_CRED'] = creds[0].id
_, _, template_id = _get_template_ids(civisml_version, client)
if template_id is None:
msg = (
'No registration template ID is available. '
'Pre-trained model registration is available for CivisML '
'v2.2 (for which `civisml_version` would be "v2.2") or above, '
                'but you have specified CivisML version %r'
)
raise ValueError(msg % civisml_version)
container = client.scripts.post_custom(
from_template_id=template_id,
name=model_name,
arguments=args)
log.info('Created custom script %s.', container.id)
run = client.scripts.post_custom_runs(container.id)
log.debug('Started job %s, run %s.', container.id, run.id)
fut = ModelFuture(container.id, run.id, client=client,
poll_on_creation=False)
fut.result()
log.info('Model registration complete.')
mp = ModelPipeline.from_existing(fut.job_id, fut.run_id, client)
mp.primary_key = primary_key
return mp
    @classmethod
def from_existing(cls, train_job_id, train_run_id='latest', client=None):
"""Create a :class:`ModelPipeline` object from existing model IDs
Parameters
----------
train_job_id : int
The ID of the CivisML job in the Civis Platform
train_run_id : int or string, optional
Location of the model run, either
* an explicit run ID,
* "latest" : The most recent run
* "active" : The run designated by the training job's
"active build" parameter
client : :class:`~civis.APIClient`, optional
If not provided, an :class:`~civis.APIClient` object will be
created from the :envvar:`CIVIS_API_KEY`.
Returns
-------
:class:`~civis.ml.ModelPipeline`
A :class:`~civis.ml.ModelPipeline` which refers to
a previously-trained model
Examples
--------
>>> from civis.ml import ModelPipeline
>>> model = ModelPipeline.from_existing(job_id)
>>> model.train_result_.metrics['roc_auc']
0.843
"""
train_job_id = int(train_job_id) # Convert np.int to int
if client is None:
client = APIClient()
train_run_id = _decode_train_run(train_job_id, train_run_id, client)
try:
fut = ModelFuture(train_job_id, train_run_id, client=client)
container = client.scripts.get_containers(train_job_id)
except CivisAPIError as api_err:
if api_err.status_code == 404:
msg = ('There is no Civis Platform job with '
'script ID {} and run ID {}!'.format(train_job_id,
train_run_id))
raise ValueError(msg) from api_err
raise
args = container.arguments
# Older templates used "WORKFLOW" instead of "MODEL"
model = args.get('MODEL', args.get('WORKFLOW'))
dependent_variable = args['TARGET_COLUMN'].split()
primary_key = args.get('PRIMARY_KEY')
parameters = json.loads(args.get('PARAMS', "{}"))
cross_validation_parameters = json.loads(args.get('CVPARAMS', "{}"))
calibration = args.get('CALIBRATION')
excluded_columns = args.get('EXCLUDE_COLS', None)
if excluded_columns:
excluded_columns = excluded_columns.split()
cpu_requested = args.get('REQUIRED_CPU')
memory_requested = args.get('REQUIRED_MEMORY')
disk_requested = args.get('REQUIRED_DISK_SPACE')
name = container.name
if name.endswith(' Train'):
            # Strip the " Train" suffix appended when the job was created
name = name[:-len(' Train')]
notifications = {camel_to_snake(key): val for key, val
in container.notifications.items()}
dependencies = args.get('DEPENDENCIES', None)
if dependencies:
dependencies = dependencies.split()
git_token_name = args.get('GIT_CRED', None)
if git_token_name:
git_token_name = client.credentials.get(git_token_name).name
klass = cls(model=model,
dependent_variable=dependent_variable,
primary_key=primary_key,
model_name=name,
parameters=parameters,
cross_validation_parameters=cross_validation_parameters,
calibration=calibration,
excluded_columns=excluded_columns,
client=client,
cpu_requested=cpu_requested,
disk_requested=disk_requested,
memory_requested=memory_requested,
notifications=notifications,
dependencies=dependencies,
git_token_name=git_token_name,
verbose=args.get('DEBUG', False))
klass.train_result_ = fut
# Set prediction template corresponding to training
# or registration template
template_id = int(container['from_template_id'])
ids = find_one(
_get_template_ids_all_versions(client).values(),
lambda ids: ids['training'] == template_id or ids['registration'] == template_id # noqa
)
p_id = ids['prediction']
klass.predict_template_id = p_id
return klass
    def train(self, df=None, csv_path=None, table_name=None,
database_name=None, file_id=None,
sql_where=None, sql_limit=None, oos_scores=None,
oos_scores_db=None, if_exists='fail', fit_params=None,
polling_interval=None, validation_data='train', n_jobs=None):
"""Start a Civis Platform job to train your model
Provide input through one of
a :class:`~pandas.DataFrame` (``df``),
a local CSV (``csv_path``),
a Civis Table (``table_name`` and ``database_name``), or
a Civis File containing a CSV (``file_id``).
Model outputs will always contain out-of-sample scores
(accessible through :attr:`ModelFuture.table` on this function's
        output), and you may choose to store these out-of-sample scores
in a Civis Table with the ``oos_scores``, ``oos_scores_db``,
and ``if_exists`` parameters.
Parameters
----------
df : pd.DataFrame, optional
A :class:`~pandas.DataFrame` of training data.
The :class:`~pandas.DataFrame` will be uploaded to a Civis file so
that CivisML can access it.
Note that the index of the :class:`~pandas.DataFrame` will be
ignored -- use ``df.reset_index()`` if you want your
index column to be included with the data passed to CivisML.
NB: You must install ``feather-format`` if your
:class:`~pandas.DataFrame` contains :class:`~pandas.Categorical`
columns, to ensure that CivisML preserves data types.
csv_path : str, optional
The location of a CSV of data on the local disk.
It will be uploaded to a Civis file.
table_name : str, optional
The qualified name of the table containing the training set from
which to build the model.
database_name : str, optional
Name of the database holding the training set table used to
build the model. E.g., 'My Cluster Name'.
file_id : int, optional
If the training data are stored in a Civis file,
provide the integer file ID.
sql_where : str, optional
A SQL WHERE clause used to scope the rows of the training set
(used for table input only)
sql_limit : int, optional
SQL LIMIT clause for querying the training set
(used for table input only)
oos_scores : str, optional
If provided, store out-of-sample predictions on
training set data to this Redshift "schema.tablename".
oos_scores_db : str, optional
If not provided, store OOS predictions in the same database
which holds the training data.
if_exists : {'fail', 'append', 'drop', 'truncate'}
Action to take if the out-of-sample prediction table
already exists.
        fit_params : Dict[str, str], optional
Mapping from parameter names in the model's ``fit`` method
to the column names which hold the data, e.g.
``{'sample_weight': 'survey_weight_column'}``.
polling_interval : float, optional
            Check for job completion every ``polling_interval`` seconds.
Do not set if using the notifications endpoint.
validation_data : str, optional
Source for validation data. There are currently two options:
`'train'` (the default), which cross-validates over training data
for validation; and `'skip'`, which skips the validation step.
n_jobs : int, optional
Number of jobs to use for training and validation. Defaults to
`None`, which allows CivisML to dynamically calculate an
appropriate number of workers to use (in general, as many as
possible without using all resources in the cluster).
Increase n_jobs to parallelize over many hyperparameter
combinations in grid search/hyperband, or decrease to use fewer
computational resources at once.
Returns
-------
:class:`~civis.ml.ModelFuture`
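        Examples
        --------
        A minimal sketch, assuming ``model`` is a :class:`ModelPipeline`
        and ``df`` is a hypothetical :class:`~pandas.DataFrame` that
        includes the dependent variable:

        >>> train = model.train(df=df)
        >>> oos_scores = train.table  # Blocks until the job completes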
"""
if ((table_name is None or database_name is None) and
file_id is None and df is None and csv_path is None):
raise ValueError('Provide a source of data.')
if sum((bool(table_name and database_name),
bool(file_id), df is not None, csv_path is not None)) > 1:
raise ValueError('Provide a single source of data.')
if df is not None:
file_id = _stash_local_dataframe(df, self.train_template_id,
client=self._client)
elif csv_path:
file_id = _stash_local_file(csv_path, client=self._client)
train_args = {'TARGET_COLUMN': ' '.join(self.dependent_variable),
'PRIMARY_KEY': self.primary_key,
'PARAMS': json.dumps(self.parameters),
'CVPARAMS': json.dumps(self.cv_params),
'CALIBRATION': self.calibration,
'IF_EXISTS': if_exists}
if oos_scores:
train_args['OOSTABLE'] = oos_scores
if oos_scores_db:
oos_db_id = self._client.get_database_id(oos_scores_db)
train_args['OOSDB'] = {'database': oos_db_id}
if sql_where:
train_args['WHERESQL'] = sql_where
if sql_limit:
train_args['LIMITSQL'] = sql_limit
if self.excluded_columns:
train_args['EXCLUDE_COLS'] = ' '.join(self.excluded_columns)
if fit_params:
train_args['FIT_PARAMS'] = json.dumps(fit_params)
if self.dependencies:
train_args['DEPENDENCIES'] = ' '.join(self.dependencies)
if self.train_template_id >= 9968:
if validation_data:
train_args['VALIDATION_DATA'] = validation_data
if n_jobs:
train_args['N_JOBS'] = n_jobs
if HAS_SKLEARN and isinstance(self.model, BaseEstimator):
            with tempfile.TemporaryDirectory() as tempdir:
                fout = os.path.join(tempdir, 'estimator.pkl')
                joblib.dump(self.model, fout, compress=3)
                with open(fout, 'rb') as _fout:
                    n = self.model_name if self.model_name else "CivisML"
                    estimator_file_id = cio.file_to_civis(
                        _fout, 'Estimator for ' + n, client=self._client)
            self._input_model = self.model  # Keep the original estimator
            self.model = str(estimator_file_id)
train_args['MODEL'] = self.model
if HAS_SKLEARN and self.train_template_id >= 9968:
if isinstance(self.etl, BaseEstimator):
                with tempfile.TemporaryDirectory() as tempdir:
                    fout = os.path.join(tempdir, 'ETL.pkl')
                    joblib.dump(self.etl, fout, compress=3)
                    with open(fout, 'rb') as _fout:
                        etl_file_id = cio.file_to_civis(
                            _fout, 'ETL Estimator', client=self._client)
                train_args['ETL'] = str(etl_file_id)
name = self.model_name + ' Train' if self.model_name else None
# Clear the existing training result so we can make a new one.
self.train_result_ = None
result, container, run = self._create_custom_run(
self.train_template_id,
job_name=name,
table_name=table_name,
database_name=database_name,
file_id=file_id,
args=train_args,
resources=self.job_resources,
polling_interval=polling_interval)
self.train_result_ = result
return result
def _create_custom_run(self, template_id, job_name=None, table_name=None,
database_name=None, file_id=None, args=None,
resources=None, polling_interval=None):
# Handle int-like but non-Python-integer types such as np.int64
file_id = int(file_id) if file_id is not None else file_id
script_arguments = {'TABLE_NAME': table_name,
'CIVIS_FILE_ID': file_id,
'DEBUG': self.verbose}
if database_name:
if template_id < 8000:
# v0 jobs used a different database parameter
script_arguments['DB_NAME'] = database_name
else:
db_id = self._client.get_database_id(database_name)
script_arguments['DB'] = {'database': db_id}
resources = resources or {}
for key, value in resources.items():
if value:
# Default resources are set on the template. Only
# modify via arguments if users give a non-default value.
script_arguments[key] = value
if self.git_token_name:
creds = find(self._client.credentials.list(),
name=self.git_token_name,
type='Custom')
            if len(creds) != 1:
raise ValueError("Unique credential with name '{}' for "
"remote git hosting service not found!"
.format(self.git_token_name))
script_arguments['GIT_CRED'] = creds[0].id
script_arguments.update(args or {})
container = self._client.scripts.post_custom(
from_template_id=template_id,
name=job_name,
arguments=script_arguments,
notifications=self.notifications)
log.info('Created custom script %s.', container.id)
run = self._client.scripts.post_custom_runs(container.id)
log.debug('Started job %s, run %s.', container.id, run.id)
train_kwargs = {}
if self.train_result_ is not None:
train_kwargs = {'train_job_id': self.train_result_.job_id,
'train_run_id': self.train_result_.run_id}
fut = ModelFuture(
container.id,
run.id,
client=self._client,
polling_interval=polling_interval,
poll_on_creation=False,
**train_kwargs)
return fut, container, run
@property
@_check_fit_initiated
def state(self):
return self.train_result_.state
@property
@_check_fit_initiated
def estimator(self):
return self.train_result_.estimator
    @_check_fit_initiated
def predict(self, df=None, csv_path=None,
table_name=None, database_name=None,
manifest=None, file_id=None, sql_where=None, sql_limit=None,
primary_key=SENTINEL, output_table=None, output_db=None,
if_exists='fail', n_jobs=None, polling_interval=None,
cpu=None, memory=None, disk_space=None,
dvs_to_predict=None):
"""Make predictions on a trained model
Provide input through one of
a :class:`~pandas.DataFrame` (``df``),
a local CSV (``csv_path``),
a Civis Table (``table_name`` and ``database_name``),
a Civis File containing a CSV (``file_id``), or
a Civis File containing a manifest file (``manifest``).
A "manifest file" is JSON which specifies the location of
many shards of the data to be used for prediction.
A manifest file is the output of a Civis
export job with ``force_multifile=True`` set,
e.g. from :func:`civis.io.civis_to_multifile_csv`.
Large Civis Tables (provided using ``table_name``)
will automatically be exported to manifest files.
Prediction outputs will always be stored as gzipped
CSVs in one or more Civis Files. You can find a list of
File ID numbers for output files at the "output_file_ids"
key in the metadata returned by the prediction job.
Provide an ``output_table`` (and optionally an ``output_db``,
if it's different from ``database_name``) to copy these
predictions into a Civis Table.
Parameters
----------
df : pd.DataFrame, optional
A :class:`~pandas.DataFrame` of data for prediction.
The :class:`~pandas.DataFrame` will be uploaded to a Civis file so
that CivisML can access it.
Note that the index of the :class:`~pandas.DataFrame` will be
ignored -- use ``df.reset_index()`` if you want your
index column to be included with the data passed to CivisML.
NB: You must install ``feather-format`` if your
:class:`~pandas.DataFrame` contains :class:`~pandas.Categorical`
columns, to ensure that CivisML preserves data types.
csv_path : str, optional
The location of a CSV of data on the local disk.
It will be uploaded to a Civis file.
table_name : str, optional
The qualified name of the table containing your data
database_name : str, optional
Name of the database holding the
data, e.g., 'My Redshift Cluster'.
manifest : int, optional
ID for a manifest file stored as a Civis file.
(Note: if the manifest is not a Civis Platform-specific manifest,
            like the one returned from :func:`civis.io.civis_to_multifile_csv`,
this must be used in conjunction with table_name and database_name
due to the need for column discovery via Redshift.)
file_id : int, optional
If the data are a CSV stored in a Civis file,
provide the integer file ID.
sql_where : str, optional
A SQL WHERE clause used to scope the rows to be predicted
sql_limit : int, optional
SQL LIMIT clause to restrict the size of the prediction set
primary_key : str, optional
Primary key of the prediction table. Defaults to
the primary key of the training data. Use ``None`` to
indicate that the prediction data don't have a
primary key column.
        output_table : str, optional
The table in which to put the predictions.
output_db : str, optional
Database of the output table. Defaults to the database
of the input table.
if_exists : {'fail', 'append', 'drop', 'truncate'}
Action to take if the prediction table already exists.
n_jobs : int, optional
Number of concurrent Platform jobs to use
for multi-file / large table prediction. Defaults to
`None`, which allows CivisML to dynamically calculate an
appropriate number of workers to use (in general, as many as
possible without using all resources in the cluster).
polling_interval : float, optional
            Check for job completion every ``polling_interval`` seconds.
Do not set if using the notifications endpoint.
cpu : int, optional
CPU shares requested by the user for a single job.
memory : int, optional
RAM requested by the user for a single job.
disk_space : float, optional
            Disk space requested by the user for a single job.
dvs_to_predict : list of str, optional
If this is a multi-output model, you may list a subset of
dependent variables for which you wish to generate predictions.
This list must be a subset of the original `dependent_variable`
input. The scores for the returned subset will be identical to
the scores which those outputs would have had if all outputs
were written, but ignoring some of the model's outputs will
let predictions complete faster and use less disk space.
The default is to produce scores for all DVs.
Returns
-------
:class:`~civis.ml.ModelFuture`
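        Examples
        --------
        A minimal sketch, assuming ``model`` has been trained; the table
        name is hypothetical:

        >>> pred = model.predict(table_name='schema.to_score',
        ...                      database_name='My Redshift Cluster')
        >>> df_scores = pred.table  # Blocks until the job completes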
"""
self.train_result_.result() # Blocks and raises training errors
if ((table_name is None or database_name is None) and
file_id is None and df is None and csv_path is None and
manifest is None):
raise ValueError('Provide a source of data.')
if sum((bool(table_name and database_name) or (manifest is not None),
bool(file_id), df is not None, csv_path is not None)) > 1:
raise ValueError('Provide a single source of data.')
if df is not None:
file_id = _stash_local_dataframe(df, self.predict_template_id,
client=self._client)
elif csv_path:
file_id = _stash_local_file(csv_path, client=self._client)
if primary_key is SENTINEL:
primary_key = self.primary_key
predict_args = {'TRAIN_JOB': self.train_result_.job_id,
'TRAIN_RUN': self.train_result_.run_id,
'PRIMARY_KEY': primary_key,
'IF_EXISTS': if_exists}
if output_table:
predict_args['OUTPUT_TABLE'] = output_table
if output_db:
if self.predict_template_id == 7021:
# v0 jobs used a different database parameter
predict_args['OUTPUT_DB'] = output_db
else:
output_db_id = self._client.get_database_id(output_db)
predict_args['OUTPUT_DB'] = {'database': output_db_id}
if manifest:
predict_args['MANIFEST'] = manifest
if sql_where:
predict_args['WHERESQL'] = sql_where
if sql_limit:
predict_args['LIMITSQL'] = sql_limit
if n_jobs:
predict_args['N_JOBS'] = n_jobs
if dvs_to_predict:
if isinstance(dvs_to_predict, str):
dvs_to_predict = [dvs_to_predict]
if self.predict_template_id > 10583:
# This feature was added in v2.2; 10583 is the v2.1 template
predict_args['TARGET_COLUMN'] = ' '.join(dvs_to_predict)
if self.predict_template_id >= 9969:
if cpu:
predict_args['CPU'] = cpu
if memory:
predict_args['MEMORY'] = memory
if disk_space:
predict_args['DISK_SPACE'] = disk_space
name = self.model_name + ' Predict' if self.model_name else None
result, container, run = self._create_custom_run(
self.predict_template_id,
job_name=name,
table_name=table_name,
database_name=database_name,
file_id=file_id,
args=predict_args,
polling_interval=polling_interval)
return result