Source code for civis.ml._model

"""Run CivisML jobs and retrieve the results
"""
import builtins
from builtins import super
from collections import namedtuple
import json
import logging
import os
import shutil
import six
import tempfile
import threading
import warnings
from concurrent import futures
from functools import wraps

import joblib
try:
    from sklearn.base import BaseEstimator
    HAS_SKLEARN = True
except ImportError:
    HAS_SKLEARN = False

from civis import APIClient, find, find_one, __version__
from civis._utils import camel_to_snake
from civis.base import CivisAPIError, CivisJobFailure
from civis.compat import FileNotFoundError, TemporaryDirectory
import civis.io as cio
from civis.futures import ContainerFuture

__all__ = ['ModelFuture', 'ModelError', 'ModelPipeline']
log = logging.getLogger(__name__)

# sentinel value for default primary key value
SENTINEL = namedtuple('Sentinel', [])()

# Map training template to prediction template so that we
# always use a compatible version for predictions.
_PRED_TEMPLATES = {11219: 11220,  # v2.2
                   11221: 11220,  # v2.2 registration
                   10582: 10583,  # v2.1
                   9968: 9969,    # v2.0
                   9112: 9113,    # v1.1
                   8387: 8388,    # v1.0
                   7020: 7021,    # v0.5
                   }
_CIVISML_TEMPLATE = None  # CivisML training template to use
REGISTRATION_TEMPLATES = [11221,  # v2.2
                          ]


class ModelError(RuntimeError):
    def __init__(self, msg, estimator=None, metadata=None):
        self.metadata = metadata
        self.estimator = estimator
        super().__init__(msg)


def _check_fit_initiated(method):
    """Makes sure that the ModelPipeline's been trained"""
    @wraps(method)
    def wrapper(*args, **kwargs):
        self = args[0]
        if not self.train_result_:
            raise ValueError("This model hasn't been trained yet.")
        return method(*args, **kwargs)
    return wrapper


def _block_and_handle_missing(method):
    """For ModelFuture file-retrieving property methods.
    Block until completion and attempt to retrieve result.
    Raise exception only if the result isn't found.
    """
    @wraps(method)
    def wrapper(self):
        futures.wait((self,))  # Block until done
        try:
            return method(self)
        except FileNotFoundError:
            # We get here if the modeling job failed to produce
            # any output and we don't have metadata.
            if self.exception():
                six.raise_from(self.exception(), None)
            else:
                raise
    return wrapper


def _stash_local_dataframe(df, template_id, client=None):
    """Store data in a temporary Civis File and return the file ID"""
    # Standard dataframe indexes do not have a "levels" attribute,
    # but multiindexes do. Checking for this attribute means we don't
    # need to import pandas to do error handling here.
    if getattr(getattr(df, "index", None), "levels", None) is not None:
        raise TypeError("CivisML does not support multi-indexed data frames. "
                        "Try calling `.reset_index` on your data to convert "
                        "it into a CivisML-friendly format.")
    try:
        if template_id > 9969:
            return _stash_dataframe_as_feather(df, client)
        else:
            return _stash_dataframe_as_csv(df, client)
    except (ImportError, AttributeError) as exc:
        if (df.dtypes == 'category').any():
            # The original exception should tell users if they need
            # to upgrade pandas (an AttributeError)
            # # or if they need to install "feather-format" (ImportError).
            six.raise_from(ValueError(
                'Categorical columns can only be handled with pandas '
                'version >= 0.20 and `feather-format` installed.'),
                exc)
        return _stash_dataframe_as_csv(df, client)


def _stash_dataframe_as_feather(df, client):
    civis_fname = 'modelpipeline_data.feather'
    with TemporaryDirectory() as tdir:
        path = os.path.join(tdir, civis_fname)
        df.to_feather(path)
        file_id = cio.file_to_civis(path, name=civis_fname, client=client)
    return file_id


def _stash_dataframe_as_csv(df, client):
    civis_fname = 'modelpipeline_data.csv'
    if six.PY3:
        txt = six.StringIO()
    else:
        txt = six.BytesIO()
    df.to_csv(txt, encoding='utf-8', index=False)
    txt.flush()
    txt.seek(0)
    file_id = cio.file_to_civis(txt, name=civis_fname, client=client)

    return file_id


def _stash_local_file(csv_path, client=None):
    """Store data in a temporary Civis File and return the file ID"""
    civis_fname = 'modelpipeline_data.csv'
    with open(csv_path) as _fin:
        file_id = cio.file_to_civis(_fin, name=civis_fname, client=client)

    return file_id


def _decode_train_run(train_job_id, train_run_id, client):
    """Determine correct run ID for use for a given training job ID"""
    try:
        return int(train_run_id)
    except ValueError:
        container = client.scripts.get_containers(int(train_job_id))
        if train_run_id == 'active':
            train_run_id = container.arguments.get('ACTIVE_BUILD', find_one(
                container.params, name='ACTIVE_BUILD'))['default']

        if train_run_id == 'latest':
            return container.last_run.id

        try:
            return int(train_run_id)
        except Exception as exc:
            msg = ('Please provide valid train_run_id! Needs to be '
                   'integer corresponding to a training run ID '
                   'or one of "active" or "latest".')
            six.raise_from(ValueError(msg), exc)


def _retrieve_file(fname, job_id, run_id, local_dir, client=None):
    """Download a Civis file using a reference on a previous run"""
    file_id = cio.file_id_from_run_output(fname, job_id, run_id, client=client)
    fpath = os.path.join(local_dir, fname)
    # fname may contain a path
    output_dir = os.path.dirname(fpath)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    with open(fpath, 'wb') as down_file:
        cio.civis_to_file(file_id, down_file, client=client)
    return fpath


