********************
Parallel Computation
********************
Civis Platform manages a pool of cloud computing resources.
You can access these resources with the tools in the :mod:`civis.parallel`
and :mod:`civis.futures` modules.
Joblib backend
==============
If you can divide your work into multiple independent chunks, each of which takes
at least several minutes to run, you can reduce the time your job takes to finish
by running the chunks simultaneously in Civis Platform. The Civis joblib
backend is a tool that makes it easier to run many jobs simultaneously.
Things to keep in mind when deciding if the Civis joblib backend is the right
tool for your code:

- Each function call which is parallelized with the Civis joblib backend will run
  in a different Civis Platform script. Creating a new script comes with some overhead.
  It will take between a few seconds and a few minutes for each script to start,
  depending on whether Civis Platform needs to provision additional resources.
  If you expect that each function call will complete quickly, instead consider either
  running them in serial or using extra processes in the same Civis Platform script.
- Because function calls run in different scripts, function inputs and outputs
  must be uploaded to Civis Platform from their origin script and downloaded into
  their destination. If your functions take very large inputs and/or produce very
  large outputs, moving the data around will cause additional overhead.
  Consider either using a different tool or refactoring your code so that
  the function to be parallelized is no longer moving around large amounts of data.
- Some open-source libraries, such as ``scikit-learn``, use ``joblib`` to do
  computations in parallel. If you're working with such a library, the Civis
  joblib backend provides an easy way to run these parallel computations in
  different Civis Platform scripts.

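As a rough way to weigh the startup overhead described above, the sketch below compares an estimated serial run time with an estimated parallel run time. The function name, its parameters, and the simple "waves of jobs" model are illustrative assumptions, not part of the Civis API; real startup times and data transfer costs vary.

```python
import math


def estimate_wall_time(n_tasks, task_seconds, startup_seconds, n_jobs):
    """Return (serial_seconds, parallel_seconds) under a simplified model.

    Assumes every task takes ``task_seconds`` and every remote script pays
    a fixed ``startup_seconds`` overhead. Tasks run in "waves" of at most
    ``n_jobs`` scripts at a time.
    """
    serial = n_tasks * task_seconds
    waves = math.ceil(n_tasks / n_jobs)
    parallel = waves * (startup_seconds + task_seconds)
    return serial, parallel


# Seven 10-minute tasks, 60 s of startup overhead, five at a time.
serial, parallel = estimate_wall_time(7, 600, 60, 5)
print(serial, parallel)  # 4200 1320

# Seven 2-second tasks: the overhead dominates, so running serially wins.
serial, parallel = estimate_wall_time(7, 2, 60, 5)
print(serial, parallel)  # 14 124
```

Under this model, parallelizing only pays off when each task runs much longer than the script startup time.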
Joblib
------
`joblib `_ is an open source
Python library which facilitates parallel processing in Python.
While joblib comes with its own parallel computation tools,
it also allows users to define their own
backend. The Civis Python API client takes
advantage of this to let you easily run your own code in parallel
through Civis Platform.
The :func:`~civis.parallel.make_backend_factory`,
:func:`~civis.parallel.infer_backend_factory`, and
:func:`~civis.parallel.make_backend_template_factory` functions allow you
to define a "civis" parallel computation backend which will transparently
distribute computation in cloud resources managed by Civis Platform.
See the `joblib user guide `_
for examples of using joblib to do parallel computation.
Note that the descriptions of "memmapping" aren't relevant to using
Civis Platform as a backend, since your jobs will potentially run
on different computers and can't share memory.
Using the Civis joblib backend to run jobs in parallel in the cloud
looks the same as running jobs in parallel on your local computer,
except that you first need to set up the "civis" backend.
How to use
----------
Begin by defining the backend. The Civis joblib backend creates and runs
Container Scripts, and the :func:`~civis.parallel.make_backend_factory`
function accepts several arguments which will be passed to
:func:`~civis.resources._resources.Scripts.post_containers`.
For example, you could pass a ``repo_http_uri`` or ``repo_ref``
to clone a repository from GitHub into the container which will run
your function. Use the ``docker_image_name`` and ``docker_image_tag``
arguments to select a custom Docker image for your job.
You can provide a ``setup_cmd`` to run setup in bash before your
function executes in Python. The default ``setup_cmd`` will run
``pip install .`` in the base directory of any ``repo_http_uri``
which you include in your backend setup.
Make sure that the environment you define for your Civis backend
includes all of the code which your parallel function will call.
The :func:`~civis.parallel.make_backend_factory` function will return a
backend factory which should be given to the :func:`joblib.register_parallel_backend`
function. For example::

    >>> from joblib import register_parallel_backend
    >>> from civis.parallel import make_backend_factory
    >>> be_factory = make_backend_factory()
    >>> register_parallel_backend('civis', be_factory)

Direct ``joblib`` to use a custom backend by entering a :func:`joblib.parallel_config`
context::

    >>> from joblib import parallel_config
    >>> with parallel_config('civis'):
    ...     # Do joblib parallel computation here.
    ...     pass

You can find more about custom joblib backends in the
`joblib documentation `_.
Note that :class:`joblib.Parallel` takes both an ``n_jobs`` and a ``pre_dispatch``
parameter. The Civis joblib backend doesn't queue submitted jobs itself,
so up to ``pre_dispatch`` jobs will run at once.

.. note::

   Since joblib v1.3.0, ``n_jobs=1`` will disable the use of the
   specified parallel backend, and will simply run the given function
   with a regular ``for`` loop. ``n_jobs=1`` is useful for testing
   your code locally before running it in Civis Platform.

.. note::

   The default value of ``pre_dispatch`` is ``"2*n_jobs"``, which will run
   a maximum of ``2 * n_jobs`` jobs at once on Civis Platform.
   Set ``pre_dispatch="n_jobs"`` in your
   :class:`joblib.Parallel` call to run at most ``n_jobs`` jobs.

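To make the arithmetic concrete, the following sketch computes the concurrency ceiling implied by a ``pre_dispatch`` setting. The helper ``max_concurrent_jobs`` is hypothetical and handles only the simple forms shown here (an integer, ``"n_jobs"``, or an integer multiplier times ``n_jobs``); it is not how joblib itself parses the value.

```python
def max_concurrent_jobs(n_jobs, pre_dispatch="2*n_jobs"):
    """Hypothetical helper: upper bound on jobs dispatched at once.

    Supports an int, "n_jobs", or "<k>*n_jobs" only.
    """
    if isinstance(pre_dispatch, int):
        return pre_dispatch
    expr = pre_dispatch.replace(" ", "")
    if expr == "n_jobs":
        return n_jobs
    factor, sep, name = expr.partition("*")
    if sep != "*" or name != "n_jobs":
        raise ValueError(f"unsupported pre_dispatch: {pre_dispatch!r}")
    return int(factor) * n_jobs


print(max_concurrent_jobs(5))                         # 10
print(max_concurrent_jobs(5, pre_dispatch="n_jobs"))  # 5
```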
The Civis joblib backend uses `cloudpickle `_
to transport code and data from the parent environment to Civis Platform.
This means that you may parallelize dynamically-defined functions and classes,
including lambda functions.
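The standard library ``pickle`` module serializes functions by reference to an importable name, so it rejects lambdas; cloudpickle removes this limitation by serializing the function itself. The sketch below demonstrates the difference. The helper ``stdlib_can_pickle`` is illustrative only, and the cloudpickle round trip runs only if ``cloudpickle`` is installed locally.

```python
import pickle

try:
    import cloudpickle  # optional third-party dependency
except ImportError:
    cloudpickle = None


def stdlib_can_pickle(obj):
    """Return True if the standard-library pickler can serialize ``obj``."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


square = lambda x: x * x

print(stdlib_can_pickle(square))  # False: a lambda has no importable name

if cloudpickle is not None:
    # cloudpickle serializes the function body itself, by value.
    restored = cloudpickle.loads(cloudpickle.dumps(square))
    print(restored(4))  # 16
```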
The joblib backend will automatically add environment variables called
``CIVIS_PARENT_JOB_ID`` and ``CIVIS_PARENT_RUN_ID``, holding the values
of the job and run IDs of the Civis Platform job in which you're
running the joblib backend (if any). Your functions could use these
to communicate with the parent job or to recognize that they're in
a process which has been created by another Civis Platform job.
However, where possible you should let the joblib backend itself
transport the return value of the function it's running back to the parent.
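For instance, a function could check these variables to learn whether it is running as a child of another Civis Platform job. This is a minimal sketch: the helper name ``get_parent_ids`` and its return format are assumptions made for illustration; only the two environment variable names come from the text above.

```python
import os


def get_parent_ids(environ=None):
    """Return (job_id, run_id) strings of the parent job, or None.

    ``environ`` defaults to ``os.environ``; pass a dict to test locally.
    """
    environ = os.environ if environ is None else environ
    job_id = environ.get("CIVIS_PARENT_JOB_ID")
    run_id = environ.get("CIVIS_PARENT_RUN_ID")
    if job_id is None or run_id is None:
        return None
    return job_id, run_id


# Simulated environments, so the example runs anywhere.
print(get_parent_ids({}))  # None
print(get_parent_ids({"CIVIS_PARENT_JOB_ID": "123",
                      "CIVIS_PARENT_RUN_ID": "456"}))  # ('123', '456')
```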
Infer backend parameters
------------------------
If you're writing code which will run inside a Civis Container Script, then
the :func:`~civis.parallel.infer_backend_factory` function returns a backend
factory with environment parameters pre-populated by inspecting the state of
your container script at run time. Use :func:`~civis.parallel.infer_backend_factory`
anywhere you would use :func:`~civis.parallel.make_backend_factory`,
and you don't need to specify a Docker image or GitHub repository.
Script Templates
----------------
The :func:`~civis.parallel.make_backend_template_factory` function is intended for
developers who are writing code which may be run by users who don't have
permissions to create new container scripts with the necessary environment.
Instead of defining and creating new container scripts with
:func:`~civis.parallel.make_backend_factory`, you can use
:func:`~civis.parallel.make_backend_template_factory` to launch custom scripts from
a script template. To use the template factory, your backing container script must
have the Civis Python client installed, and its run command must finish
by calling ``civis_joblib_worker`` with no arguments. The template must accept the
parameter ``JOBLIB_FUNC_FILE_ID``. The Civis joblib backend will use this parameter
to transport your remote work.
Examples
--------
Parallel computation using the default joblib backend
(this uses processes on your local computer)::

    >>> def expensive_calculation(num1, num2):
    ...     return 2 * num1 + num2
    >>> from joblib import delayed, Parallel
    >>> parallel = Parallel(n_jobs=5)
    >>> args = [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)]
    >>> print(parallel(delayed(expensive_calculation)(*a) for a in args))
    [1, 3, 5, 7, 9, 11, 13]

You can do the same parallel computation using the Civis backend
by creating and registering a backend factory and entering a
``with parallel_config('civis')`` context. The code below will start
seven different jobs in Civis Platform (with up to five running at once).
Each job will call the function ``expensive_calculation`` with a
different set of arguments from the list ``args``::

    >>> def expensive_calculation(num1, num2):
    ...     return 2 * num1 + num2
    >>> from joblib import delayed, Parallel
    >>> from joblib import parallel_config, register_parallel_backend
    >>> from civis.parallel import make_backend_factory
    >>> register_parallel_backend('civis', make_backend_factory(
    ...     required_resources={"cpu": 512, "memory": 256}))
    >>> args = [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)]
    >>> with parallel_config('civis'):
    ...     parallel = Parallel(n_jobs=5, pre_dispatch='n_jobs')
    ...     print(parallel(delayed(expensive_calculation)(*a) for a in args))
    [1, 3, 5, 7, 9, 11, 13]

Since joblib v1.4.2, :class:`joblib.Parallel` has a ``return_as`` parameter
that can accept ``'generator'`` or ``'generator_unordered'``
(default is ``'list'``, whose behavior is shown in the examples above).
Returning a generator, especially the "unordered" version, instead of a list
is useful for getting partial results back from Civis Platform
as soon as any child job finishes (as opposed to having to wait for *all* child jobs
to finish before you get the resulting list). With ``return_as='generator_unordered'``,
you might want to keep track of the ordering of the child jobs' results
using :func:`enumerate`:

.. code-block:: python

    >>> import inspect
    >>> import time
    >>> def expensive_calculation(order, num):
    ...     # lower order for a longer sleep to simulate a longer job
    ...     time.sleep(10 ** (2 - order))
    ...     return order, num
    >>> from joblib import delayed, Parallel
    >>> from joblib import parallel_config, register_parallel_backend
    >>> from civis.parallel import make_backend_factory
    >>> register_parallel_backend('civis', make_backend_factory(
    ...     required_resources={"cpu": 512, "memory": 256}))
    >>> args = enumerate(['foo', 'bar', 'baz'])  # `enumerate` to get an order index
    >>> with parallel_config('civis'):
    ...     parallel = Parallel(
    ...         n_jobs=5, pre_dispatch='n_jobs',
    ...         return_as='generator_unordered',  # yields a result from a child job as soon as it's ready
    ...     )
    ...     results = parallel(delayed(expensive_calculation)(*a) for a in args)
    ...     assert inspect.isgenerator(results)
    ...     for order, num in results:
    ...         print(order, num)
    2 baz
    1 bar
    0 foo

Lastly, you can use the Civis joblib backend to parallelize any code which
uses joblib internally, such as scikit-learn::

    >>> from joblib import parallel_config, register_parallel_backend
    >>> from civis.parallel import make_backend_factory
    >>> from sklearn.model_selection import GridSearchCV
    >>> from sklearn.ensemble import GradientBoostingClassifier
    >>> from sklearn.datasets import load_digits
    >>> digits = load_digits()
    >>> param_grid = {
    ...     "max_depth": [1, 3, 5, None],
    ...     "max_features": ["sqrt", "log2", None],
    ...     "learning_rate": [0.1, 0.01, 0.001]
    ... }
    >>> # Note: n_jobs and pre_dispatch specify the maximum number of
    >>> # concurrent jobs.
    >>> gs = GridSearchCV(GradientBoostingClassifier(n_estimators=1000,
    ...                                              random_state=42),
    ...                   param_grid=param_grid,
    ...                   n_jobs=5, pre_dispatch="n_jobs")
    >>> register_parallel_backend('civis', make_backend_factory(
    ...     required_resources={"cpu": 512, "memory": 256}))
    >>> with parallel_config('civis'):
    ...     gs.fit(digits.data, digits.target)

Debugging
---------
Any (non-retried) errors in child jobs will cause the entire parallel call to
fail. ``joblib`` will transport the first exception from a remote job and
raise it in the parent process so that you can debug.
If your remote jobs are failing because of network problems (e.g., occasional
500 errors), you can make your parallel call more likely to succeed by using
a ``max_job_retries`` value above 0 when creating your backend factory.
This will automatically retry a failed job (potentially more than once) before
giving up and raising its exception in the parent process.
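Conceptually, retrying means re-running a failed job a bounded number of times before surfacing its error. The sketch below illustrates that idea in plain Python. It is not the backend's actual implementation, and ``run_with_retries`` and ``flaky_job`` are hypothetical names; only the ``max_job_retries`` parameter name comes from the text above.

```python
def run_with_retries(func, max_job_retries=0):
    """Call ``func``; on failure, retry up to ``max_job_retries`` times."""
    attempts = 0
    while True:
        try:
            return func()
        except Exception:
            if attempts >= max_job_retries:
                raise  # out of retries: surface the last error
            attempts += 1


calls = {"n": 0}


def flaky_job():
    """Fail twice (simulating transient network errors), then succeed."""
    calls["n"] += 1
    if calls["n"] <= 2:
        raise ConnectionError("simulated transient failure")
    return "done"


print(run_with_retries(flaky_job, max_job_retries=2))  # done
```

With ``max_job_retries=0`` the first failure would propagate immediately, which matches the default behavior described above of failing the whole parallel call.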
Logging: The Civis joblib backend uses the standard library
`logging module `_
and emits debug-level messages for events which might help you diagnose errors.
See also the ``verbose`` argument to :class:`joblib.Parallel`, which
prints information to either stdout or stderr.
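To see those debug messages, configure logging before starting your parallel call. The logger name ``civis.parallel`` below is an assumption based on the common convention of naming loggers after their module; adjust it if your installed version uses a different name.

```python
import logging

# Send log records to stderr with timestamps and logger names.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s: %(message)s",
    level=logging.INFO,
)

# Assumed logger name (module-name convention). This enables debug output
# for the joblib backend only, leaving other libraries at INFO.
backend_log = logging.getLogger("civis.parallel")
backend_log.setLevel(logging.DEBUG)

print(backend_log.isEnabledFor(logging.DEBUG))  # True
```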
Mismatches between your local environment and the environment in the
Civis container script jobs are a common source of errors.
To run a function in Civis Platform, any modules called by
that function must be importable from a Python interpreter running
in the container script. For example, if you use :class:`joblib.Parallel`
with :func:`numpy.sqrt`, the joblib backend must be set to run
your function in a container which has :mod:`numpy` installed.
If you see an error such as::

    ModuleNotFoundError: No module named 'numpy'

this means that the module you're trying to use doesn't exist
in the remote environment. Select a Docker image with the module installed,
or install it in your remote environment by using the ``repo_http_uri``
parameter of :func:`~civis.parallel.make_backend_factory` to install it from GitHub.
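One way to catch such mismatches early is to check, from inside the environment in question, whether the modules you need can be found. The helper ``missing_modules`` below is a hypothetical diagnostic, not part of the Civis client; it uses only the standard library.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of top-level module ``names`` that can't be found."""
    return [name for name in names
            if importlib.util.find_spec(name) is None]


# "json" ships with Python; the second name is deliberately bogus.
print(missing_modules(["json", "no_such_module_for_demo"]))
# ['no_such_module_for_demo']
```

Because the helper has no third-party dependencies, you could also submit it as the parallelized function itself, for example ``delayed(missing_modules)(["numpy"])``, to inspect the remote environment.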
Object Reference
================
.. automodule:: civis.parallel
   :members: