"""Run CivisML jobs and retrieve the results
"""
from builtins import super
from collections import namedtuple
import io
import json
import logging
import os
import shutil
import six
import tempfile
import threading
import warnings
from concurrent import futures
from functools import wraps
import joblib
try:
from sklearn.base import BaseEstimator
HAS_SKLEARN = True
except ImportError:
HAS_SKLEARN = False
from civis import APIClient, find, find_one, __version__
from civis._utils import camel_to_snake
from civis.base import CivisAPIError, CivisJobFailure
from civis.compat import FileNotFoundError
import civis.io as cio
from civis.futures import ContainerFuture
# Public names of this module.
__all__ = ['ModelFuture', 'ModelError', 'ModelPipeline']
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# sentinel value for default primary key value
SENTINEL = namedtuple('Sentinel', [])()
# Map training template to prediction template so that we
# always use a compatible version for predictions.
# Keys are training template IDs, values are prediction template IDs.
# ``ModelPipeline._set_template_version`` treats the largest key as the
# newest training template. Note that the v1.0 training template is
# paired with the v1.1 prediction template.
_PRED_TEMPLATES = {9968: 9969, # v2.0
                   9112: 9113, # v1.1
                   8387: 9113, # v1.0
                   7020: 7021, # v0.5
                   }
class ModelError(RuntimeError):
    """Error describing a failed model run.

    Parameters
    ----------
    msg : str
        The error message, passed on to :class:`RuntimeError`.
    estimator : object, optional
        Stored unchanged on the ``estimator`` attribute.
    metadata : object, optional
        Stored unchanged on the ``metadata`` attribute.
    """

    def __init__(self, msg, estimator=None, metadata=None):
        super().__init__(msg)
        self.estimator = estimator
        self.metadata = metadata
def _check_fit_initiated(method):
"""Makes sure that the ModelPipeline's been trained"""
@wraps(method)
def wrapper(*args, **kwargs):
self = args[0]
if not self.train_result_:
raise ValueError("This model hasn't been trained yet.")
return method(*args, **kwargs)
return wrapper
def _block_and_handle_missing(method):
"""For ModelFuture file-retrieving property methods.
Block until completion and attempt to retrieve result.
Raise exception only if the result isn't found.
"""
@wraps(method)
def wrapper(self):
futures.wait((self,)) # Block until done
try:
return method(self)
except FileNotFoundError:
# We get here if the modeling job failed to produce
# any output and we don't have metadata.
if self.exception():
six.raise_from(self.exception(), None)
else:
raise
return wrapper
def _stash_local_dataframe(df, client=None):
"""Store data in a temporary Civis File and return the file ID"""
# Standard dataframe indexes do not have a "levels" attribute,
# but multiindexes do. Checking for this attribute means we don't
# need to import pandas to do error handling here.
if getattr(getattr(df, "index", None), "levels", None) is not None:
raise TypeError("CivisML does not support multi-indexed data frames. "
"Try calling `.reset_index` on your data to convert "
"it into a CivisML-friendly format.")
civis_fname = 'modelpipeline_data.csv'
buf = six.BytesIO()
if six.PY3:
txt = io.TextIOWrapper(buf, encoding='utf-8')
else:
txt = buf
df.to_csv(txt, encoding='utf-8', index=False)
txt.flush()
buf.seek(0)
file_id = cio.file_to_civis(buf, name=civis_fname, client=client)
return file_id
def _stash_local_file(csv_path, client=None):
    """Upload the local file at ``csv_path`` to a Civis File; return its ID"""
    with open(csv_path) as csv_file:
        return cio.file_to_civis(csv_file, name='modelpipeline_data.csv',
                                 client=client)
def _decode_train_run(train_job_id, train_run_id, client):
"""Determine correct run ID for use for a given training job ID"""
try:
return int(train_run_id)
except ValueError:
container = client.scripts.get_containers(int(train_job_id))
if train_run_id == 'active':
train_run_id = container.arguments.get('ACTIVE_BUILD', find_one(
container.params, name='ACTIVE_BUILD'))['default']
if train_run_id == 'latest':
return container.last_run.id
try:
return int(train_run_id)
except Exception as exc:
msg = ('Please provide valid train_run_id! Needs to be '
'integer corresponding to a training run ID '
'or one of "active" or "latest".')
six.raise_from(ValueError(msg), exc)
def _retrieve_file(fname, job_id, run_id, local_dir, client=None):
    """Download a Civis file using a reference on a previous run

    Parameters
    ----------
    fname : str
        Name of the run output to download. May contain a path, in
        which case the directories are created under ``local_dir``.
    job_id : int
    run_id : int
    local_dir : str
        Directory in which to store the downloaded file.
    client : :class:`civis.APIClient`, optional

    Returns
    -------
    str
        Path of the downloaded file: ``local_dir`` joined with ``fname``.
    """
    file_id = cio.file_id_from_run_output(fname, job_id, run_id, client=client)
    fpath = os.path.join(local_dir, fname)
    # fname may contain a path
    output_dir = os.path.dirname(fpath)
    # ``output_dir`` is '' when neither local_dir nor fname has a directory
    # part; the file then goes to the current directory and there is
    # nothing to create (``os.makedirs('')`` would raise).
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
    with open(fpath, 'wb') as down_file:
        cio.civis_to_file(file_id, down_file, client=client)
    return fpath
def _load_table_from_outputs(job_id, run_id, filename, client=None,
                             **table_kwargs):
    """Read a run output file into a ``DataFrame``

    ``filename`` is matched against the run outputs as a regular
    expression. Extra keyword arguments are forwarded to
    ``civis.io.file_to_dataframe``.
    """
    if client is None:
        client = APIClient(resources='all')
    output_file_id = cio.file_id_from_run_output(
        filename, job_id, run_id, client=client, regex=True)
    return cio.file_to_dataframe(output_file_id, client=client,
                                 **table_kwargs)