def _load_table_from_outputs(job_id, run_id, filename, client=None,
                             **table_kwargs):
    """Load a table from a run output directly into a ``DataFrame``"""
    client = APIClient() if client is None else client
    file_id = cio.file_id_from_run_output(filename, job_id, run_id,
                                          client=client, regex=True)
    return cio.file_to_dataframe(file_id, client=client, **table_kwargs)


def _load_estimator(job_id, run_id, filename='estimator.pkl', client=None):
    """Load a joblib-serialized Estimator from run outputs"""
    try:
        tempdir = tempfile.mkdtemp()
        path = _retrieve_file(filename, job_id, run_id, tempdir, client=client)
        obj = joblib.load(path)
    finally:
        shutil.rmtree(tempdir)
    return obj


def _exception_from_logs(exc, job_id, run_id, client, nlog=15):
    """Create an exception if the log has a recognizable error

    Search "error" emits in the last ``n_log`` lines.
    This function presently recognizes the following errors:

    - MemoryError
    """
    logs = client.scripts.list_containers_runs_logs(job_id, run_id, limit=nlog)

    # Check for memory errors
    msgs = [l['message'] for l in logs if l['level'] == 'error']
    mem_err = [m for m in msgs if m.startswith('Process ran out of its')]
    if mem_err:
        exc = MemoryError(mem_err[0])
    else:
        # Unknown error; return logs to the user as a sort of traceback
        all_logs = '\n'.join([l['message'] for l in logs])
        if isinstance(exc, CivisJobFailure):
            exc.error_message = all_logs + '\n' + exc.error_message
        else:
            exc = CivisJobFailure(all_logs)
    return exc


def _parse_warning(warn_str):
    """Reverse-engineer a warning string

    Parameters
    ----------
    warn_str : string

    Returns
    -------
    (str, Warning, str, int)
        message, category, filename, lineno
    """
    tokens = warn_str.rstrip('\n').split(" ")

    # The first token is
    # "[filename]:[lineno]:"
    filename, lineno, _ = tokens[0].split(':')

    # The second token is
    # "[category name]:"
    category = getattr(builtins, tokens[1][:-1], RuntimeWarning)

    message = " ".join(tokens[2:])

    return message, category, filename, int(lineno)


def _show_civisml_warnings(warn_list):
    """Re-raise warnings recorded during a CivisML run

    Parameters
    ----------
    warn_list : list of str
        A list of warnings generated during a CivisML run
    """
    for warn_str in warn_list:
        try:
            warnings.warn_explicit(*_parse_warning(warn_str))
        except Exception:  # NOQA
            warn_str = "Remote warning from CivisML:\n" + warn_str
            warnings.warn(warn_str, RuntimeWarning)


[docs]class ModelFuture(ContainerFuture): """Encapsulates asynchronous execution of a CivisML job This object knows where to find modeling outputs from CivisML jobs. All data attributes are lazily retrieved and block on job completion. This object can be pickled, but it does not store the state of the attached :class:`~civis.APIClient` object. An unpickled ModelFuture will use the API key from the user's environment. Parameters ---------- job_id : int ID of the modeling job run_id : int ID of the modeling run train_job_id : int, optional If not provided, this object is assumed to encapsulate a training job, and ``train_job_id`` will equal ``job_id``. train_run_id : int, optional If not provided, this object is assumed to encapsulate a training run, and ``train_run_id`` will equal ``run_id``. polling_interval : int or float, optional The number of seconds between API requests to check whether a result is ready. The default intelligently switches between a short interval if ``pubnub`` is not available and a long interval for ``pubnub`` backup if that library is installed. client : :class:`civis.APIClient`, optional If not provided, an :class:`civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. poll_on_creation : bool, optional If ``True`` (the default), it will poll upon calling ``result()`` the first time. If ``False``, it will wait the number of seconds specified in `polling_interval` from object creation before polling. Attributes ---------- metadata : dict, blocking The metadata associated with this modeling job metrics : dict, blocking Validation metrics from this job's training validation_metadata : dict, blocking Metadata from this modeling job's validation run train_metadata : dict, blocking Metadata from this modeling job's training run (will be identical to `metadata` if this is a training run) estimator : :class:`sklearn.pipeline.Pipeline`, blocking The fitted scikit-learn Pipeline resulting from this model run table : :class:`pandas.DataFrame`, blocking The table output from this modeling job: out-of-sample predictions on the training set for a training job, or a table of predictions for a prediction job. If the prediction job was split into multiple files (this happens automatically for large tables), this attribute will provide only predictions for the first file. state : str The current state of the Civis Platform run job_id : int run_id : int train_job_id : int Container ID for the training job -- identical to ``job_id`` if this is a training job. train_run_id : int As ``train_job_id`` but for runs is_training : bool True if this ``ModelFuture`` corresponds to a train-validate job. Methods ------- cancel() Cancels the corresponding Platform job before completion succeeded() (Non-blocking) Is the job a success? failed() (Non-blocking) Did the job fail? cancelled() (Non-blocking) Was the job cancelled? running() (Non-blocking) Is the job still running? done() (Non-blocking) Is the job finished? result() (Blocking) Return the final status of the Civis Platform job. See Also -------- civis.futures.CivisFuture civis.futures.ContainerFuture concurrent.futures.Future """ def __init__(self, job_id, run_id, train_job_id=None, train_run_id=None, polling_interval=None, client=None, poll_on_creation=True): super().__init__(job_id, run_id, polling_interval=polling_interval, client=client, poll_on_creation=poll_on_creation) if train_job_id and train_run_id: self.is_training = False self.train_job_id = train_job_id self.train_run_id = train_run_id else: self.is_training = True self.train_job_id = self.job_id self.train_run_id = self.run_id self._metadata, self._val_metadata = None, None self._train_data, self._train_data_fname = None, None self._train_metadata = None self._table, self._estimator = None, None self._exception_handled = False self.add_done_callback(self._set_model_exception) @staticmethod def _set_model_exception(fut): """Callback: On job completion, check the metadata. If it indicates an exception, replace the generic ``CivisJobFailure`` by a more informative ``ModelError``. """ # Prevent infinite recursion: this function calls `set_exception`, # which triggers callbacks (i.e. re-calls this function). if fut._exception_handled: return else: fut._exception_handled = True try: meta = fut.metadata if fut.is_training and meta['run']['status'] == 'succeeded': # if training job and job succeeded, check validation job meta = fut.validation_metadata if meta is not None and meta['run']['status'] == 'exception': try: # This will fail if the user doesn't have joblib installed est = fut.estimator except Exception: # NOQA est = None fut.set_exception( ModelError('Model run failed with stack trace:\n' '{}'.format(meta['run']['stack_trace']), est, meta)) except (FileNotFoundError, CivisJobFailure) as exc: # If there's no metadata file # (we get FileNotFound or CivisJobFailure), # check the tail of the log for a clearer exception. exc = _exception_from_logs(exc, fut.job_id, fut.run_id, fut.client) fut.set_exception(exc) except futures.CancelledError: # We don't need to change the exception if the run was cancelled pass except KeyError: # KeyErrors always represent a bug in the modeling code, # but showing the resulting KeyError can be confusing and # mask the real error. warnings.warn("Received malformed metadata from Civis Platform. " "Something went wrong with job execution.") def __getstate__(self): state = self.__dict__.copy() del state['_polling_thread'] del state['client'] del state['poller'] del state['_condition'] if '_pubnub' in state: state['_pubnub'] = True # Replace with a boolean flag state['_done_callbacks'] = [] state['_self_polling_executor'] = None return state def __setstate__(self, state): self.__dict__ = state self._condition = threading.Condition() self.client = APIClient() self.poller = self.client.scripts.get_containers_runs self._begin_tracking() self.add_done_callback(self._set_model_exception) @property def state(self): state = self._civis_state if state == 'succeeded': state = self.metadata['run']['status'] return state @property def table_fname(self): return 'predictions.csv' @property def metadata_fname(self): return 'model_info.json' @property def train_data_fname(self): if self._train_data_fname is None: self._train_data_fname = os.path.basename(self.training_metadata .get('run') .get('configuration') .get('data') .get('location')) return self._train_data_fname @property def train_data(self): if self._train_data is None: try: self._train_data = _load_table_from_outputs( self.train_job_id, self.train_run_id, self.train_data_fname, client=self.client) except CivisAPIError as err: if err.status_code == 404: msg = 'There is no training data stored for this job!' six.raise_from(ValueError(msg), err) else: raise return self._train_data def _table_primary_key(self): # metadata path to input parameters is different # for training and prediction if self.is_training: pkey = self.metadata[ 'run']['configuration']['data']['primary_key'] else: pkey = self.metadata[ 'jobs'][0]['run']['configuration']['data']['primary_key'] return pkey @property def table(self): self.result() # Block and raise errors if any if self._table is None: # An index column will only be present if primary key is if self._table_primary_key() is None: index_col = False else: index_col = 0 if self.is_training: try: # Training jobs only have one output table, the OOS scores self._table = _load_table_from_outputs(self.job_id, self.run_id, self.table_fname, index_col=index_col, client=self.client) except FileNotFoundError: # Just pass here, because we want the table to stay None # if it does not exist pass else: # Prediction jobs may have many output tables. output_ids = self.metadata['output_file_ids'] if len(output_ids) > 1: print('This job output {} files. Retrieving only the ' 'first. Find the full list at `metadata' '["output_file_ids"]`.'.format(len(output_ids))) self._table = cio.file_to_dataframe(output_ids[0], client=self.client, index_col=index_col) return self._table @property @_block_and_handle_missing def metadata(self): if self._metadata is None: fid = cio.file_id_from_run_output('model_info.json', self.job_id, self.run_id, client=self.client) self._metadata = cio.file_to_json(fid, client=self.client) _show_civisml_warnings(self._metadata.get('warnings', [])) return self._metadata @property @_block_and_handle_missing def estimator(self): if self._estimator is None: self._estimator = _load_estimator(self.train_job_id, self.train_run_id, client=self.client) return self._estimator @property @_block_and_handle_missing def validation_metadata(self): if self._val_metadata is None: try: fid = cio.file_id_from_run_output('metrics.json', self.train_job_id, self.train_run_id, client=self.client) except FileNotFoundError: # Use an empty dictionary to indicate that # we've already checked for metadata. self._val_metadata = {} else: self._val_metadata = cio.file_to_json(fid, client=self.client) if not self._val_metadata: # Convert an empty dictionary to None return None else: return self._val_metadata @property def metrics(self): if self.validation_metadata: return self.validation_metadata['metrics'] else: return None @property @_block_and_handle_missing def training_metadata(self): if self._train_metadata is None: fid = cio.file_id_from_run_output('model_info.json', self.train_job_id, self.train_run_id, client=self.client) self._train_metadata = cio.file_to_json(fid, client=self.client) return self._train_metadata
[docs]class ModelPipeline: """Interface for scikit-learn modeling in the Civis Platform Each ModelPipeline corresponds to a scikit-learn :class:`~sklearn.pipeline.Pipeline` which will run in Civis Platform. Note that this object can be safely pickled and unpickled, but it does not store the state of any attached :class:`~civis.APIClient` object. An unpickled ModelPipeline will use the API key from the user's environment. Parameters ---------- model : string or Estimator Either the name of a pre-defined model (e.g. "sparse_logistic" or "gradient_boosting_classifier") or else a pre-existing Estimator object. dependent_variable : string or List[str] The dependent variable of the training dataset. For a multi-target problem, this should be a list of column names of dependent variables. Nulls in a single dependent variable will automatically be dropped. primary_key : string, optional The unique ID (primary key) of the training dataset. This will be used to index the out-of-sample scores. parameters : dict, optional Specify parameters for the final stage estimator in a predefined model, e.g. ``{'C': 2}`` for a "sparse_logistic" model. cross_validation_parameters : dict or string, optional Options for cross validation. For grid search, supply a parameter grid as a dictionary, e.g., ``{{'n_estimators': [100, 200, 500], 'learning_rate': [0.01, 0.1], 'max_depth': [2, 3]}}``. For hyperband, pass the string "hyperband". model_name : string, optional The prefix of the Platform modeling jobs. It will have " Train" or " Predict" added to become the Script title. calibration : {None, "sigmoid", "isotonic"} If not None, calibrate output probabilities with the selected method. Valid only with classification models. excluded_columns : array, optional A list of columns which will be considered ineligible to be independent variables. client : :class:`~civis.APIClient`, optional If not provided, an :class:`~civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. cpu_requested : int, optional Number of CPU shares requested in the Civis Platform for training jobs. 1024 shares = 1 CPU. memory_requested : int, optional Memory requested from Civis Platform for training jobs, in MiB disk_requested : float, optional Disk space requested on Civis Platform for training jobs, in GB notifications : dict See :func:`~civis.resources._resources.Scripts.post_custom` for further documentation about email and URL notification. dependencies : array, optional List of packages to install from PyPI or git repository (e.g., Github or Bitbucket). If a private repo is specified, please include a ``git_token_name`` argument as well (see below). Make sure to pin dependencies to a specific version, since dependencies will be reinstalled during every training and predict job. git_token_name : str, optional Name of remote git API token stored in Civis Platform as the password field in a custom platform credential. Used only when installing private git repositories. verbose : bool, optional If True, supply debug outputs in Platform logs and make prediction child jobs visible. etl : Estimator, optional Custom ETL estimator which overrides the default ETL, and is run before training and validation. Methods ------- train() Train the model on data in Civis Platform; outputs :class:`~civis.ml.ModelFuture` predict() Make predictions on new data; outputs :class:`~civis.ml.ModelFuture` from_existing() Class method; use to create a :class:`~civis.ml.ModelPipeline` from an existing model training run Attributes ---------- estimator : :class:`~sklearn.pipeline.Pipeline` The trained scikit-learn Pipeline train_result_ : :class:`~civis.ml.ModelFuture` :class:`~civis.ml.ModelFuture` encapsulating this model's training run state : str Status of the training job (non-blocking) Examples -------- >>> from civis.ml import ModelPipeline >>> model = ModelPipeline('gradient_boosting_classifier', 'depvar', ... primary_key='voterbase_id') >>> train = model.train(table_name='schema.survey_data', ... fit_params={'sample_weight': 'survey_weight'}, ... database_name='My Redshift Cluster', ... oos_scores='scratch.survey_depvar_oos_scores') >>> train <ModelFuture at 0x11be7ae10 state=queued> >>> train.running() True >>> train.done() False >>> df = train.table # Read OOS scores from its Civis File. Blocking. >>> meta = train.metadata # Metadata from training run >>> train.metrics['roc_auc'] 0.88425 >>> pred = model.predict(table_name='schema.demographics_table ', ... database_name='My Redshift Cluster', ... output_table='schema.predicted_survey_response', ... if_exists='drop') >>> df_pred = pred.table # Blocks until finished # Modify the parameters of the base estimator in a default model: >>> model = ModelPipeline('sparse_logistic', 'depvar', ... primary_key='voterbase_id', ... parameters={'C': 2}) # Grid search over hyperparameters in the base estimator: >>> model = ModelPipeline('sparse_logistic', 'depvar', ... primary_key='voterbase_id', ... cross_validation_parameters={'C': [0.1, 1, 10]}) See Also -------- civis.ml.ModelFuture """ def _get_template_ids(self, client): """Determine which version of CivisML to use. Select the most recent template to which the user has access. Used for internal or limited releases of new CivisML versions. """ global _CIVISML_TEMPLATE if _CIVISML_TEMPLATE is None: for t_id in sorted(_PRED_TEMPLATES)[::-1]: if t_id in REGISTRATION_TEMPLATES: continue try: # Check that we can access the template client.templates.get_scripts(id=t_id) client.templates.get_scripts(id=_PRED_TEMPLATES[t_id]) except CivisAPIError: continue else: # Store the template ID so we don't need to repeat # these API calls in this session. _CIVISML_TEMPLATE = t_id break else: raise RuntimeError("Unable to find a CivisML template.") return _CIVISML_TEMPLATE, _PRED_TEMPLATES[_CIVISML_TEMPLATE] def __init__(self, model, dependent_variable, primary_key=None, parameters=None, cross_validation_parameters=None, model_name=None, calibration=None, excluded_columns=None, client=None, cpu_requested=None, memory_requested=None, disk_requested=None, notifications=None, dependencies=None, git_token_name=None, verbose=False, etl=None): self.model = model self._input_model = model # In case we need to modify the input if isinstance(dependent_variable, six.string_types): # Standardize the dependent variable as a list. dependent_variable = [dependent_variable] self.dependent_variable = dependent_variable # optional but common parameters self.primary_key = primary_key self.parameters = parameters or {} self.cv_params = cross_validation_parameters or {} self.model_name = model_name # None lets Platform use template name self.excluded_columns = excluded_columns self.calibration = calibration self.job_resources = {'REQUIRED_CPU': cpu_requested, 'REQUIRED_MEMORY': memory_requested, 'REQUIRED_DISK_SPACE': disk_requested} self.notifications = notifications or {} self.dependencies = dependencies self.git_token_name = git_token_name self.verbose = verbose if client is None: client = APIClient() self._client = client self.train_result_ = None template_ids = self._get_template_ids(self._client) self.train_template_id, self.predict_template_id = template_ids self.etl = etl if self.train_template_id < 9968 and self.etl is not None: # This is a pre-v2.0 CivisML template raise NotImplementedError("The etl argument is not implemented" " in this version of CivisML.") def __getstate__(self): state = self.__dict__.copy() del state['_client'] return state def __setstate__(self, state): self.__dict__ = state self._client = APIClient() template_ids = self._get_template_ids(self._client) self.train_template_id, self.predict_template_id = template_ids
[docs] @classmethod def register_pretrained_model(cls, model, dependent_variable=None, features=None, primary_key=None, model_name=None, dependencies=None, git_token_name=None, skip_model_check=False, verbose=False, client=None): """Use a fitted scikit-learn model with CivisML scoring Use this function to set up your own fitted scikit-learn-compatible Estimator object for scoring with CivisML. This function will upload your model to Civis Platform and store enough metadata about it that you can subsequently use it with a CivisML scoring job. The only required input is the model itself, but you are strongly recommended to also provide a list of feature names. Without a list of feature names, CivisML will have to assume that your scoring table contains only the features needed for scoring (perhaps also with a primary key column), in all in the correct order. Parameters ---------- model : sklearn.base.BaseEstimator or int The model object. This must be a fitted scikit-learn compatible Estimator object, or else the integer Civis File ID of a pickle or joblib-serialized file which stores such an object. dependent_variable : string or List[str], optional The dependent variable of the training dataset. For a multi-target problem, this should be a list of column names of dependent variables. features : string or List[str], optional A list of column names of features which were used for training. These will be used to ensure that tables input for prediction have the correct features in the correct order. primary_key : string, optional The unique ID (primary key) of the scoring dataset model_name : string, optional The name of the Platform registration job. It will have " Predict" added to become the Script title for predictions. dependencies : array, optional List of packages to install from PyPI or git repository (e.g., GitHub or Bitbucket). If a private repo is specified, please include a ``git_token_name`` argument as well (see below). Make sure to pin dependencies to a specific version, since dependencies will be reinstalled during every predict job. git_token_name : str, optional Name of remote git API token stored in Civis Platform as the password field in a custom platform credential. Used only when installing private git repositories. skip_model_check : bool, optional If you're sure that your model will work with CivisML, but it will fail the comprehensive verification, set this to True. verbose : bool, optional If True, supply debug outputs in Platform logs and make prediction child jobs visible. client : :class:`~civis.APIClient`, optional If not provided, an :class:`~civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. Returns ------- :class:`~civis.ml.ModelPipeline` Examples -------- This example assumes that you already have training data ``X`` and ``y``, where ``X`` is a :class:`~pandas.DataFrame`. >>> from civis.ml import ModelPipeline >>> from sklearn.linear_model import Lasso >>> est = Lasso().fit(X, y) >>> model = ModelPipeline.register_pretrained_model( ... est, 'concrete', features=X.columns) >>> model.predict(table_name='my.table', database_name='my-db') """ client = client or APIClient() if isinstance(dependent_variable, six.string_types): dependent_variable = [dependent_variable] if isinstance(features, six.string_types): features = [features] if isinstance(dependencies, six.string_types): dependencies = [dependencies] if not model_name: model_name = ("Pretrained {} model for " "CivisML".format(model.__class__.__name__)) model_name = model_name[:255] # Max size is 255 characters if isinstance(model, (int, float, six.string_types)): model_file_id = int(model) else: try: tempdir = tempfile.mkdtemp() fout = os.path.join(tempdir, 'model_for_civisml.pkl') joblib.dump(model, fout, compress=3) with open(fout, 'rb') as _fout: # NB: Using the name "estimator.pkl" means that # CivisML doesn't need to copy this input to a file # with a different name. model_file_id = cio.file_to_civis(_fout, 'estimator.pkl', client=client) finally: shutil.rmtree(tempdir) args = {'MODEL_FILE_ID': str(model_file_id), 'SKIP_MODEL_CHECK': skip_model_check, 'DEBUG': verbose} if dependent_variable is not None: args['TARGET_COLUMN'] = ' '.join(dependent_variable) if features is not None: args['FEATURE_COLUMNS'] = ' '.join(features) if dependencies is not None: args['DEPENDENCIES'] = ' '.join(dependencies) if git_token_name: creds = find(client.credentials.list(), name=git_token_name, type='Custom') if len(creds) > 1: raise ValueError("Unique credential with name '{}' for " "remote git hosting service not found!" .format(git_token_name)) args['GIT_CRED'] = creds[0].id template_id = max(REGISTRATION_TEMPLATES) container = client.scripts.post_custom( from_template_id=template_id, name=model_name, arguments=args) log.info('Created custom script %s.', container.id) run = client.scripts.post_custom_runs(container.id) log.debug('Started job %s, run %s.', container.id, run.id) fut = ModelFuture(container.id, run.id, client=client, poll_on_creation=False) fut.result() log.info('Model registration complete.') mp = ModelPipeline.from_existing(fut.job_id, fut.run_id, client) mp.primary_key = primary_key return mp
[docs] @classmethod def from_existing(cls, train_job_id, train_run_id='latest', client=None): """Create a :class:`ModelPipeline` object from existing model IDs Parameters ---------- train_job_id : int The ID of the CivisML job in the Civis Platform train_run_id : int or string, optional Location of the model run, either * an explicit run ID, * "latest" : The most recent run * "active" : The run designated by the training job's "active build" parameter client : :class:`~civis.APIClient`, optional If not provided, an :class:`~civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. Returns ------- :class:`~civis.ml.ModelPipeline` A :class:`~civis.ml.ModelPipeline` which refers to a previously-trained model Examples -------- >>> from civis.ml import ModelPipeline >>> model = ModelPipeline.from_existing(job_id) >>> model.train_result_.metrics['roc_auc'] 0.843 """ train_job_id = int(train_job_id) # Convert np.int to int if client is None: client = APIClient() train_run_id = _decode_train_run(train_job_id, train_run_id, client) try: fut = ModelFuture(train_job_id, train_run_id, client=client) container = client.scripts.get_containers(train_job_id) except CivisAPIError as api_err: if api_err.status_code == 404: msg = ('There is no Civis Platform job with ' 'script ID {} and run ID {}!'.format(train_job_id, train_run_id)) six.raise_from(ValueError(msg), api_err) raise args = container.arguments # Older templates used "WORKFLOW" instead of "MODEL" model = args.get('MODEL', args.get('WORKFLOW')) dependent_variable = args['TARGET_COLUMN'].split() primary_key = args.get('PRIMARY_KEY') parameters = json.loads(args.get('PARAMS', "{}")) cross_validation_parameters = json.loads(args.get('CVPARAMS', "{}")) calibration = args.get('CALIBRATION') excluded_columns = args.get('EXCLUDE_COLS', None) if excluded_columns: excluded_columns = excluded_columns.split() cpu_requested = args.get('REQUIRED_CPU') memory_requested = args.get('REQUIRED_MEMORY') disk_requested = args.get('REQUIRED_DISK_SPACE') name = container.name if name.endswith(' Train'): # Strip object-applied suffix name = name[:-len(' Train')] notifications = {camel_to_snake(key): val for key, val in container.notifications.items()} dependencies = args.get('DEPENDENCIES', None) if dependencies: dependencies = dependencies.split() git_token_name = args.get('GIT_CRED', None) if git_token_name: git_token_name = client.credentials.get(git_token_name).name klass = cls(model=model, dependent_variable=dependent_variable, primary_key=primary_key, model_name=name, parameters=parameters, cross_validation_parameters=cross_validation_parameters, calibration=calibration, excluded_columns=excluded_columns, client=client, cpu_requested=cpu_requested, disk_requested=disk_requested, memory_requested=memory_requested, notifications=notifications, dependencies=dependencies, git_token_name=git_token_name, verbose=args.get('DEBUG', False)) klass.train_result_ = fut # Set prediction template corresponding to training template template_id = int(container['from_template_id']) p_id = _PRED_TEMPLATES.get(template_id) if p_id is None: warnings.warn('Model %s was trained with a newer version of ' 'CivisML than is available in the API client ' 'version %s. Please update your API client version. ' 'Attempting to use an older version of the ' 'prediction code. Prediction will either fail ' 'immediately or succeed.' % (train_job_id, __version__), RuntimeWarning) p_id = max([v for k, v in _PRED_TEMPLATES.items() if k not in REGISTRATION_TEMPLATES]) klass.predict_template_id = p_id return klass
[docs] def train(self, df=None, csv_path=None, table_name=None, database_name=None, file_id=None, sql_where=None, sql_limit=None, oos_scores=None, oos_scores_db=None, if_exists='fail', fit_params=None, polling_interval=None, validation_data='train', n_jobs=None): """Start a Civis Platform job to train your model Provide input through one of a :class:`~pandas.DataFrame` (``df``), a local CSV (``csv_path``), a Civis Table (``table_name`` and ``database_name``), or a Civis File containing a CSV (``file_id``). Model outputs will always contain out-of-sample scores (accessible through :attr:`ModelFuture.table` on this function's output), and you may chose to store these out-of-sample scores in a Civis Table with the ``oos_scores``, ``oos_scores_db``, and ``if_exists`` parameters. Parameters ---------- df : pd.DataFrame, optional A :class:`~pandas.DataFrame` of training data. The :class:`~pandas.DataFrame` will be uploaded to a Civis file so that CivisML can access it. Note that the index of the :class:`~pandas.DataFrame` will be ignored -- use ``df.reset_index()`` if you want your index column to be included with the data passed to CivisML. NB: You must install ``feather-format`` if your :class:`~pandas.DataFrame` contains :class:`~pandas.Categorical` columns, to ensure that CivisML preserves data types. csv_path : str, optional The location of a CSV of data on the local disk. It will be uploaded to a Civis file. table_name : str, optional The qualified name of the table containing the training set from which to build the model. database_name : str, optional Name of the database holding the training set table used to build the model. E.g., 'My Cluster Name'. file_id : int, optional If the training data are stored in a Civis file, provide the integer file ID. sql_where : str, optional A SQL WHERE clause used to scope the rows of the training set (used for table input only) sql_limit : int, optional SQL LIMIT clause for querying the training set (used for table input only) oos_scores : str, optional If provided, store out-of-sample predictions on training set data to this Redshift "schema.tablename". oos_scores_db : str, optional If not provided, store OOS predictions in the same database which holds the training data. if_exists : {'fail', 'append', 'drop', 'truncate'} Action to take if the out-of-sample prediction table already exists. fit_params: Dict[str, str] Mapping from parameter names in the model's ``fit`` method to the column names which hold the data, e.g. ``{'sample_weight': 'survey_weight_column'}``. polling_interval : float, optional Check for job completion every this number of seconds. Do not set if using the notifications endpoint. validation_data : str, optional Source for validation data. There are currently two options: `'train'` (the default), which cross-validates over training data for validation; and `'skip'`, which skips the validation step. n_jobs : int, optional Number of jobs to use for training and validation. Defaults to `None`, which allows CivisML to dynamically calculate an appropriate number of workers to use (in general, as many as possible without using all resources in the cluster). Increase n_jobs to parallelize over many hyperparameter combinations in grid search/hyperband, or decrease to use fewer computational resources at once. Returns ------- :class:`~civis.ml.ModelFuture` """ if ((table_name is None or database_name is None) and file_id is None and df is None and csv_path is None): raise ValueError('Provide a source of data.') if sum((bool(table_name and database_name), bool(file_id), df is not None, csv_path is not None)) > 1: raise ValueError('Provide a single source of data.') if df is not None: file_id = _stash_local_dataframe(df, self.train_template_id, client=self._client) elif csv_path: file_id = _stash_local_file(csv_path, client=self._client) train_args = {'TARGET_COLUMN': ' '.join(self.dependent_variable), 'PRIMARY_KEY': self.primary_key, 'PARAMS': json.dumps(self.parameters), 'CVPARAMS': json.dumps(self.cv_params), 'CALIBRATION': self.calibration, 'IF_EXISTS': if_exists} if oos_scores: train_args['OOSTABLE'] = oos_scores if oos_scores_db: oos_db_id = self._client.get_database_id(oos_scores_db) train_args['OOSDB'] = {'database': oos_db_id} if sql_where: train_args['WHERESQL'] = sql_where if sql_limit: train_args['LIMITSQL'] = sql_limit if self.excluded_columns: train_args['EXCLUDE_COLS'] = ' '.join(self.excluded_columns) if fit_params: train_args['FIT_PARAMS'] = json.dumps(fit_params) if self.dependencies: train_args['DEPENDENCIES'] = ' '.join(self.dependencies) if self.train_template_id >= 9968: if validation_data: train_args['VALIDATION_DATA'] = validation_data if n_jobs: train_args['N_JOBS'] = n_jobs if HAS_SKLEARN and isinstance(self.model, BaseEstimator): try: tempdir = tempfile.mkdtemp() fout = os.path.join(tempdir, 'estimator.pkl') joblib.dump(self.model, fout, compress=3) with open(fout, 'rb') as _fout: n = self.model_name if self.model_name else "CivisML" estimator_file_id = cio.file_to_civis( _fout, 'Estimator for ' + n, client=self._client) self._input_model = self.model # Keep the estimator self.model = str(estimator_file_id) finally: shutil.rmtree(tempdir) train_args['MODEL'] = self.model if HAS_SKLEARN and self.train_template_id >= 9968: if isinstance(self.etl, BaseEstimator): try: tempdir = tempfile.mkdtemp() fout = os.path.join(tempdir, 'ETL.pkl') joblib.dump(self.etl, fout, compress=3) with open(fout, 'rb') as _fout: etl_file_id = cio.file_to_civis( _fout, 'ETL Estimator', client=self._client) train_args['ETL'] = str(etl_file_id) finally: shutil.rmtree(tempdir) name = self.model_name + ' Train' if self.model_name else None # Clear the existing training result so we can make a new one. self.train_result_ = None result, container, run = self._create_custom_run( self.train_template_id, job_name=name, table_name=table_name, database_name=database_name, file_id=file_id, args=train_args, resources=self.job_resources, polling_interval=polling_interval) self.train_result_ = result return result
def _create_custom_run(self, template_id, job_name=None, table_name=None, database_name=None, file_id=None, args=None, resources=None, polling_interval=None): # Handle int-like but non-Python-integer types such as np.int64 file_id = int(file_id) if file_id is not None else file_id script_arguments = {'TABLE_NAME': table_name, 'CIVIS_FILE_ID': file_id, 'DEBUG': self.verbose} if database_name: if template_id < 8000: # v0 jobs used a different database parameter script_arguments['DB_NAME'] = database_name else: db_id = self._client.get_database_id(database_name) script_arguments['DB'] = {'database': db_id} resources = resources or {} for key, value in resources.items(): if value: # Default resources are set on the template. Only # modify via arguments if users give a non-default value. script_arguments[key] = value if self.git_token_name: creds = find(self._client.credentials.list(), name=self.git_token_name, type='Custom') if len(creds) > 1: raise ValueError("Unique credential with name '{}' for " "remote git hosting service not found!" .format(self.git_token_name)) script_arguments['GIT_CRED'] = creds[0].id script_arguments.update(args or {}) container = self._client.scripts.post_custom( from_template_id=template_id, name=job_name, arguments=script_arguments, notifications=self.notifications) log.info('Created custom script %s.', container.id) run = self._client.scripts.post_custom_runs(container.id) log.debug('Started job %s, run %s.', container.id, run.id) train_kwargs = {} if self.train_result_ is not None: train_kwargs = {'train_job_id': self.train_result_.job_id, 'train_run_id': self.train_result_.run_id} fut = ModelFuture( container.id, run.id, client=self._client, polling_interval=polling_interval, poll_on_creation=False, **train_kwargs) return fut, container, run @property @_check_fit_initiated def state(self): return self.train_result_.state @property @_check_fit_initiated def estimator(self): return self.train_result_.estimator
[docs] @_check_fit_initiated def predict(self, df=None, csv_path=None, table_name=None, database_name=None, manifest=None, file_id=None, sql_where=None, sql_limit=None, primary_key=SENTINEL, output_table=None, output_db=None, if_exists='fail', n_jobs=None, polling_interval=None, cpu=None, memory=None, disk_space=None, dvs_to_predict=None): """Make predictions on a trained model Provide input through one of a :class:`~pandas.DataFrame` (``df``), a local CSV (``csv_path``), a Civis Table (``table_name`` and ``database_name``), a Civis File containing a CSV (``file_id``), or a Civis File containing a manifest file (``manifest``). A "manifest file" is JSON which specifies the location of many shards of the data to be used for prediction. A manifest file is the output of a Civis export job with ``force_multifile=True`` set, e.g. from :func:`civis.io.civis_to_multifile_csv`. Large Civis Tables (provided using ``table_name``) will automatically be exported to manifest files. Prediction outputs will always be stored as gzipped CSVs in one or more Civis Files. You can find a list of File ID numbers for output files at the "output_file_ids" key in the metadata returned by the prediction job. Provide an ``output_table`` (and optionally an ``output_db``, if it's different from ``database_name``) to copy these predictions into a Civis Table. Parameters ---------- df : pd.DataFrame, optional A :class:`~pandas.DataFrame` of data for prediction. The :class:`~pandas.DataFrame` will be uploaded to a Civis file so that CivisML can access it. Note that the index of the :class:`~pandas.DataFrame` will be ignored -- use ``df.reset_index()`` if you want your index column to be included with the data passed to CivisML. NB: You must install ``feather-format`` if your :class:`~pandas.DataFrame` contains :class:`~pandas.Categorical` columns, to ensure that CivisML preserves data types. csv_path : str, optional The location of a CSV of data on the local disk. It will be uploaded to a Civis file. table_name : str, optional The qualified name of the table containing your data database_name : str, optional Name of the database holding the data, e.g., 'My Redshift Cluster'. manifest : int, optional ID for a manifest file stored as a Civis file. (Note: if the manifest is not a Civis Platform-specific manifest, like the one returned from :func:`civis.io.civis_to_multfile_csv`, this must be used in conjunction with table_name and database_name due to the need for column discovery via Redshift.) file_id : int, optional If the data are a CSV stored in a Civis file, provide the integer file ID. sql_where : str, optional A SQL WHERE clause used to scope the rows to be predicted sql_limit : int, optional SQL LIMIT clause to restrict the size of the prediction set primary_key : str, optional Primary key of the prediction table. Defaults to the primary key of the training data. Use ``None`` to indicate that the prediction data don't have a primary key column. output_table: str, optional The table in which to put the predictions. output_db : str, optional Database of the output table. Defaults to the database of the input table. if_exists : {'fail', 'append', 'drop', 'truncate'} Action to take if the prediction table already exists. n_jobs : int, optional Number of concurrent Platform jobs to use for multi-file / large table prediction. Defaults to `None`, which allows CivisML to dynamically calculate an appropriate number of workers to use (in general, as many as possible without using all resources in the cluster). polling_interval : float, optional Check for job completion every this number of seconds. Do not set if using the notifications endpoint. cpu : int, optional CPU shares requested by the user for a single job. memory : int, optional RAM requested by the user for a single job. disk_space : float, optional disk space requested by the user for a single job. dvs_to_predict : list of str, optional If this is a multi-output model, you may list a subset of dependent variables for which you wish to generate predictions. This list must be a subset of the original `dependent_variable` input. The scores for the returned subset will be identical to the scores which those outputs would have had if all outputs were written, but ignoring some of the model's outputs will let predictions complete faster and use less disk space. The default is to produce scores for all DVs. Returns ------- :class:`~civis.ml.ModelFuture` """ self.train_result_.result() # Blocks and raises training errors if ((table_name is None or database_name is None) and file_id is None and df is None and csv_path is None and manifest is None): raise ValueError('Provide a source of data.') if sum((bool(table_name and database_name) or (manifest is not None), bool(file_id), df is not None, csv_path is not None)) > 1: raise ValueError('Provide a single source of data.') if df is not None: file_id = _stash_local_dataframe(df, self.predict_template_id, client=self._client) elif csv_path: file_id = _stash_local_file(csv_path, client=self._client) if primary_key is SENTINEL: primary_key = self.primary_key predict_args = {'TRAIN_JOB': self.train_result_.job_id, 'TRAIN_RUN': self.train_result_.run_id, 'PRIMARY_KEY': primary_key, 'IF_EXISTS': if_exists} if output_table: predict_args['OUTPUT_TABLE'] = output_table if output_db: if self.predict_template_id == 7021: # v0 jobs used a different database parameter predict_args['OUTPUT_DB'] = output_db else: output_db_id = self._client.get_database_id(output_db) predict_args['OUTPUT_DB'] = {'database': output_db_id} if manifest: predict_args['MANIFEST'] = manifest if sql_where: predict_args['WHERESQL'] = sql_where if sql_limit: predict_args['LIMITSQL'] = sql_limit if n_jobs: predict_args['N_JOBS'] = n_jobs if dvs_to_predict: if isinstance(dvs_to_predict, six.string_types): dvs_to_predict = [dvs_to_predict] if self.predict_template_id > 10583: # This feature was added in v2.2; 10583 is the v2.1 template predict_args['TARGET_COLUMN'] = ' '.join(dvs_to_predict) if self.predict_template_id >= 9969: if cpu: predict_args['CPU'] = cpu if memory: predict_args['MEMORY'] = memory if disk_space: predict_args['DISK_SPACE'] = disk_space name = self.model_name + ' Predict' if self.model_name else None result, container, run = self._create_custom_run( self.predict_template_id, job_name=name, table_name=table_name, database_name=database_name, file_id=file_id, args=predict_args, polling_interval=polling_interval) return result