Parallel Computation
Civis Platform manages a pool of cloud computing resources.
You can access these resources with the tools in the civis.parallel
and civis.futures
modules.
Joblib backend
If you can divide your work into multiple independent chunks, each of which takes at least several minutes to run, you can reduce the time your job takes to finish by running each chunk simultaneously in Civis Platform. The Civis joblib backend is a software tool which makes it easier to run many jobs simultaneously.
Things to keep in mind when deciding if the Civis joblib backend is the right tool for your code:
Each function call which is parallelized with the Civis joblib backend will run in a different Civis Platform script. Creating a new script comes with some overhead. It will take between a few seconds and a few minutes for each script to start, depending on whether Civis Platform needs to provision additional resources. If you expect that each function call will complete quickly, instead consider either running them in serial or using extra processes in the same Civis Platform script.
Because function calls run in different scripts, function inputs and outputs must be uploaded to Civis Platform from their origin script and downloaded into their destination. If your functions take very large inputs and/or produce very large outputs, moving the data around will cause additional overhead. Consider either using a different tool or refactoring your code so that the function to be parallelized is no longer moving around large amounts of data.
Some open-source libraries, such as scikit-learn, use joblib
to do computations in parallel. If you’re working with such a library, the Civis joblib backend provides an easy way to run these parallel computations in different Civis Platform scripts.
Joblib
joblib is an open-source Python library which facilitates parallel processing. While joblib comes with its own parallel computation tools, it also allows users to define their own “backend”. The Civis Python API client takes advantage of this to let you easily run your own code in parallel through Civis Platform.
The make_backend_factory()
,
infer_backend_factory()
, and
make_backend_template_factory()
functions allow you
to define a “civis” parallel computation backend which will transparently
distribute computation in cloud resources managed by Civis Platform.
See the joblib user guide for examples of using joblib to do parallel computation. Note that the descriptions of “memmapping” aren’t relevant to using Civis Platform as a backend, since your jobs will potentially run on different computers and can’t share memory. Using the Civis joblib backend to run jobs in parallel in the cloud looks the same as running jobs in parallel on your local computer, except that you first need to set up the “civis” backend.
How to use
Begin by defining the backend. The Civis joblib backend creates and runs
Container Scripts, and the make_backend_factory()
function accepts several arguments which will be passed to
post_containers()
.
For example, you could pass a repo_http_uri
or repo_ref
to clone a repository from GitHub into the container which will run
your function. Use the docker_image_name
and docker_image_tag
to select a custom Docker image for your job.
You can provide a setup_cmd
to run setup in bash before your
function executes in Python. The default setup_cmd
will run
pip install .
in the base directory of any repo_http_uri
which you include in your backend setup.
Make sure that the environment you define for your Civis backend
includes all of the code which your parallel function will call.
The make_backend_factory()
function will return a
backend factory which should be given to the joblib.register_parallel_backend()
function. For example:
>>> from joblib import register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> be_factory = make_backend_factory()
>>> register_parallel_backend('civis', be_factory)
Direct joblib
to use a custom backend by entering a joblib.parallel_config()
context:
>>> from joblib import parallel_config
>>> with parallel_config('civis'):
... # Do joblib parallel computation here.
You can find more about custom joblib backends in the joblib documentation.
Note that joblib.Parallel
takes both an n_jobs
and a pre_dispatch
parameter. The Civis joblib backend doesn’t queue submitted jobs itself,
so it will run pre_dispatch
jobs at once.
Note
Since joblib v1.3.0, n_jobs=1
will disable the use of the
specified parallel backend and simply run the given function
with a regular for
loop. n_jobs=1
is useful for testing
your code locally before running it in Civis Platform.
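A quick local check of this behavior, using the builtin pow() so no backend registration is needed (a sketch; nothing here touches Civis Platform):

```python
from joblib import Parallel, delayed

# With n_jobs=1, joblib (v1.3.0+) skips any registered backend and runs
# the tasks in a plain loop, which makes local debugging straightforward.
results = Parallel(n_jobs=1)(delayed(pow)(i, 2) for i in range(4))
print(results)  # [0, 1, 4, 9]
```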
Note
The default value of
pre_dispatch
is "2*n_jobs"
, which will run a maximum of 2 * n_jobs
jobs
at once on Civis Platform. Set pre_dispatch="n_jobs"
in your
joblib.Parallel
call to run at most n_jobs
jobs.
The Civis joblib backend uses cloudpickle to transport code and data from the parent environment to Civis Platform. This means that you may parallelize dynamically-defined functions and classes, including lambda functions.
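As a local sanity check, joblib’s default loky backend also serializes work with cloudpickle, so a lambda can be dispatched the same way it would be through the “civis” backend (a sketch; no Civis resources are used here):

```python
from joblib import Parallel, delayed

# cloudpickle-based serialization lets dynamically defined callables
# (here, a lambda) travel to worker processes.
triple = lambda x: 3 * x
results = Parallel(n_jobs=2)(delayed(triple)(i) for i in range(4))
print(results)  # [0, 3, 6, 9]
```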
The joblib backend will automatically add environment variables called
CIVIS_PARENT_JOB_ID
and CIVIS_PARENT_RUN_ID
, holding the values
of the job and run IDs of the Civis Platform job in which you’re
running the joblib backend (if any). Your functions could use these
to communicate with the parent job or to recognize that they’re in
a process which has been created by another Civis Platform job.
However, where possible you should let the joblib backend itself
transport the return value of the function it’s running back to the parent.
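For example, a child function might branch on these variables. The helper below is hypothetical; it takes a mapping argument so it can be exercised outside Civis Platform:

```python
import os

def parent_job_info(environ=None):
    """Return (job_id, run_id) of the launching Civis job, or None."""
    environ = os.environ if environ is None else environ
    job_id = environ.get("CIVIS_PARENT_JOB_ID")
    run_id = environ.get("CIVIS_PARENT_RUN_ID")
    # Outside a Civis-launched child process these variables are absent.
    return None if job_id is None else (job_id, run_id)

print(parent_job_info({"CIVIS_PARENT_JOB_ID": "123", "CIVIS_PARENT_RUN_ID": "7"}))
# ('123', '7')
```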
Infer backend parameters
If you’re writing code which will run inside a Civis Container Script, then
the infer_backend_factory()
function returns a backend
factory with environment parameters pre-populated by inspecting the state of
your container script at run time. Use infer_backend_factory()
anywhere you would use make_backend_factory()
,
and you don’t need to specify a Docker image or GitHub repository.
Script Templates
The make_backend_template_factory()
function is intended for
developers who are writing code which may be run by users who don’t have
permissions to create new container scripts with the necessary environment.
Instead of defining and creating new container scripts with
make_backend_factory()
, you can use
make_backend_template_factory()
to launch custom scripts from
a script template. To use the template factory, your backing container script must
have the Civis Python client installed, and its run command must finish
by calling civis_joblib_worker
with no arguments. The template must accept the
parameter JOBLIB_FUNC_FILE_ID
. The Civis joblib backend will use this parameter
to transport your remote work.
Examples
Parallel computation using the default joblib backend (this uses processes on your local computer):
>>> def expensive_calculation(num1, num2):
... return 2 * num1 + num2
>>> from joblib import delayed, Parallel
>>> parallel = Parallel(n_jobs=5)
>>> args = [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)]
>>> print(parallel(delayed(expensive_calculation)(*a) for a in args))
[1, 3, 5, 7, 9, 11, 13]
You can do the same parallel computation using the Civis backend
by creating and registering a backend factory and entering a
with parallel_config('civis')
context. The code below will start
seven different jobs in Civis Platform (with up to five running at once).
Each job will call the function expensive_calculation
with a
different set of arguments from the list args
:
>>> def expensive_calculation(num1, num2):
... return 2 * num1 + num2
>>> from joblib import delayed, Parallel
>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> register_parallel_backend('civis', make_backend_factory(
... required_resources={"cpu": 512, "memory": 256}))
>>> args = [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)]
>>> with parallel_config('civis'):
... parallel = Parallel(n_jobs=5, pre_dispatch='n_jobs')
... print(parallel(delayed(expensive_calculation)(*a) for a in args))
[1, 3, 5, 7, 9, 11, 13]
Since joblib v1.4.2, joblib.Parallel
has a return_as
parameter
that can accept 'generator'
or 'generator_unordered'
(default is 'list'
, whose behavior is shown in the examples above).
Returning a generator, especially the “unordered” version, instead of a list
is useful for getting partial results back from Civis Platform
as soon as any child job finishes (as opposed to having to wait for all child jobs
to finish before you get a list). With return_as='generator_unordered'
,
you might want to keep track of the ordering of the child jobs’ results
using enumerate()
:
>>> import inspect
>>> import time
>>> def expensive_calculation(order, num):
... # lower order for a longer sleep to simulate a longer job
... time.sleep(10 ** (2 - order))
... return order, num
>>> from joblib import delayed, Parallel
>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> register_parallel_backend('civis', make_backend_factory(
... required_resources={"cpu": 512, "memory": 256}))
>>> args = enumerate(['foo', 'bar', 'baz']) # `enumerate` to get an order index
>>> with parallel_config('civis'):
... parallel = Parallel(
... n_jobs=5, pre_dispatch='n_jobs',
... return_as='generator_unordered', # yields a result from a child job as soon as it's ready
... )
... results = parallel(delayed(expensive_calculation)(*a) for a in args)
... assert inspect.isgenerator(results)
... for order, num in results:
... print(order, num)
2 baz
1 bar
0 foo
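If submission order matters downstream, the (index, value) pairs collected from a return_as='generator_unordered' run can be restored with sorted():

```python
# Results as they might arrive from a 'generator_unordered' run, each
# tagged with its submission index via enumerate() before dispatch:
unordered = [(2, "baz"), (0, "foo"), (1, "bar")]

# Sorting on the index restores submission order.
in_order = [value for _, value in sorted(unordered)]
print(in_order)  # ['foo', 'bar', 'baz']
```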
Lastly, you can use the Civis joblib backend to parallelize any code which uses joblib internally, such as scikit-learn:
>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> from sklearn.model_selection import GridSearchCV
>>> from sklearn.ensemble import GradientBoostingClassifier
>>> from sklearn.datasets import load_digits
>>> digits = load_digits()
>>> param_grid = {
... "max_depth": [1, 3, 5, None],
... "max_features": ["sqrt", "log2", None],
... "learning_rate": [0.1, 0.01, 0.001]
... }
>>> # Note: n_jobs and pre_dispatch specify the maximum number of
>>> # concurrent jobs.
>>> gs = GridSearchCV(GradientBoostingClassifier(n_estimators=1000,
... random_state=42),
... param_grid=param_grid,
... n_jobs=5, pre_dispatch="n_jobs")
>>> register_parallel_backend('civis', make_backend_factory(
... required_resources={"cpu": 512, "memory": 256}))
>>> with parallel_config('civis'):
... gs.fit(digits.data, digits.target)
Debugging
Any (non-retried) errors in child jobs will cause the entire parallel call to
fail. joblib
will transport the first exception from a remote job and
raise it in the parent process so that you can debug.
If your remote jobs are failing because of network problems (e.g., occasional
500 errors), you can make your parallel call more likely to succeed by setting
max_job_retries
to a value above 0 when creating your backend factory.
This will automatically retry a failed job (potentially more than once) before giving
up and raising the exception.
Logging: The Civis joblib backend uses the standard library
logging module,
emitting debug-level messages for events which might help you diagnose errors.
See also the “verbose” argument to joblib.Parallel
, which
prints information to either stdout or stderr.
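To surface those debug messages while troubleshooting, raise the level on the client’s logger (a sketch; the logger name "civis" follows the package name):

```python
import logging

# Show debug-level messages from the Civis client while diagnosing
# failures in parallel runs.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("civis").setLevel(logging.DEBUG)
print(logging.getLogger("civis").level == logging.DEBUG)  # True
```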
Mismatches between your local environment and the environment in the
Civis container script jobs are a common source of errors.
To run a function in Civis Platform, any modules called by
that function must be importable from a Python interpreter running
in the container script. For example, if you use joblib.Parallel
with numpy.sqrt()
, the joblib backend must be set to run
your function in a container which has numpy
installed.
If you see an error such as:
ModuleNotFoundError: No module named 'numpy'
this signifies that the module you’re trying to use doesn’t exist
in the remote environment. Select a Docker image with the module installed,
or use the repo_http_uri
parameter of make_backend_factory()
to install it from GitHub.
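One way to catch such mismatches before submitting work is to check locally which modules your function pulls in; each of them must also be importable in the remote image. A sketch using only the standard library:

```python
import importlib.util

def missing_modules(names):
    """Return the subset of module names that cannot be imported here."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# 'json' ships with Python; 'no_such_module' stands in for a dependency
# absent from the environment.
print(missing_modules(["json", "no_such_module"]))  # ['no_such_module']
```

Run the same check in the remote container (e.g., from your setup_cmd) to confirm the environments match.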
Object Reference
Parallel computations using the Civis Platform infrastructure
- exception civis.parallel.JobSubmissionError[source]
An error occurred while submitting a job to Civis Platform.
- civis.parallel.infer_backend_factory(required_resources=None, params=None, arguments=None, client=None, polling_interval=None, setup_cmd=None, max_submit_retries=0, max_job_retries=0, hidden=True, remote_backend='sequential', **kwargs)[source]
Infer the container environment and return a backend factory.
This function helps you run additional jobs from code which executes inside a Civis container job. The function reads settings for relevant parameters (e.g., the Docker image) of the container it’s running inside of.
Jobs created through this backend will have environment variables CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID with the contents of the CIVIS_JOB_ID and CIVIS_RUN_ID of the environment which created them. If the code doesn’t have CIVIS_JOB_ID and CIVIS_RUN_ID environment variables available, the child will not have CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID environment variables.

Note

This function will read the state of the parent container job at the time this function executes. If the user has modified the container job since the run started (e.g., by changing the GitHub branch in the container’s GUI), this function may infer incorrect settings for the child jobs.
Keyword arguments inferred from the existing script’s state are ['docker_image_name', 'docker_image_tag', 'repo_http_uri', 'repo_ref', 'remote_host_credential_id', 'git_credential_id', 'cancel_timeout', 'time_zone'].
- Parameters:
- required_resources: dict or None, optional
The resources needed by the container. See the container scripts API documentation for details. Resource requirements not specified will default to the requirements of the current job.
- params: list or None, optional
A definition of the parameters this script accepts in the arguments field. See the container scripts API documentation for details.
Parameters of the child jobs will default to the parameters of the current job. Any parameters provided here will override parameters of the same name from the current job.
- arguments: dict or None, optional
Dictionary of name/value pairs to use to run this script. Only settable if this script has defined params. See the container scripts API documentation for details.
Arguments will default to the arguments of the current job. Anything provided here will override portions of the current job’s arguments.
- client: civis.APIClient instance or None, optional
An API Client object to use.
- polling_interval: int, optional
The polling interval, in seconds, for checking container script status. If you have many jobs, you may want to set this higher (e.g., 300) to avoid rate-limiting.
- setup_cmd: str, optional
A shell command or sequence of commands for setting up the environment. These will precede the commands used to run functions in joblib. This is primarily for installing dependencies that are not available in the Docker image (e.g., cd /app && pip install . or pip install gensim).
With no GitHub repo input, the setup command will default to a command that does nothing. If a repo_http_uri is provided, the default setup command will attempt to run pip install .. If this command fails, execution will still continue.
- max_submit_retries: int, optional
The maximum number of retries for submitting each job. This is to help avoid a large set of jobs failing because of a single 5xx error. A value higher than zero should only be used for jobs that are idempotent (i.e., jobs whose result and side effects are the same regardless of whether they are run once or many times).
- max_job_retries: int, optional
Retry failed jobs this number of times before giving up. Even more than with max_submit_retries, this should only be used for jobs which are idempotent, as the job may have caused side effects (if any) before failing. These retries assist with jobs which may have failed because of network or worker failures.
- hidden: bool, optional
The hidden status of the object. Setting this to True hides it from most API endpoints. The object can still be queried directly by ID. Defaults to True.
- remote_backend: str or object, optional
The name of a joblib backend or a joblib backend itself. This parameter is the joblib backend to use when executing code within joblib in the container. The default of 'sequential' uses the joblib sequential backend in the container. The value 'civis' uses an exact copy of the Civis joblib backend that launched the container. Note that with the value 'civis', one can potentially use more jobs than specified by n_jobs.
- **kwargs:
Additional keyword arguments will be passed directly to
post_containers()
, potentially overriding the values of those arguments in the parent environment.
- Raises:
- RuntimeError
If this function is not running inside a Civis container job.
- civis.parallel.make_backend_factory(docker_image_name='civisanalytics/datascience-python', client=None, polling_interval=None, setup_cmd=None, max_submit_retries=0, max_job_retries=0, hidden=True, remote_backend='sequential', **kwargs)[source]
Create a joblib backend factory that uses Civis Container Scripts.
Jobs created through this backend will have environment variables CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID with the contents of the CIVIS_JOB_ID and CIVIS_RUN_ID of the environment which created them. If the code doesn’t have CIVIS_JOB_ID and CIVIS_RUN_ID environment variables available, the child will not have CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID environment variables.

Note

The total size of function parameters in Parallel() calls on this backend must be less than 5 GB due to AWS file size limits.

Note

The maximum number of concurrent jobs in Civis Platform is controlled by both the n_jobs and pre_dispatch parameters of joblib.Parallel. Set pre_dispatch="n_jobs" to have a maximum of n_jobs processes running at once. (The default is pre_dispatch="2*n_jobs".)

- Parameters:
- docker_image_name: str, optional
The image for the container script. You may also wish to specify a docker_image_tag in the keyword arguments.
- client: civis.APIClient instance or None, optional
An API Client object to use.
- polling_interval: int, optional
The polling interval, in seconds, for checking container script status. If you have many jobs, you may want to set this higher (e.g., 300) to avoid rate-limiting.
- setup_cmd: str, optional
A shell command or sequence of commands for setting up the environment. These will precede the commands used to run functions in joblib. This is primarily for installing dependencies that are not available in the Docker image (e.g., cd /app && pip install . or pip install gensim).
With no GitHub repo input, the setup command will default to a command that does nothing. If a repo_http_uri is provided, the default setup command will attempt to run pip install .. If this command fails, execution will still continue.
- max_submit_retries: int, optional
The maximum number of retries for submitting each job. This is to help avoid a large set of jobs failing because of a single 5xx error. A value higher than zero should only be used for jobs that are idempotent (i.e., jobs whose result and side effects are the same regardless of whether they are run once or many times).
- max_job_retries: int, optional
Retry failed jobs this number of times before giving up. Even more than with max_submit_retries, this should only be used for jobs which are idempotent, as the job may have caused side effects (if any) before failing. These retries assist with jobs which may have failed because of network or worker failures.
- hidden: bool, optional
The hidden status of the object. Setting this to True hides it from most API endpoints. The object can still be queried directly by ID. Defaults to True.
- remote_backend: str or object, optional
The name of a joblib backend or a joblib backend itself. This parameter is the joblib backend to use when executing code within joblib in the container. The default of 'sequential' uses the joblib sequential backend in the container. The value 'civis' uses an exact copy of the Civis joblib backend that launched the container. Note that with the value 'civis', one can potentially use more jobs than specified by n_jobs.
- **kwargs:
Additional keyword arguments will be passed directly to
civis.APIClient.scripts.post_containers
.
Notes
Joblib’s joblib.parallel.register_parallel_backend() (see example above) expects a callable that returns a joblib.parallel.ParallelBackendBase instance. This function allows the user to specify the Civis container script settings that will be used when that backend creates container scripts to run jobs.

The specified Docker image (optionally, with a GitHub repo and setup command) must have basically the same environment as the one in which this module is used to submit jobs. The worker jobs need to be able to deserialize the jobs they are given, including the data and all the necessary Python objects (e.g., if you pass a Pandas data frame, the image must have Pandas installed). You may use functions and classes dynamically defined in the code (e.g., lambda functions), but if your joblib-parallelized function calls code imported from another module, that module must be installed in the remote environment.
Examples
>>> # Without joblib:
>>> from math import sqrt
>>> print([sqrt(i ** 2) for i in range(10)])
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

>>> # Using the default joblib backend:
>>> from joblib import delayed, Parallel
>>> parallel = Parallel(n_jobs=5)
>>> print(parallel(delayed(sqrt)(i ** 2) for i in range(10)))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

>>> # Using the Civis backend:
>>> from joblib import parallel_config, register_parallel_backend
>>> from civis.parallel import make_backend_factory
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256}))
>>> with parallel_config('civis'):
...     parallel = Parallel(n_jobs=5, pre_dispatch='n_jobs')
...     print(parallel(delayed(sqrt)(i ** 2) for i in range(10)))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

>>> # Using scikit-learn with the Civis backend:
>>> from sklearn.model_selection import GridSearchCV
>>> from sklearn.ensemble import GradientBoostingClassifier
>>> from sklearn.datasets import load_digits
>>> digits = load_digits()
>>> param_grid = {
...     "max_depth": [1, 3, 5, None],
...     "max_features": ["sqrt", "log2", None],
...     "learning_rate": [0.1, 0.01, 0.001]
... }
>>> # Note: n_jobs and pre_dispatch specify the maximum number of
>>> # concurrent jobs.
>>> gs = GridSearchCV(GradientBoostingClassifier(n_estimators=1000,
...                                              random_state=42),
...                   param_grid=param_grid,
...                   n_jobs=5, pre_dispatch="n_jobs")
>>> register_parallel_backend('civis', make_backend_factory(
...     required_resources={"cpu": 512, "memory": 256}))
>>> with parallel_config('civis'):
...     gs.fit(digits.data, digits.target)
- civis.parallel.make_backend_template_factory(from_template_id, arguments=None, client=None, polling_interval=None, max_submit_retries=0, max_job_retries=0, hidden=True, **kwargs)[source]
Create a joblib backend factory that uses Civis Custom Scripts.
If your template has settable parameters CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID, then this executor will fill them with the contents of the CIVIS_JOB_ID and CIVIS_RUN_ID of the environment which created them. If the code doesn’t have CIVIS_JOB_ID and CIVIS_RUN_ID environment variables available, the child will not have CIVIS_PARENT_JOB_ID and CIVIS_PARENT_RUN_ID environment variables.

- Parameters:
- from_template_id: int
Create jobs as Custom Scripts from the given template ID. When using the joblib backend with templates, the template must have a very specific form. Refer to the documentation for details.
- arguments: dict or None, optional
Dictionary of name/value pairs to use to run this script. Only settable if this script has defined params. See the container scripts API documentation for details.
- client: civis.APIClient instance or None, optional
An API Client object to use.
- polling_interval: int, optional
The polling interval, in seconds, for checking container script status. If you have many jobs, you may want to set this higher (e.g., 300) to avoid rate-limiting.
- max_submit_retries: int, optional
The maximum number of retries for submitting each job. This is to help avoid a large set of jobs failing because of a single 5xx error. A value higher than zero should only be used for jobs that are idempotent (i.e., jobs whose result and side effects are the same regardless of whether they are run once or many times).
- max_job_retries: int, optional
Retry failed jobs this number of times before giving up. Even more than with max_submit_retries, this should only be used for jobs which are idempotent, as the job may have caused side effects (if any) before failing. These retries assist with jobs which may have failed because of network or worker failures.
- hidden: bool, optional
The hidden status of the object. Setting this to True hides it from most API endpoints. The object can still be queried directly by ID. Defaults to True.
- **kwargs:
Additional keyword arguments will be passed directly to
civis.APIClient.scripts.post_custom
.