def _load_estimator(job_id, run_id, filename='estimator.pkl', client=None):
    """Load a joblib-serialized Estimator from run outputs

    The file is downloaded into a temporary directory, which is removed
    again whether or not loading succeeds.

    Parameters
    ----------
    job_id : int
    run_id : int
    filename : str, optional
        Name of the run output which holds the serialized estimator.
    client : :class:`civis.APIClient`, optional

    Returns
    -------
    The object deserialized by ``joblib.load``.
    """
    # Create the directory before entering the ``try``: if ``mkdtemp``
    # itself fails there is nothing to clean up, and the ``finally``
    # clause must not hide that error behind an unbound ``tempdir``.
    tempdir = tempfile.mkdtemp()
    try:
        path = _retrieve_file(filename, job_id, run_id, tempdir, client=client)
        # NOTE(review): joblib.load unpickles the downloaded file, which can
        # execute arbitrary code. Only load estimators from trusted runs.
        obj = joblib.load(path)
    finally:
        shutil.rmtree(tempdir)
    return obj
def _exception_from_logs(exc, job_id, run_id, client, nlog=15):
"""Create an exception if the log has a recognizable error
Search "error" emits in the last ``n_log`` lines.
This function presently recognizes the following errors:
- MemoryError
"""
logs = client.scripts.list_containers_runs_logs(job_id, run_id, limit=nlog)
# Check for memory errors
msgs = [l['message'] for l in logs if l['level'] == 'error']
mem_err = [m for m in msgs if m.startswith('Process ran out of its')]
if mem_err:
exc = MemoryError(mem_err[0])
else:
# Unknown error; return logs to the user as a sort of traceback
all_logs = '\n'.join([l['message'] for l in logs])
if isinstance(exc, CivisJobFailure):
exc.error_message = all_logs + '\n' + exc.error_message
else:
exc = CivisJobFailure(all_logs)
return exc
class ModelFuture(ContainerFuture):
    """Encapsulates asynchronous execution of a CivisML job

    This object knows where to find modeling outputs
    from CivisML jobs. All data attributes are
    lazily retrieved and block on job completion.

    This object can be pickled, but it does not store the state
    of the attached :class:`~civis.APIClient` object. An unpickled
    ModelFuture will use the API key from the user's environment.

    Parameters
    ----------
    job_id : int
        ID of the modeling job
    run_id : int
        ID of the modeling run
    train_job_id : int, optional
        If not provided, this object is assumed to encapsulate a training
        job, and ``train_job_id`` will equal ``job_id``.
    train_run_id : int, optional
        If not provided, this object is assumed to encapsulate a training
        run, and ``train_run_id`` will equal ``run_id``.
    polling_interval : int or float, optional
        The number of seconds between API requests to check whether a result
        is ready. The default intelligently switches between a short
        interval if ``pubnub`` is not available and a long interval
        for ``pubnub`` backup if that library is installed.
    client : :class:`civis.APIClient`, optional
        If not provided, an :class:`civis.APIClient` object will be
        created from the :envvar:`CIVIS_API_KEY`.
    poll_on_creation : bool, optional
        If ``True`` (the default), it will poll upon calling ``result()`` the
        first time. If ``False``, it will wait the number of seconds specified
        in `polling_interval` from object creation before polling.

    Attributes
    ----------
    metadata : dict, blocking
        The metadata associated with this modeling job
    metrics : dict, blocking
        Validation metrics from this job's training
    validation_metadata : dict, blocking
        Metadata from this modeling job's validation run
    training_metadata : dict, blocking
        Metadata from this modeling job's training run
        (will be identical to `metadata` if this is a training run)
    estimator : :class:`sklearn.pipeline.Pipeline`, blocking
        The fitted scikit-learn Pipeline resulting from this model run
    table : :class:`pandas.DataFrame`, blocking
        The table output from this modeling job: out-of-sample
        predictions on the training set for a training job, or
        a table of predictions for a prediction job.
        If the prediction job was split into multiple files
        (this happens automatically for large tables),
        this attribute will provide only predictions for the first file.
    train_data_fname : str, blocking
        Base name of the training data location recorded in
        the training metadata
    train_data : :class:`pandas.DataFrame`, blocking
        The training data, read from the outputs of the training run.
        Raises ``ValueError`` if no training data are stored for the job.
    state : str
        The current state of the Civis Platform run
    job_id : int
    run_id : int
    train_job_id : int
        Container ID for the training job -- identical to ``job_id``
        if this is a training job.
    train_run_id : int
        As ``train_job_id`` but for runs
    is_training : bool
        True if this ``ModelFuture`` corresponds to a train-validate job.

    Methods
    -------
    cancel()
        Cancels the corresponding Platform job before completion
    succeeded()
        (Non-blocking) Is the job a success?
    failed()
        (Non-blocking) Did the job fail?
    cancelled()
        (Non-blocking) Was the job cancelled?
    running()
        (Non-blocking) Is the job still running?
    done()
        (Non-blocking) Is the job finished?
    result()
        (Blocking) Return the final status of the Civis Platform job.

    See Also
    --------
    civis.futures.CivisFuture
    civis.futures.ContainerFuture
    concurrent.futures.Future
    """
    def __init__(self, job_id, run_id, train_job_id=None, train_run_id=None,
                 polling_interval=None, client=None, poll_on_creation=True):
        super().__init__(job_id, run_id,
                         polling_interval=polling_interval,
                         client=client,
                         poll_on_creation=poll_on_creation)
        # Both training IDs must be given for this to count as a
        # prediction job; otherwise it is treated as a training job.
        if train_job_id and train_run_id:
            self.is_training = False
            self.train_job_id = train_job_id
            self.train_run_id = train_run_id
        else:
            self.is_training = True
            self.train_job_id = self.job_id
            self.train_run_id = self.run_id
        # Caches for the lazily-retrieved outputs
        self._metadata, self._val_metadata = None, None
        self._train_data, self._train_data_fname = None, None
        self._train_metadata = None
        self._table, self._estimator = None, None
        # The flag must exist before the callback is registered,
        # because the callback reads it.
        self._exception_handled = False
        self.add_done_callback(self._set_model_exception)

    @staticmethod
    def _set_model_exception(fut):
        """Callback: On job completion, check the metadata.

        If it indicates an exception, replace the generic
        ``CivisJobFailure`` by a more informative ``ModelError``.
        """
        # Prevent infinite recursion: this function calls `set_exception`,
        # which triggers callbacks (i.e. re-calls this function).
        if fut._exception_handled:
            return
        else:
            fut._exception_handled = True
        try:
            meta = fut.metadata
            if fut.is_training and meta['run']['status'] == 'succeeded':
                # if training job and job succeeded, check validation job
                meta = fut.validation_metadata
            if meta is not None and meta['run']['status'] == 'exception':
                try:
                    # This will fail if the user doesn't have joblib installed
                    est = fut.estimator
                except Exception:  # NOQA
                    est = None
                fut.set_exception(
                    ModelError('Model run failed with stack trace:\n'
                               '{}'.format(meta['run']['stack_trace']),
                               est, meta))
        except (FileNotFoundError, CivisJobFailure) as exc:
            # If there's no metadata file
            # (we get FileNotFound or CivisJobFailure),
            # check the tail of the log for a clearer exception.
            exc = _exception_from_logs(exc, fut.job_id, fut.run_id, fut.client)
            fut.set_exception(exc)
        except futures.CancelledError:
            # We don't need to change the exception if the run was cancelled
            pass
        except KeyError:
            # KeyErrors always represent a bug in the modeling code,
            # but showing the resulting KeyError can be confusing and
            # mask the real error.
            warnings.warn("Received malformed metadata from Civis Platform. "
                          "Something went wrong with job execution.")

    def __getstate__(self):
        """Return a copy of the instance state without the entries
        which ``__setstate__`` recreates (polling thread, API client,
        poller and condition)."""
        state = self.__dict__.copy()
        del state['_polling_thread']
        del state['client']
        del state['poller']
        del state['_condition']
        if '_pubnub' in state:
            state['_pubnub'] = True  # Replace with a boolean flag
        state['_done_callbacks'] = []
        state['_self_polling_executor'] = None
        return state

    def __setstate__(self, state):
        """Restore the state, create a new API client and resume tracking"""
        self.__dict__ = state
        self._condition = threading.Condition()
        self.client = APIClient(resources='all')
        self.poller = self.client.scripts.get_containers_runs
        self._begin_tracking()
        # ``__getstate__`` dropped the callbacks, so register ours again.
        self.add_done_callback(self._set_model_exception)

    @property
    def state(self):
        """The run state; for a succeeded run, the status which
        the job recorded in its own metadata"""
        state = self._civis_state
        if state == 'succeeded':
            state = self.metadata['run']['status']
        return state

    @property
    def table_fname(self):
        """Name of the run output which holds the table of a training job"""
        return 'predictions.csv'

    @property
    def metadata_fname(self):
        """Name of the run output which holds the job metadata"""
        return 'model_info.json'

    @property
    def train_data_fname(self):
        """Base name of the data location in the training metadata"""
        if self._train_data_fname is None:
            self._train_data_fname = os.path.basename(self.training_metadata
                                                      .get('run')
                                                      .get('configuration')
                                                      .get('data')
                                                      .get('location'))
        return self._train_data_fname

    @property
    def train_data(self):
        """The training data, loaded from the training run's outputs

        Raises ``ValueError`` if the API reports (404) that the
        training data file doesn't exist.
        """
        if self._train_data is None:
            try:
                self._train_data = _load_table_from_outputs(
                    self.train_job_id,
                    self.train_run_id,
                    self.train_data_fname,
                    client=self.client)
            except CivisAPIError as err:
                if err.status_code == 404:
                    msg = 'There is no training data stored for this job!'
                    six.raise_from(ValueError(msg), err)
                else:
                    raise
        return self._train_data

    @property
    def table(self):
        """The output table of this job (see the class docstring)"""
        self.result()  # Block and raise errors if any
        if self._table is None:
            if self.is_training:
                try:
                    # Training jobs only have one output table, the OOS scores
                    self._table = _load_table_from_outputs(self.job_id,
                                                           self.run_id,
                                                           self.table_fname,
                                                           index_col=0,
                                                           client=self.client)
                except FileNotFoundError:
                    # Just pass here, because we want the table to stay None
                    # if it does not exist
                    pass
            else:
                # Prediction jobs may have many output tables.
                output_ids = self.metadata['output_file_ids']
                if len(output_ids) > 1:
                    print('This job output {} files. Retrieving only the '
                          'first. Find the full list at `metadata'
                          '["output_file_ids"]`.'.format(len(output_ids)))
                self._table = cio.file_to_dataframe(output_ids[0],
                                                    client=self.client,
                                                    index_col=0)
        return self._table

    @property
    @_block_and_handle_missing
    def metadata(self):
        """Metadata of this job, read from its ``metadata_fname`` output"""
        if self._metadata is None:
            fid = cio.file_id_from_run_output(self.metadata_fname,
                                              self.job_id,
                                              self.run_id,
                                              client=self.client)
            self._metadata = cio.file_to_json(fid, client=self.client)
        return self._metadata

    @property
    @_block_and_handle_missing
    def estimator(self):
        """The estimator loaded from the training run's outputs"""
        if self._estimator is None:
            self._estimator = _load_estimator(self.train_job_id,
                                              self.train_run_id,
                                              client=self.client)
        return self._estimator

    @property
    @_block_and_handle_missing
    def validation_metadata(self):
        """Contents of the training run's "metrics.json" output"""
        if self._val_metadata is None:
            fid = cio.file_id_from_run_output('metrics.json',
                                              self.train_job_id,
                                              self.train_run_id,
                                              client=self.client)
            self._val_metadata = cio.file_to_json(fid, client=self.client)
        return self._val_metadata

    @property
    def metrics(self):
        """The "metrics" entry of the validation metadata, or None
        if there is no validation metadata"""
        if self.validation_metadata is not None:
            return self.validation_metadata['metrics']
        else:
            return None

    @property
    @_block_and_handle_missing
    def training_metadata(self):
        """Metadata of the training run, read from
        its ``metadata_fname`` output"""
        if self._train_metadata is None:
            fid = cio.file_id_from_run_output(self.metadata_fname,
                                              self.train_job_id,
                                              self.train_run_id,
                                              client=self.client)
            self._train_metadata = cio.file_to_json(fid, client=self.client)
        return self._train_metadata
[docs]class ModelPipeline:
"""Interface for scikit-learn modeling in the Civis Platform
Each ModelPipeline corresponds to a scikit-learn
:class:`~sklearn.pipeline.Pipeline` which will run in Civis Platform.
Note that this object can be safely pickled and unpickled, but it
does not store the state of any attached :class:`~civis.APIClient` object.
An unpickled ModelPipeline will use the API key from the user's
environment.
Parameters
----------
model : string or Estimator
Either the name of a pre-defined model
(e.g. "sparse_logistic" or "gradient_boosting_classifier")
or else a pre-existing Estimator object.
dependent_variable : string or List[str]
The dependent variable of the training dataset.
For a multi-target problem, this should be a list of
column names of dependent variables.
primary_key : string, optional
The unique ID (primary key) of the training dataset.
This will be used to index the out-of-sample scores.
parameters : dict, optional
Specify parameters for the final stage estimator in a
predefined model, e.g. ``{'C': 2}`` for a "sparse_logistic"
model.
cross_validation_parameters : dict, optional
Cross validation parameter grid for learner parameters, e.g.
``{'n_estimators': [100, 200, 500], 'learning_rate': [0.01, 0.1],
'max_depth': [2, 3]}``.
model_name : string, optional
The prefix of the Platform modeling jobs. It will have
" Train" or " Predict" added to become the Script title.
calibration : {None, "sigmoid", "isotonic"}
If not None, calibrate output probabilities with the selected method.
Valid only with classification models.
excluded_columns : array, optional
A list of columns which will be considered ineligible to be
independent variables.
client : :class:`~civis.APIClient`, optional
If not provided, an :class:`~civis.APIClient` object will be
created from the :envvar:`CIVIS_API_KEY`.
cpu_requested : int, optional
Number of CPU shares requested in the Civis Platform for
training jobs. 1024 shares = 1 CPU.
memory_requested : int, optional
Memory requested from Civis Platform for training jobs, in MiB
disk_requested : float, optional
Disk space requested on Civis Platform for training jobs, in GB
notifications : dict
See :func:`~civis.resources._resources.Scripts.post_custom` for
further documentation about email and URL notification.
dependencies : array, optional
List of packages to install from PyPI or git repository (i.e., Github
or Bitbucket). If a private repo is specified, please include a
``git_token_name`` argument as well (see below). Make sure to pin
dependencies to a specific version, since dependencies will be
reinstalled during every training and predict job.
git_token_name : str, optional
Name of remote git API token stored in Civis Platform as the password
field in a custom platform credential. Used only when installing
private git repositories.
verbose : bool, optional
If True, supply debug outputs in Platform logs and make
prediction child jobs visible.
etl : Estimator, optional
Custom ETL estimator which overrides the default ETL, and
is run before training and validation.
Methods
-------
train()
Train the model on data in Civis Platform; outputs
:class:`~civis.ml.ModelFuture`
predict()
Make predictions on new data; outputs :class:`~civis.ml.ModelFuture`
from_existing()
Class method; use to create a :class:`~civis.ml.ModelPipeline`
from an existing model training run
Attributes
----------
estimator : :class:`~sklearn.pipeline.Pipeline`
The trained scikit-learn Pipeline
train_result_ : :class:`~civis.ml.ModelFuture`
:class:`~civis.ml.ModelFuture` encapsulating this model's training run
state : str
Status of the training job (non-blocking)
Examples
--------
>>> from civis.ml import ModelPipeline
>>> model = ModelPipeline('gradient_boosting_classifier', 'depvar',
... primary_key='voterbase_id')
>>> train = model.train(table_name='schema.survey_data',
... fit_params={'sample_weight': 'survey_weight'},
... database_name='My Redshift Cluster',
... oos_scores='scratch.survey_depvar_oos_scores')
>>> train
<ModelFuture at 0x11be7ae10 state=queued>
>>> train.running()
True
>>> train.done()
False
>>> df = train.table # Read OOS scores from its Civis File. Blocking.
>>> meta = train.metadata # Metadata from training run
>>> train.metrics['roc_auc']
0.88425
>>> pred = model.predict(table_name='schema.demographics_table ',
... database_name='My Redshift Cluster',
... output_table='schema.predicted_survey_response',
... if_exists='drop',
... n_jobs=50)
>>> df_pred = pred.table # Blocks until finished
# Modify the parameters of the base estimator in a default model:
>>> model = ModelPipeline('sparse_logistic', 'depvar',
... primary_key='voterbase_id',
... parameters={'C': 2})
# Grid search over hyperparameters in the base estimator:
>>> model = ModelPipeline('sparse_logistic', 'depvar',
... primary_key='voterbase_id',
... cross_validation_parameters={'C': [0.1, 1, 10]})
See Also
--------
civis.ml.ModelFuture
"""
# These are the v2.0 templates
# (class-level defaults; ``__init__`` overrides them on the instance with
# the fallback IDs below when the newest templates can't be used)
train_template_id = 9968
predict_template_id = 9969
# These are the v1.1 templates
_train_template_id_fallback = 9112
_predict_template_id_fallback = 9113
def _set_template_version(self, client):
    """Determine which version of CivisML to use.

    Sets the module-level flag ``_NEWEST_CIVISML_VERSION`` to True if
    ``client`` can retrieve both the newest training template and its
    prediction template, and to False if the API raises an error for
    either. The check is skipped when the flag already exists.
    Used for internal or limited releases of new CivisML versions.
    """
    global _NEWEST_CIVISML_VERSION
    if '_NEWEST_CIVISML_VERSION' in globals():
        return

    newest_train = max(_PRED_TEMPLATES)
    try:
        # Check that we can access the newest templates
        for template_id in (newest_train, _PRED_TEMPLATES[newest_train]):
            client.templates.get_scripts(id=template_id)
    except CivisAPIError:
        _NEWEST_CIVISML_VERSION = False
    else:
        _NEWEST_CIVISML_VERSION = True
def __init__(self, model, dependent_variable,
             primary_key=None, parameters=None,
             cross_validation_parameters=None, model_name=None,
             calibration=None, excluded_columns=None, client=None,
             cpu_requested=None, memory_requested=None,
             disk_requested=None, notifications=None,
             dependencies=None, git_token_name=None, verbose=False,
             etl=None):
    """Store the model configuration and select the template version.

    See the class docstring for a description of the parameters.

    Raises
    ------
    NotImplementedError
        If ``etl`` is provided but the newest CivisML templates
        are not available.
    """
    self.model = model
    self._input_model = model  # In case we need to modify the input
    # ``six.string_types`` also recognizes Python 2 ``unicode`` names.
    # A bare string left as-is would later be joined character
    # by character when the training arguments are assembled.
    if isinstance(dependent_variable, six.string_types):
        # Standardize the dependent variable as a list.
        dependent_variable = [dependent_variable]
    self.dependent_variable = dependent_variable

    # optional but common parameters
    self.primary_key = primary_key
    self.parameters = parameters or {}
    self.cv_params = cross_validation_parameters or {}
    self.model_name = model_name  # None lets Platform use template name
    self.excluded_columns = excluded_columns
    self.calibration = calibration
    self.job_resources = {'REQUIRED_CPU': cpu_requested,
                          'REQUIRED_MEMORY': memory_requested,
                          'REQUIRED_DISK_SPACE': disk_requested}
    self.notifications = notifications or {}
    self.dependencies = dependencies
    self.git_token_name = git_token_name
    self.verbose = verbose

    if client is None:
        client = APIClient(resources='all')
    self._client = client
    self.train_result_ = None

    self._set_template_version(client)
    # Always define the attribute, so that it exists
    # whichever template version is in use.
    self.etl = etl
    if not _NEWEST_CIVISML_VERSION:
        if etl is not None:
            raise NotImplementedError("The etl argument is not implemented"
                                      " in this version of CivisML.")
        # fall back to previous version templates
        self.train_template_id = self._train_template_id_fallback
        self.predict_template_id = self._predict_template_id_fallback
def __getstate__(self):
state = self.__dict__.copy()
del state['_client']
return state
def __setstate__(self, state):
    """Restore the state and attach a newly-created API client"""
    self.__dict__ = state
    client = APIClient(resources='all')
    self._client = client
    self._set_template_version(client)
@classmethod
def from_existing(cls, train_job_id, train_run_id='latest', client=None):
    """Create a :class:`ModelPipeline` object from existing model IDs

    Parameters
    ----------
    train_job_id : int
        The ID of the CivisML job in the Civis Platform
    train_run_id : int or string, optional
        Location of the model run, either

        * an explicit run ID,
        * "latest" : The most recent run
        * "active" : The run designated by the training job's
          "active build" parameter
    client : :class:`~civis.APIClient`, optional
        If not provided, an :class:`~civis.APIClient` object will be
        created from the :envvar:`CIVIS_API_KEY`.

    Returns
    -------
    :class:`~civis.ml.ModelPipeline`
        A :class:`~civis.ml.ModelPipeline` which refers to
        a previously-trained model

    Raises
    ------
    ValueError
        If the API reports (404) that the job and run don't exist.

    Examples
    --------
    >>> from civis.ml import ModelPipeline
    >>> model = ModelPipeline.from_existing(job_id)
    >>> model.train_result_.metrics['roc_auc']
    0.843
    """
    train_job_id = int(train_job_id)  # Convert np.int to int
    client = APIClient(resources='all') if client is None else client
    train_run_id = _decode_train_run(train_job_id, train_run_id, client)
    try:
        fut = ModelFuture(train_job_id, train_run_id, client=client)
        container = client.scripts.get_containers(train_job_id)
    except CivisAPIError as api_err:
        if api_err.status_code != 404:
            raise
        msg = ('There is no Civis Platform job with '
               'script ID {} and run ID {}!'.format(train_job_id,
                                                    train_run_id))
        six.raise_from(ValueError(msg), api_err)

    # Recover the constructor arguments from the training job's arguments
    args = container.arguments

    excluded_columns = args.get('EXCLUDE_COLS', None)
    if excluded_columns:
        excluded_columns = excluded_columns.split()

    dependencies = args.get('DEPENDENCIES', None)
    if dependencies:
        dependencies = dependencies.split()

    name = container.name
    suffix = ' Train'
    if name.endswith(suffix):
        # Strip object-applied suffix
        name = name[:-len(suffix)]

    notifications = {}
    for key, val in container.notifications.items():
        notifications[camel_to_snake(key)] = val

    git_token_name = args.get('GIT_CRED', None)
    if git_token_name:
        # The argument holds a credential ID; the constructor wants its name
        git_token_name = client.credentials.get(git_token_name).name

    pipeline = cls(
        # Older templates used "WORKFLOW" instead of "MODEL"
        model=args.get('MODEL', args.get('WORKFLOW')),
        dependent_variable=args['TARGET_COLUMN'].split(),
        primary_key=args.get('PRIMARY_KEY'),
        model_name=name,
        parameters=json.loads(args.get('PARAMS', "{}")),
        cross_validation_parameters=json.loads(args.get('CVPARAMS', "{}")),
        calibration=args.get('CALIBRATION'),
        excluded_columns=excluded_columns,
        client=client,
        cpu_requested=args.get('REQUIRED_CPU'),
        disk_requested=args.get('REQUIRED_DISK_SPACE'),
        memory_requested=args.get('REQUIRED_MEMORY'),
        notifications=notifications,
        dependencies=dependencies,
        git_token_name=git_token_name,
        verbose=args.get('DEBUG', False))
    pipeline.train_result_ = fut

    # Set prediction template corresponding to training template
    template_id = int(container['from_template_id'])
    p_id = _PRED_TEMPLATES.get(template_id)
    if p_id is None:
        warnings.warn('Model %s was trained with a newer version of '
                      'CivisML than is available in the API client '
                      'version %s. Please update your API client version. '
                      'Attempting to use an older version of the '
                      'prediction code. Prediction will either fail '
                      'immediately or succeed.'
                      % (train_job_id, __version__), RuntimeWarning)
        p_id = max(_PRED_TEMPLATES.values())
    pipeline.predict_template_id = p_id
    return pipeline
def train(self, df=None, csv_path=None, table_name=None,
          database_name=None, file_id=None,
          sql_where=None, sql_limit=None, oos_scores=None,
          oos_scores_db=None, if_exists='fail', fit_params=None,
          polling_interval=None, validation_data='train', n_jobs=4):
    """Start a Civis Platform job to train your model

    Provide input through one of
    a :class:`~pandas.DataFrame` (``df``),
    a local CSV (``csv_path``),
    a Civis Table (``table_name`` and ``database_name``), or
    a Civis File containing a CSV (``file_id``).

    Model outputs will always contain out-of-sample scores
    (accessible through :attr:`ModelFuture.table` on this function's
    output), and you may choose to store these out-of-sample scores
    in a Civis Table with the ``oos_scores``, ``oos_scores_db``,
    and ``if_exists`` parameters.

    Parameters
    ----------
    df : pd.DataFrame, optional
        A :class:`~pandas.DataFrame` of training data.
        The :class:`~pandas.DataFrame` will be uploaded to a Civis file so
        that CivisML can access it.
        Note that the index of the :class:`~pandas.DataFrame` will be
        ignored -- use ``df.reset_index()`` if you want your
        index column to be included with the data passed to CivisML.
    csv_path : str, optional
        The location of a CSV of data on the local disk.
        It will be uploaded to a Civis file.
    table_name : str, optional
        The qualified name of the table containing the training set from
        which to build the model.
    database_name : str, optional
        Name of the database holding the training set table used to
        build the model. E.g., 'My Cluster Name'.
    file_id : int, optional
        If the training data are stored in a Civis file,
        provide the integer file ID.
    sql_where : str, optional
        A SQL WHERE clause used to scope the rows of the training set
        (used for table input only)
    sql_limit : int, optional
        SQL LIMIT clause for querying the training set
        (used for table input only)
    oos_scores : str, optional
        If provided, store out-of-sample predictions on
        training set data to this Redshift "schema.tablename".
    oos_scores_db : str, optional
        If not provided, store OOS predictions in the same database
        which holds the training data.
    if_exists : {'fail', 'append', 'drop', 'truncate'}
        Action to take if the out-of-sample prediction table
        already exists.
    fit_params: Dict[str, str]
        Mapping from parameter names in the model's ``fit`` method
        to the column names which hold the data, e.g.
        ``{'sample_weight': 'survey_weight_column'}``.
    polling_interval : float, optional
        Check for job completion every this number of seconds.
        Do not set if using the notifications endpoint.
    validation_data : str, optional
        Source for validation data. There are currently two options:
        `'train'` (the default), which cross-validates over training data
        for validation; and `'skip'`, which skips the validation step.
    n_jobs : int, optional
        Number of jobs to use for training and validation. Defaults to
        4, which allows parallelization over the 4 cross validation folds.
        Increase n_jobs to parallelize over many hyperparameter
        combinations in grid search/hyperband, or decrease to use fewer
        computational resources at once.

    Returns
    -------
    :class:`~civis.ml.ModelFuture`

    Raises
    ------
    ValueError
        If no source of data, or more than one source, is provided.
    """
    if ((table_name is None or database_name is None) and
            file_id is None and df is None and csv_path is None):
        raise ValueError('Provide a source of data.')
    if sum((bool(table_name and database_name),
            bool(file_id), df is not None, csv_path is not None)) > 1:
        raise ValueError('Provide a single source of data.')

    # Local data are uploaded to a Civis File first
    if df is not None:
        file_id = _stash_local_dataframe(df, client=self._client)
    elif csv_path:
        file_id = _stash_local_file(csv_path, client=self._client)

    train_args = {'TARGET_COLUMN': ' '.join(self.dependent_variable),
                  'PRIMARY_KEY': self.primary_key,
                  'PARAMS': json.dumps(self.parameters),
                  'CVPARAMS': json.dumps(self.cv_params),
                  'CALIBRATION': self.calibration,
                  'IF_EXISTS': if_exists}
    if oos_scores:
        train_args['OOSTABLE'] = oos_scores
    if oos_scores_db:
        oos_db_id = self._client.get_database_id(oos_scores_db)
        train_args['OOSDB'] = {'database': oos_db_id}
    if sql_where:
        train_args['WHERESQL'] = sql_where
    if sql_limit:
        train_args['LIMITSQL'] = sql_limit
    if self.excluded_columns:
        train_args['EXCLUDE_COLS'] = ' '.join(self.excluded_columns)
    if fit_params:
        train_args['FIT_PARAMS'] = json.dumps(fit_params)
    if self.dependencies:
        train_args['DEPENDENCIES'] = ' '.join(self.dependencies)
    if _NEWEST_CIVISML_VERSION:
        # These arguments are only passed to the newest templates
        if validation_data:
            train_args['VALIDATION_DATA'] = validation_data
        if n_jobs:
            train_args['N_JOBS'] = n_jobs

    if HAS_SKLEARN and isinstance(self.model, BaseEstimator):
        # Serialize a user-provided estimator, upload it, and pass
        # the file ID as the model. The directory is created outside
        # the ``try`` so that a failing ``mkdtemp`` isn't hidden by
        # the cleanup code hitting an unbound ``tempdir``.
        tempdir = tempfile.mkdtemp()
        try:
            fout = os.path.join(tempdir, 'estimator.pkl')
            joblib.dump(self.model, fout, compress=3)
            with open(fout, 'rb') as _fout:
                n = self.model_name if self.model_name else "CivisML"
                estimator_file_id = cio.file_to_civis(
                    _fout, 'Estimator for ' + n, client=self._client)
            self._input_model = self.model  # Keep the estimator
            self.model = str(estimator_file_id)
        finally:
            shutil.rmtree(tempdir)
    train_args['MODEL'] = self.model

    if HAS_SKLEARN and _NEWEST_CIVISML_VERSION:
        if isinstance(self.etl, BaseEstimator):
            # Same procedure for a custom ETL estimator
            tempdir = tempfile.mkdtemp()
            try:
                fout = os.path.join(tempdir, 'ETL.pkl')
                joblib.dump(self.etl, fout, compress=3)
                with open(fout, 'rb') as _fout:
                    etl_file_id = cio.file_to_civis(
                        _fout, 'ETL Estimator', client=self._client)
                train_args['ETL'] = str(etl_file_id)
            finally:
                shutil.rmtree(tempdir)

    name = self.model_name + ' Train' if self.model_name else None
    # Clear the existing training result so we can make a new one.
    self.train_result_ = None

    result, container, run = self._create_custom_run(
        self.train_template_id,
        job_name=name,
        table_name=table_name,
        database_name=database_name,
        file_id=file_id,
        args=train_args,
        resources=self.job_resources,
        polling_interval=polling_interval)

    self.train_result_ = result
    return result
def _create_custom_run(self, template_id, job_name=None, table_name=None,
                       database_name=None, file_id=None, args=None,
                       resources=None, polling_interval=None):
    """Create a custom script from a template, start it, and wrap the run.

    Parameters
    ----------
    template_id : int
        ID of the template from which the custom script is created.
    job_name : str, optional
        Name given to the new custom script.
    table_name : str, optional
        Sent to the script as the ``TABLE_NAME`` argument.
    database_name : str, optional
        Name of the database. For template IDs below 8000 it is sent
        verbatim as ``DB_NAME``; otherwise it is resolved to a database
        ID and sent as ``DB``.
    file_id : int, optional
        Sent to the script as ``CIVIS_FILE_ID``. Int-like values
        (e.g. ``np.int64``) are converted to a Python ``int``.
    args : dict, optional
        Additional script arguments. These are applied last, so they
        override any argument of the same name built here.
    resources : dict, optional
        Resource settings. Only entries with a truthy value
        are copied into the script arguments.
    polling_interval : float, optional
        Passed through to the returned :class:`ModelFuture`.

    Returns
    -------
    tuple
        ``(future, container, run)``: the :class:`ModelFuture` tracking
        the run, the response from creating the custom script, and the
        response from starting its run.

    Raises
    ------
    ValueError
        If ``self.git_token_name`` is set and there is not exactly one
        "Custom" credential with that name.
    """
    # Handle int-like but non-Python-integer types such as np.int64
    file_id = int(file_id) if file_id is not None else file_id
    script_arguments = {'TABLE_NAME': table_name,
                        'CIVIS_FILE_ID': file_id,
                        'DEBUG': self.verbose}
    if database_name:
        if template_id < 8000:
            # v0 jobs used a different database parameter
            script_arguments['DB_NAME'] = database_name
        else:
            db_id = self._client.get_database_id(database_name)
            script_arguments['DB'] = {'database': db_id}
    resources = resources or {}
    for key, value in resources.items():
        if value:
            # Default resources are set on the template. Only
            # modify via arguments if users give a non-default value.
            script_arguments[key] = value
    if self.git_token_name:
        creds = find(self._client.credentials.list(),
                     name=self.git_token_name,
                     type='Custom')
        # Require exactly one match. Zero matches must also be rejected
        # here; otherwise ``creds[0]`` below fails with an IndexError
        # that hides the real problem from the user.
        if len(creds) != 1:
            raise ValueError("Unique credential with name '{}' for "
                             "remote git hosting service not found!"
                             .format(self.git_token_name))
        script_arguments['GIT_CRED'] = creds[0].id
    # Caller-supplied arguments take precedence over the ones built above.
    script_arguments.update(args or {})
    container = self._client.scripts.post_custom(
        from_template_id=template_id,
        name=job_name,
        arguments=script_arguments,
        notifications=self.notifications)
    log.info('Created custom script %s.', container.id)
    run = self._client.scripts.post_custom_runs(container.id)
    log.debug('Started job %s, run %s.', container.id, run.id)
    # If a training result is already stored on this object, pass its
    # job and run IDs along to the new future.
    train_kwargs = {}
    if self.train_result_ is not None:
        train_kwargs = {'train_job_id': self.train_result_.job_id,
                        'train_run_id': self.train_result_.run_id}
    fut = ModelFuture(
        container.id,
        run.id,
        client=self._client,
        polling_interval=polling_interval,
        poll_on_creation=False,
        **train_kwargs)
    return fut, container, run
@property
@_check_fit_initiated
def state(self):
    """State reported by the stored training result.

    The ``_check_fit_initiated`` guard raises ``ValueError`` when no
    training result is set on this object.
    """
    fit_future = self.train_result_
    return fit_future.state
@property
@_check_fit_initiated
def estimator(self):
    """Estimator exposed by the stored training result.

    The ``_check_fit_initiated`` guard raises ``ValueError`` when no
    training result is set on this object.
    """
    fit_future = self.train_result_
    return fit_future.estimator
