Parallel Computation

Civis Platform manages a pool of cloud computing resources. You can access these resources with the tools in the civis.parallel and civis.futures modules.

Joblib backend

If you can divide your work into multiple independent chunks, each of which takes at least several minutes to run, you can reduce the time your job takes to finish by running each chunk simultaneously in Civis Platform. The Civis joblib backend is a software tool which makes it easier to run many jobs simultaneously.

Things to keep in mind when deciding if the Civis joblib backend is the right tool for your code:

  • Each function call which is parallelized with the Civis joblib backend will run in a different Civis Platform script. Creating a new script comes with some overhead. It will take between a few seconds and a few minutes for each script to start, depending on whether Civis Platform needs to provision additional resources. If you expect that each function call will complete quickly, instead consider either running them in serial or using extra processes in the same Civis Platform script.

  • Because function calls run in different scripts, function inputs and outputs must be uploaded to Civis Platform from their origin script and downloaded into their destination. If your functions take very large inputs and/or produce very large outputs, moving the data around will cause additional overhead. Consider either using a different tool or refactoring your code so that the function to be parallelized is no longer moving around large amounts of data.

  • Some open-source libraries, such as scikit-learn, use joblib to do computations in parallel. If you’re working with such a library, the Civis joblib backend provides an easy way to run these parallel computations in different Civis Platform scripts.

Joblib

joblib is an open-source library which facilitates parallel processing in Python. While joblib comes with its own parallel computation tools, it also allows users to define their own "backend". The Civis Python API client takes advantage of this to let you easily run your own code in parallel through Civis Platform.

The make_backend_factory(), infer_backend_factory(), and make_backend_template_factory() functions allow you to define a “civis” parallel computation backend which will transparently distribute computation in cloud resources managed by Civis Platform.

See the joblib user guide for examples of using joblib to do parallel computation. Note that the descriptions of “memmapping” aren’t relevant to using Civis Platform as a backend, since your jobs will potentially run on different computers and can’t share memory. Using the Civis joblib backend to run jobs in parallel in the cloud looks the same as running jobs in parallel on your local computer, except that you first need to set up the “civis” backend.

How to use

Begin by defining the backend. The Civis joblib backend creates and runs Container Scripts, and the make_backend_factory() function accepts several arguments which will be passed to post_containers(). For example, you could pass a repo_http_uri or repo_ref to clone a repository from GitHub into the container which will run your function. Use the docker_image_name and docker_image_tag arguments to select a custom Docker image for your job. You can provide a setup_cmd to run setup in bash before your function executes in Python. The default setup_cmd will run pip install . in the base directory of any repo_http_uri which you include in your backend setup. Make sure that the environment you define for your Civis backend includes all of the code which your parallel function will call.
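
For example, a backend whose containers clone and install a (hypothetical) GitHub package before running your function might be set up like this sketch; the repository URL, branch, and setup command below are illustrative placeholders:

>>> from civis.parallel import make_backend_factory
>>> be_factory = make_backend_factory(
...     docker_image_name='civisanalytics/datascience-python',
...     repo_http_uri='github.com/my-org/my-package',  # hypothetical repo
...     repo_ref='main',  # hypothetical branch
...     setup_cmd='cd /app && pip install .')  # runs in bash before Python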

The make_backend_factory() function will return a backend factory which should be given to the joblib.register_parallel_backend() function. For example:

>>> from joblib import register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> be_factory = make_backend_factory()
>>> register_parallel_backend('civis', be_factory)

Direct joblib to use a custom backend by entering a joblib.parallel_config() context:

>>> from joblib import parallel_config
>>> with parallel_config('civis'):
...     pass  # Do joblib parallel computation here.

You can find more about custom joblib backends in the joblib documentation.

Note that joblib.Parallel takes both an n_jobs and a pre_dispatch parameter. The Civis joblib backend doesn’t queue submitted jobs itself, so it will run up to pre_dispatch jobs at once.

Note

Since joblib v1.3.0, n_jobs=1 disables the use of the specified parallel backend and simply runs the given function in a regular for loop. n_jobs=1 can be useful for testing your code locally before running it in Civis Platform.

Note

The default value of pre_dispatch is "2*n_jobs", which will run a maximum of 2 * n_jobs jobs at once on Civis Platform. Set pre_dispatch="n_jobs" in your joblib.Parallel call to run at most n_jobs jobs.

The Civis joblib backend uses cloudpickle to transport code and data from the parent environment to Civis Platform. This means that you may parallelize dynamically-defined functions and classes, including lambda functions.
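
As a minimal sketch, assuming the 'civis' backend has already been registered as shown earlier, even an inline lambda can be shipped to Civis Platform:

>>> from joblib import delayed, Parallel, parallel_config
>>> double = lambda x: 2 * x  # dynamically defined; transported via cloudpickle
>>> with parallel_config('civis'):
...     print(Parallel(n_jobs=3)(delayed(double)(i) for i in range(3)))
[0, 2, 4]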

The joblib backend will automatically add environment variables called CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID, holding the values of the job and run IDs of the Civis Platform job in which you’re running the joblib backend (if any). Your functions could use these to communicate with the parent job or to recognize that they’re in a process which has been created by another Civis Platform job. However, where possible you should let the joblib backend itself transport the return value of the function it’s running back to the parent.
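
For example, a parallelized function could inspect these variables with the standard library; if they’re absent, the process wasn’t launched by another Civis Platform job (a sketch; the function name is illustrative):

>>> import os
>>> def report_parent():
...     # Set by the joblib backend in child scripts; None otherwise.
...     return (os.environ.get('CIVIS_PARENT_JOB_ID'),
...             os.environ.get('CIVIS_PARENT_RUN_ID'))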

Infer backend parameters

If you’re writing code which will run inside a Civis Container Script, then the infer_backend_factory() function returns a backend factory with environment parameters pre-populated by inspecting the state of your container script at run time. Use infer_backend_factory() anywhere you would use make_backend_factory(), and you don’t need to specify a Docker image or GitHub repository.
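
For example, inside a running container script, the earlier registration could be written as the following sketch (it raises a RuntimeError if executed outside a Civis container job):

>>> from joblib import register_parallel_backend
>>> from civis.parallel import infer_backend_factory
>>> # Docker image, GitHub repo, and other settings are inferred
>>> # from the currently running container script.
>>> register_parallel_backend('civis', infer_backend_factory())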

Script Templates

The make_backend_template_factory() function is intended for developers who are writing code which may be run by users who don’t have permissions to create new container scripts with the necessary environment.

Instead of defining and creating new container scripts with make_backend_factory(), you can use make_backend_template_factory() to launch custom scripts from a script template. To use the template factory, your backing container script must have the Civis Python client installed, and its run command must finish by calling civis_joblib_worker with no arguments. The template must accept the parameter JOBLIB_FUNC_FILE_ID. The Civis joblib backend will use this parameter to transport your remote work.
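
A sketch of registering a template-backed backend follows; the template ID and the extra template parameter are hypothetical:

>>> from joblib import register_parallel_backend
>>> from civis.parallel import make_backend_template_factory
>>> register_parallel_backend('civis', make_backend_template_factory(
...     from_template_id=12345,  # hypothetical template ID
...     arguments={'MY_TEMPLATE_PARAM': 'value'}))  # hypothetical parameter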

Examples

Parallel computation using the default joblib backend (this uses processes on your local computer):

>>> def expensive_calculation(num1, num2):
...     return 2 * num1 + num2
>>> from joblib import delayed, Parallel
>>> parallel = Parallel(n_jobs=5)
>>> args = [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)]
>>> print(parallel(delayed(expensive_calculation)(*a) for a in args))
[1, 3, 5, 7, 9, 11, 13]

You can do the same parallel computation using the Civis backend by creating and registering a backend factory and entering a with parallel_config('civis') context. The code below will start seven different jobs in Civis Platform (with up to five running at once). Each job will call the function expensive_calculation with a different set of arguments from the list args:

>>> def expensive_calculation(num1, num2):
...     return 2 * num1 + num2
>>> from joblib import delayed, Parallel
>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256}))
>>> args = [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)]
>>> with parallel_config('civis'):
...     parallel = Parallel(n_jobs=5, pre_dispatch='n_jobs')
...     print(parallel(delayed(expensive_calculation)(*a) for a in args))
[1, 3, 5, 7, 9, 11, 13]

Since joblib v1.4.2, joblib.Parallel has a return_as parameter that can accept 'generator' or 'generator_unordered' (the default is 'list', whose behavior is shown in the examples above). Returning a generator, especially the “unordered” version, is useful for getting partial results back from Civis Platform as soon as any child job finishes, rather than waiting for all child jobs to finish before you get the resulting list. With return_as='generator_unordered', you might want to keep track of the ordering of the child jobs’ results using enumerate():

>>> import inspect
>>> import time
>>> def expensive_calculation(order, num):
...     # lower order for a longer sleep to simulate a longer job
...     time.sleep(10 ** (2 - order))
...     return order, num
>>> from joblib import delayed, Parallel
>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256}))
>>> args = enumerate(['foo', 'bar', 'baz'])  # `enumerate` to get an order index
>>> with parallel_config('civis'):
...     parallel = Parallel(
...         n_jobs=5, pre_dispatch='n_jobs',
...         return_as='generator_unordered',  # yields a result from a child job as soon as it's ready
...     )
...     results = parallel(delayed(expensive_calculation)(*a) for a in args)
...     assert inspect.isgenerator(results)
...     for order, num in results:
...         print(order, num)
2 baz
1 bar
0 foo

Lastly, you can use the Civis joblib backend to parallelize any code which uses joblib internally, such as scikit-learn:

>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> from sklearn.model_selection import GridSearchCV
>>> from sklearn.ensemble import GradientBoostingClassifier
>>> from sklearn.datasets import load_digits
>>> digits = load_digits()
>>> param_grid = {
...     "max_depth": [1, 3, 5, None],
...     "max_features": ["sqrt", "log2", None],
...     "learning_rate": [0.1, 0.01, 0.001]
... }
>>> # Note: n_jobs and pre_dispatch specify the maximum number of
>>> # concurrent jobs.
>>> gs = GridSearchCV(GradientBoostingClassifier(n_estimators=1000,
...                                              random_state=42),
...                   param_grid=param_grid,
...                   n_jobs=5, pre_dispatch="n_jobs")
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256}))
>>> with parallel_config('civis'):
...     gs.fit(digits.data, digits.target)

Debugging

Any (non-retried) errors in child jobs will cause the entire parallel call to fail. joblib will transport the first exception from a remote job and raise it in the parent process so that you can debug.

If your remote jobs are failing because of network problems (e.g., occasional 500 errors), you can make your parallel call more likely to succeed by using a max_job_retries value above 0 when creating your backend factory. This will automatically retry a job (potentially more than once) before giving up and raising an exception.
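
For instance, to retry each failed job up to twice before giving up (a sketch; the resource values are illustrative):

>>> from joblib import register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256},
...     max_job_retries=2))  # retry each failed job up to 2 times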

Logging: The Civis joblib backend uses the standard library logging module, emitting debug-level messages for events which might help you diagnose errors. See also the verbose argument to joblib.Parallel, which prints progress information to either stdout or stderr.
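
To surface those debug messages, raise the logging level for the client’s loggers; this sketch assumes they live under the 'civis' logger namespace:

>>> import logging
>>> logging.basicConfig()  # send log records to stderr
>>> logging.getLogger('civis').setLevel(logging.DEBUG)  # assumed namespace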

Mismatches between your local environment and the environment in the Civis container script jobs are a common source of errors. To run a function in Civis Platform, any modules called by that function must be importable from a Python interpreter running in the container script. For example, if you use joblib.Parallel with numpy.sqrt(), the joblib backend must be set to run your function in a container which has numpy installed. If you see an error such as:

ModuleNotFoundError: No module named 'numpy'

this signifies that the module you’re trying to use doesn’t exist in the remote environment. Select a Docker image which has the module installed, or use the repo_http_uri parameter of make_backend_factory() to install it from GitHub.
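
For example, either approach below would make numpy importable in the remote environment (a sketch; the repository URL is a placeholder):

>>> from civis.parallel import make_backend_factory
>>> # Option 1: choose an image that already ships with numpy.
>>> be_factory = make_backend_factory(
...     docker_image_name='civisanalytics/datascience-python')
>>> # Option 2: install your own package (hypothetical repo) from GitHub;
>>> # the default setup_cmd runs `pip install .` in the cloned repo.
>>> be_factory = make_backend_factory(
...     repo_http_uri='github.com/my-org/my-package', repo_ref='main')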

Object Reference

Parallel computations using the Civis Platform infrastructure

exception civis.parallel.JobSubmissionError

An error occurred while submitting a job to Civis Platform.

civis.parallel.infer_backend_factory(required_resources=None, params=None, arguments=None, client=None, polling_interval=None, setup_cmd=None, max_submit_retries=0, max_job_retries=0, hidden=True, remote_backend='sequential', **kwargs)

Infer the container environment and return a backend factory.

This function helps you run additional jobs from code which executes inside a Civis container job. The function reads settings for relevant parameters (e.g., the Docker image) of the container it’s running inside of.

Jobs created through this backend will have environment variables CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID with the contents of the CIVIS_JOB_ID and CIVIS_RUN_ID of the environment which created them. If the code doesn’t have CIVIS_JOB_ID and CIVIS_RUN_ID environment variables available, the child will not have CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID environment variables.

Note

This function will read the state of the parent container job at the time this function executes. If the user has modified the container job since the run started (e.g., by changing the GitHub branch in the container’s GUI), this function may infer incorrect settings for the child jobs.

Keyword arguments inferred from the existing script’s state are: docker_image_name, docker_image_tag, repo_http_uri, repo_ref, remote_host_credential_id, git_credential_id, cancel_timeout, and time_zone.

Parameters:
required_resources : dict or None, optional

The resources needed by the container. See the container scripts API documentation for details. Resource requirements not specified will default to the requirements of the current job.

params : list or None, optional

A definition of the parameters this script accepts in the arguments field. See the container scripts API documentation for details.

Parameters of the child jobs will default to the parameters of the current job. Any parameters provided here will override parameters of the same name from the current job.

arguments : dict or None, optional

Dictionary of name/value pairs to use to run this script. Only settable if this script has defined params. See the container scripts API documentation for details.

Arguments will default to the arguments of the current job. Anything provided here will override portions of the current job’s arguments.

client : civis.APIClient instance or None, optional

An API Client object to use.

polling_interval : int, optional

The polling interval, in seconds, for checking container script status. If you have many jobs, you may want to set this higher (e.g., 300) to avoid rate-limiting.

setup_cmd : str, optional

A shell command or sequence of commands for setting up the environment. These will precede the commands used to run functions in joblib. This is primarily for installing dependencies that are not available in the Docker image (e.g., cd /app && pip install . or pip install gensim).

With no GitHub repo input, the setup command will default to a command that does nothing. If a repo_http_uri is provided, the default setup command will attempt to run pip install . in the base directory of the cloned repository. If this command fails, execution will still continue.

max_submit_retries : int, optional

The maximum number of retries for submitting each job. This is to help avoid a large set of jobs failing because of a single 5xx error. A value higher than zero should only be used for jobs that are idempotent (i.e., jobs whose result and side effects are the same regardless of whether they are run once or many times).

max_job_retries : int, optional

Retry failed jobs this number of times before giving up. Even more than with max_submit_retries, this should only be used for jobs which are idempotent, as the job may have caused side effects (if any) before failing. These retries assist with jobs which may have failed because of network or worker failures.

hidden : bool, optional

The hidden status of the object. Setting this to True hides it from most API endpoints. The object can still be queried directly by ID. Defaults to True.

remote_backend : str or object, optional

The name of a joblib backend or a joblib backend itself. This parameter is the joblib backend to use when executing code within joblib in the container. The default of 'sequential' uses the joblib sequential backend in the container. The value 'civis' uses an exact copy of the Civis joblib backend that launched the container. Note that with the value 'civis', one can potentially use more jobs than specified by n_jobs.

**kwargs:

Additional keyword arguments will be passed directly to post_containers(), potentially overriding the values of those arguments in the parent environment.

Raises:
RuntimeError

If this function is not running inside a Civis container job.

civis.parallel.make_backend_factory(docker_image_name='civisanalytics/datascience-python', client=None, polling_interval=None, setup_cmd=None, max_submit_retries=0, max_job_retries=0, hidden=True, remote_backend='sequential', **kwargs)

Create a joblib backend factory that uses Civis Container Scripts.

Jobs created through this backend will have environment variables CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID with the contents of the CIVIS_JOB_ID and CIVIS_RUN_ID of the environment which created them. If the code doesn’t have CIVIS_JOB_ID and CIVIS_RUN_ID environment variables available, the child will not have CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID environment variables.

Note

The total size of function parameters in Parallel() calls on this backend must be less than 5 GB due to AWS file size limits.

Note

The maximum number of concurrent jobs in Civis Platform is controlled by both the n_jobs and pre_dispatch parameters of joblib.Parallel. Set pre_dispatch="n_jobs" to have a maximum of n_jobs processes running at once. (The default is pre_dispatch="2*n_jobs".)

Parameters:
docker_image_name : str, optional

The image for the container script. You may also wish to specify a docker_image_tag in the keyword arguments.

client : civis.APIClient instance or None, optional

An API Client object to use.

polling_interval : int, optional

The polling interval, in seconds, for checking container script status. If you have many jobs, you may want to set this higher (e.g., 300) to avoid rate-limiting.

setup_cmd : str, optional

A shell command or sequence of commands for setting up the environment. These will precede the commands used to run functions in joblib. This is primarily for installing dependencies that are not available in the Docker image (e.g., cd /app && pip install . or pip install gensim).

With no GitHub repo input, the setup command will default to a command that does nothing. If a repo_http_uri is provided, the default setup command will attempt to run pip install . in the base directory of the cloned repository. If this command fails, execution will still continue.

max_submit_retries : int, optional

The maximum number of retries for submitting each job. This is to help avoid a large set of jobs failing because of a single 5xx error. A value higher than zero should only be used for jobs that are idempotent (i.e., jobs whose result and side effects are the same regardless of whether they are run once or many times).

max_job_retries : int, optional

Retry failed jobs this number of times before giving up. Even more than with max_submit_retries, this should only be used for jobs which are idempotent, as the job may have caused side effects (if any) before failing. These retries assist with jobs which may have failed because of network or worker failures.

hidden : bool, optional

The hidden status of the object. Setting this to True hides it from most API endpoints. The object can still be queried directly by ID. Defaults to True.

remote_backend : str or object, optional

The name of a joblib backend or a joblib backend itself. This parameter is the joblib backend to use when executing code within joblib in the container. The default of 'sequential' uses the joblib sequential backend in the container. The value 'civis' uses an exact copy of the Civis joblib backend that launched the container. Note that with the value 'civis', one can potentially use more jobs than specified by n_jobs.

**kwargs:

Additional keyword arguments will be passed directly to civis.APIClient.scripts.post_containers.

Notes

Joblib’s joblib.parallel.register_parallel_backend() (see example above) expects a callable that returns a joblib.parallel.ParallelBackendBase instance. This function allows the user to specify the Civis container script settings that will be used when that backend creates container scripts to run jobs.

The specified Docker image (optionally, with a GitHub repo and setup command) must have basically the same environment as the one in which this module is used to submit jobs. The worker jobs need to be able to deserialize the jobs they are given, including the data and all the necessary Python objects (e.g., if you pass a Pandas data frame, the image must have Pandas installed). You may use functions and classes dynamically defined in the code (e.g., lambda functions), but if your joblib-parallelized function calls code imported from another module, that module must be installed in the remote environment.

Examples

>>> # Without joblib:
>>> from math import sqrt
>>> print([sqrt(i ** 2) for i in range(10)])
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> # Using the default joblib backend:
>>> from joblib import delayed, Parallel
>>> parallel = Parallel(n_jobs=5)
>>> print(parallel(delayed(sqrt)(i ** 2) for i in range(10)))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> # Using the Civis backend:
>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256}))
>>> with parallel_config('civis'):
...     parallel = Parallel(n_jobs=5, pre_dispatch='n_jobs')
...     print(parallel(delayed(sqrt)(i ** 2) for i in range(10)))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> # Using scikit-learn with the Civis backend:
>>> from sklearn.model_selection import GridSearchCV
>>> from sklearn.ensemble import GradientBoostingClassifier
>>> from sklearn.datasets import load_digits
>>> digits = load_digits()
>>> param_grid = {
...     "max_depth": [1, 3, 5, None],
...     "max_features": ["sqrt", "log2", None],
...     "learning_rate": [0.1, 0.01, 0.001]
... }
>>> # Note: n_jobs and pre_dispatch specify the maximum number of
>>> # concurrent jobs.
>>> gs = GridSearchCV(GradientBoostingClassifier(n_estimators=1000,
...                                              random_state=42),
...                   param_grid=param_grid,
...                   n_jobs=5, pre_dispatch="n_jobs")
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256}))
>>> with parallel_config('civis'):
...     gs.fit(digits.data, digits.target)

civis.parallel.make_backend_template_factory(from_template_id, arguments=None, client=None, polling_interval=None, max_submit_retries=0, max_job_retries=0, hidden=True, **kwargs)

Create a joblib backend factory that uses Civis Custom Scripts.

If your template has settable parameters CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID, then this backend will fill them with the contents of the CIVIS_JOB_ID and CIVIS_RUN_ID of the environment which created them. If the code doesn’t have CIVIS_JOB_ID and CIVIS_RUN_ID environment variables available, the child will not have CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID environment variables.

Parameters:
from_template_id : int

Create jobs as Custom Scripts from the given template ID. When using the joblib backend with templates, the template must have a very specific form. Refer to the documentation for details.

arguments : dict or None, optional

Dictionary of name/value pairs to use to run this script. Only settable if this script has defined params. See the container scripts API documentation for details.

client : civis.APIClient instance or None, optional

An API Client object to use.

polling_interval : int, optional

The polling interval, in seconds, for checking container script status. If you have many jobs, you may want to set this higher (e.g., 300) to avoid rate-limiting.

max_submit_retries : int, optional

The maximum number of retries for submitting each job. This is to help avoid a large set of jobs failing because of a single 5xx error. A value higher than zero should only be used for jobs that are idempotent (i.e., jobs whose result and side effects are the same regardless of whether they are run once or many times).

max_job_retries : int, optional

Retry failed jobs this number of times before giving up. Even more than with max_submit_retries, this should only be used for jobs which are idempotent, as the job may have caused side effects (if any) before failing. These retries assist with jobs which may have failed because of network or worker failures.

hidden : bool, optional

The hidden status of the object. Setting this to True hides it from most API endpoints. The object can still be queried directly by ID. Defaults to True.

**kwargs:

Additional keyword arguments will be passed directly to civis.APIClient.scripts.post_custom.