@_check_fit_initiated
def predict(self, df=None, csv_path=None,
            table_name=None, database_name=None,
            manifest=None, file_id=None, sql_where=None, sql_limit=None,
            primary_key=SENTINEL, output_table=None, output_db=None,
            if_exists='fail', n_jobs=None, polling_interval=None,
            cpu=None, memory=None, disk_space=None):
    """Make predictions on a trained model

    Provide input through one of
    a :class:`~pandas.DataFrame` (``df``),
    a local CSV (``csv_path``),
    a Civis Table (``table_name`` and ``database_name``),
    a Civis File containing a CSV (``file_id``), or
    a Civis File containing a manifest file (``manifest``).

    A "manifest file" is JSON which specifies the location of
    many shards of the data to be used for prediction.
    A manifest file is the output of a Civis
    export job with ``force_multifile=True`` set,
    e.g. from :func:`civis.io.civis_to_multifile_csv`.
    Large Civis Tables (provided using ``table_name``)
    will automatically be exported to manifest files.

    Prediction outputs will always be stored as gzipped
    CSVs in one or more Civis Files. You can find a list of
    File ID numbers for output files at the "output_file_ids"
    key in the metadata returned by the prediction job.
    Provide an ``output_table`` (and optionally an ``output_db``,
    if it's different from ``database_name``) to copy these
    predictions into a Civis Table.

    Parameters
    ----------
    df : pd.DataFrame, optional
        A :class:`~pandas.DataFrame` of data for prediction.
        The :class:`~pandas.DataFrame` will be uploaded to a Civis file so
        that CivisML can access it.
        Note that the index of the :class:`~pandas.DataFrame` will be
        ignored -- use ``df.reset_index()`` if you want your
        index column to be included with the data passed to CivisML.
    csv_path : str, optional
        The location of a CSV of data on the local disk.
        It will be uploaded to a Civis file.
    table_name : str, optional
        The qualified name of the table containing your data
    database_name : str, optional
        Name of the database holding the
        data, e.g., 'My Redshift Cluster'.
    manifest : int, optional
        ID for a manifest file stored as a Civis file.
        (Note: if the manifest is not a Civis Platform-specific manifest,
        like the one returned from
        :func:`civis.io.civis_to_multifile_csv`,
        this must be used in conjunction with table_name and database_name
        due to the need for column discovery via Redshift.)
    file_id : int, optional
        If the data are a CSV stored in a Civis file,
        provide the integer file ID.
    sql_where : str, optional
        A SQL WHERE clause used to scope the rows to be predicted
    sql_limit : int, optional
        SQL LIMIT clause to restrict the size of the prediction set
    primary_key : str, optional
        Primary key of the prediction table. Defaults to
        the primary key of the training data. Use ``None`` to
        indicate that the prediction data don't have a
        primary key column.
    output_table: str, optional
        The table in which to put the predictions.
    output_db : str, optional
        Database of the output table. Defaults to the database
        of the input table.
    if_exists : {'fail', 'append', 'drop', 'truncate'}
        Action to take if the prediction table already exists.
    n_jobs : int, optional
        Number of concurrent Platform jobs to use
        for multi-file / large table prediction.
    polling_interval : float, optional
        Check for job completion every this number of seconds.
        Do not set if using the notifications endpoint.
    cpu : int, optional
        CPU shares requested by the user for a single job.
    memory : int, optional
        RAM requested by the user for a single job.
    disk_space : float, optional
        disk space requested by the user for a single job.

    Returns
    -------
    :class:`~civis.ml.ModelFuture`

    Raises
    ------
    ValueError
        If no data source is provided, or if more than one
        data source is provided.
    """
    self.train_result_.result()  # Blocks and raises training errors

    if ((table_name is None or database_name is None) and
            file_id is None and df is None and csv_path is None and
            manifest is None):
        raise ValueError('Provide a source of data.')
    # A table and a manifest count as a single source here, because a
    # manifest may be combined with ``table_name``/``database_name``.
    if sum((bool(table_name and database_name) or (manifest is not None),
            bool(file_id), df is not None, csv_path is not None)) > 1:
        raise ValueError('Provide a single source of data.')

    # Local data is uploaded first; the job then reads it by file ID.
    if df is not None:
        file_id = _stash_local_dataframe(df, client=self._client)
    elif csv_path:
        file_id = _stash_local_file(csv_path, client=self._client)

    # SENTINEL (rather than None) marks "not given", because None is a
    # meaningful value: the prediction data have no primary key.
    if primary_key is SENTINEL:
        primary_key = self.primary_key

    predict_args = {'TRAIN_JOB': self.train_result_.job_id,
                    'TRAIN_RUN': self.train_result_.run_id,
                    'PRIMARY_KEY': primary_key,
                    'IF_EXISTS': if_exists}
    if output_table:
        predict_args['OUTPUT_TABLE'] = output_table
    if output_db:
        if self.predict_template_id == 7021:
            # v0 jobs used a different database parameter
            predict_args['OUTPUT_DB'] = output_db
        else:
            output_db_id = self._client.get_database_id(output_db)
            predict_args['OUTPUT_DB'] = {'database': output_db_id}
    if manifest:
        predict_args['MANIFEST'] = manifest
    if sql_where:
        predict_args['WHERESQL'] = sql_where
    if sql_limit:
        predict_args['LIMITSQL'] = sql_limit
    if n_jobs:
        predict_args['N_JOBS'] = n_jobs
    # Resource requests are only forwarded when the module-level
    # _NEWEST_CIVISML_VERSION flag is set.
    if _NEWEST_CIVISML_VERSION:
        if cpu:
            predict_args['CPU'] = cpu
        if memory:
            predict_args['MEMORY'] = memory
        if disk_space:
            predict_args['DISK_SPACE'] = disk_space

    name = self.model_name + ' Predict' if self.model_name else None
    result, container, run = self._create_custom_run(
        self.predict_template_id,
        job_name=name,
        table_name=table_name,
        database_name=database_name,
        file_id=file_id,
        args=predict_args,
        polling_interval=polling_interval)

    return result