Setting up, configuring and deploying Dask Distributed
While there are several setup options listed in the dask.distributed documentation, this post covers setup and configuration for applications and monorepos. In this context multi-tenancy is not required, and the business logic, tasks and configuration all live in the same repository.
This approach simplifies deployment and allows a single Docker image to be used for both the Worker and the Scheduler (see the deployment sketch below).
An example project would be:
- versioned with a VCS such as Git
- tested and built with a continuous integration service (CircleCI, Jenkins, GitHub Actions)
- running on Kubernetes (with GitOps, e.g. Flux) or AWS Fargate
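To make the single-image idea concrete, here is a rough Kubernetes sketch (all names, the image reference and the module paths are illustrative placeholders, not something the post prescribes): the scheduler and the workers are separate Deployments that share one image and differ only in the command they run.
# Illustrative sketch: both components run the same image and only differ in the command.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-project-dask-scheduler   # a worker Deployment looks the same, apart from
spec:                               # the name, the replica count and the command
  replicas: 1
  selector:
    matchLabels: {app: my-project-dask-scheduler}
  template:
    metadata:
      labels: {app: my-project-dask-scheduler}
    spec:
      containers:
        - name: scheduler
          image: registry.example.com/my-project:latest               # placeholder image
          command: ["python", "-m", "my_project.run_dask_scheduler"]  # scheduler wrapper (see below)
          # workers would instead run: ["python", "-m", "my_project.run_dask_worker"]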
Configuration #
Storing the YAML configuration in the repository allows configuration changes to be versioned and, by proxy, new versions to be released (via CI & Kubernetes + Flux) whenever an option changes.
The options listed in the documentation, however, do not support such a use case out of the box, so consumers need to write a wrapper function that loads the YAML file and overrides the defaults. In addition, it's sometimes preferable to be able to override certain fields with environment variables (e.g. different options for production and staging clusters).
It’s also important to note that the configuration loading must happen before the Worker or the Scheduler is started (it’s not sufficient to use the preloading mechanism provided by Dask). An example is provided below.
Example configuration loader (my_project/dask_config.py)
def load_configuration():
    import os

    import yaml

    from dask.config import collect_env, config, update

    # Load config overrides from file
    config_path = os.path.join(os.path.dirname(__file__), 'dask_configuration.yaml')
    with open(config_path) as f:
        overrides = yaml.safe_load(f)
    update(config, overrides)

    # Override from environment variables
    collect_env()
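A quick way to sanity-check that the overrides are actually applied is to call the loader and read a value back with dask.config.get (the key below is one from the example configuration further down):
from dask import config as dask_config

from my_project.dask_config import load_configuration

load_configuration()
# Should print the overridden value, e.g. "30s", rather than Dask's default
print(dask_config.get('distributed.comm.timeouts.connect'))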
In order to use the configuration this way, don't forget to wrap the dask-scheduler and dask-worker commands (one way to package the wrappers is shown after the two snippets below).
Wrapper for dask-scheduler
"""
Wrapper around `dask-scheduler` executable,
in case we need to change defaults and such
"""
from my_project.dask_config import load_configuration
load_configuration()
def run_dask_scheduler():
from distributed.cli.dask_scheduler import go
go()
if __name__ == '__main__':
run_dask_scheduler()
Wrapper for dask-worker
"""
Wrapper around `dask-worker` executable,
in case we need to change defaults and such
"""
from my_project.dask_config import load_configuration
load_configuration()
def run_dask_worker():
from distributed.cli.dask_worker import go
go()
if __name__ == '__main__':
run_dask_worker()
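If the project is packaged with setuptools, one convenient option is to expose the wrappers as console scripts so they can replace the stock executables in the Docker image. The fragment below is only a sketch; the script names and module paths are assumptions about how the wrapper files above are laid out.
# setup.py fragment (illustrative): expose the wrappers as console scripts
from setuptools import find_packages, setup

setup(
    name='my_project',
    packages=find_packages(),
    entry_points={
        'console_scripts': [
            # assumed module paths for the two wrapper files shown above
            'my-project-dask-scheduler = my_project.run_dask_scheduler:run_dask_scheduler',
            'my-project-dask-worker = my_project.run_dask_worker:run_dask_worker',
        ],
    },
)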
Tips & tricks #
While Dask provides sensible defaults out of the box, it’s recommended to extend and modify the configuration based on the application’s performance profile. A few categories are discussed below along with an example that serves as a good starting point for any project.
- Communication: intermittent network issues occur from time to time; in such cases try increasing the timeout intervals and the number of retries under the distributed.comm key.
- Add or remove HTTP routes: extending the Web API and creating custom HTTP routes is covered in its own section below.
- Lock lease timeouts: locks and especially semaphores are useful to control resource usage and concurrency; however, the documentation warns that the implementation is still experimental, and lock lease timeouts need to be closely monitored and adjusted in order to prevent overbooking.
- Worker memory thresholds: this is mostly subjective; although the defaults are a bit conservative, experiment with these and see what works best for your performance profile by monitoring the number of restarts and warnings in the logs.
- Logs: decreasing the number of retained entries is useful for lowering the memory usage of the workers if needed.
Example dask_configuration.yaml
distributed:
  client:
    heartbeat: "10s"
    scheduler-info-interval: "5s"
  comm:
    timeouts:
      connect: "30s"
      tcp: "45s"
    retry:
      count: 3
      delay:
        min: "2s"
        max: "30s"
    socket-backlog: 4096
  deploy:
    lost-worker-timeout: "35s"
  scheduler:
    events-cleanup-delay: "1800s"
    locks:
      lease-validation-interval: "3s"
      lease-timeout: "1260s"
    http:
      routes: [
        'distributed.http.scheduler.prometheus',
        'distributed.http.scheduler.info',
        'distributed.http.scheduler.json',
        'distributed.http.health',
        'distributed.http.statics',
        'my_project.dask_routes.scheduler_routes',
      ]
    preload: [
      'my_project.dask_preload',
    ]
  worker:
    connections:
      outgoing: 50
      incoming: 25
    memory:
      target: 0.75
      spill: 0.85
      pause: 0.90
      terminate: 0.95
    http:
      routes: [
        'distributed.http.worker.prometheus',
        'distributed.http.health',
        'distributed.http.statics',
        'my_project.dask_routes.worker_routes',
      ]
    preload: [
      'my_project.dask_preload',
    ]
  admin:
    log-length: 3000
    log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
Error monitoring #
Dask executes tasks on remote workers in a distributed setting. Therefore, when an exception occurs during the computation, the worker serializes it and sends it back to the Client that requested the result.
In order to properly monitor the errors that might arise, it's desirable to offload this burden from the clients to the individual workers.
Worker plugins provide a convenient interface to observe the lifecycle of each task being executed by the worker. After extending WorkerPlugin, the transition method can be used to listen for the error state and extract the exception.
Sample implementation of the worker plugin
import logging
from types import TracebackType
from typing import cast

from dask.utils import funcname
from distributed import Worker
from distributed.diagnostics.plugin import WorkerPlugin


class WorkerMonitor(WorkerPlugin):
    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger('dask.worker.worker_monitor')

    def setup(self, worker: Worker):
        self.worker = worker  # noqa

    def transition(self, key: str, start: str, finish: str, *args, **kwargs):
        # Only react to tasks that transitioned into the 'error' state
        if finish == 'error' and key in self.worker.exceptions and key in self.worker.tasks:
            serialized_exc = self.worker.exceptions[key]
            if hasattr(serialized_exc, 'data') and isinstance(serialized_exc.data, Exception):
                # Re-attach the traceback to the deserialized exception when available
                tb = self.worker.tracebacks.get(key)
                exc = (
                    serialized_exc.data.with_traceback(tb.data)
                    if (tb and hasattr(tb, 'data') and isinstance(tb.data, TracebackType))
                    else serialized_exc.data
                )
                exc = cast(Exception, exc)

                function, fn_args, fn_kwargs = self.worker.tasks[key]
                function_name = str(funcname(function))
                extra_data = {
                    'task_id': key,
                    'task_name': function_name,
                    'args': fn_args,
                    'kwargs': fn_kwargs,
                    'dask_worker_name': self.worker.name,
                }

                # TODO: your exception reporting and tracing
                my_exception_reporter.report(exc, extra_data)
The sample implementation also extracts the task's name and a couple of useful tags that can be sent to a monitoring service such as Datadog or Prometheus. The exception itself can then be reported to the error monitoring service of your choosing (Sentry, Bugsnag, etc.).
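The my_exception_reporter object in the snippet above is a placeholder. A minimal sketch of what it could look like, using nothing but the standard logging module (swap the body for your Sentry/Bugsnag client of choice):
import logging


class _LoggingExceptionReporter:
    """Placeholder reporter: logs the exception together with the task metadata."""

    def __init__(self):
        self.logger = logging.getLogger('dask.worker.exception_reporter')

    def report(self, exc: Exception, extra_data: dict) -> None:
        # exc_info attaches the traceback; extra_data ends up in the log message
        self.logger.error('Task failed: %s', extra_data, exc_info=exc)


my_exception_reporter = _LoggingExceptionReporter()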
Preloading #
To ensure that the plugins are initiated every time a new worker is spun up or restarted, the preloading mechanism can be leveraged. Under the hood Dask uses the click library to parse command line arguments and options, making it easy for consumers to extend its functionality.
Using the error monitoring example from above, we can create a new Python module and define a single function called dask_setup which takes either a Scheduler or a Worker instance as its only argument.
Example implementation of the preload module (my_project/dask_preload.py)
from typing import Union

import click
from distributed import Client, Scheduler, Worker
# `install_signal_handlers` lives in distributed.cli.utils in the distributed version
# used here; adjust the import if it has moved in your version.
from distributed.cli.utils import install_signal_handlers

# Illustrative import path for the WorkerMonitor plugin defined earlier
from my_project.dask_monitoring import WorkerMonitor


@click.command()
def dask_setup(service: Union[Scheduler, Worker]):
    """
    References:
    * Lifecycle and CLI integration: https://docs.dask.org/en/latest/setup/custom-startup.html
    * Developing plugins: https://distributed.dask.org/en/latest/plugins.html
    """
    # Scheduler specific
    if isinstance(service, Scheduler):
        # Graceful Scheduler shutdown
        # (remove once https://github.com/dask/distributed/pull/3332 gets merged & released)
        async def on_signal(_signum):
            await service.close()

        # use the reference implementation from `distributed`
        install_signal_handlers(service.loop, cleanup=on_signal)

    # Worker specific
    elif isinstance(service, Worker):
        # Plugin for monitoring & metrics
        worker_monitor_plugin = WorkerMonitor()
        with Client(
            address=service.scheduler.address,
            timeout=30,
            name='worker-plugin-setup-client',
        ) as client:
            client.register_worker_plugin(worker_monitor_plugin, name='worker-monitor')
In order for Dask to load the aforementioned module, it needs to be registered in the configuration under the distributed.scheduler.preload and distributed.worker.preload keys (the arrays in the dask_configuration.yaml example above), so that the script runs automatically on startup.
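For reference, the relevant fragment of the dask_configuration.yaml example shown earlier is:
distributed:
  scheduler:
    preload: [
      'my_project.dask_preload',
    ]
  worker:
    preload: [
      'my_project.dask_preload',
    ]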
Alternatively, the --preload command line argument can be used, as shown in the examples below.
Launching the scheduler via CLI
dask-scheduler \
  --port 18786 --host 0.0.0.0 \
  --dashboard \
  --dashboard-address 0.0.0.0:18787 \
  --dashboard-prefix my_project-dask \
  --preload my_project.dask_preload
# notice the last argument
Launching the worker via CLI
dask-worker \
  my_project-dask-scheduler-app:18786 \
  --nprocs 1 --nthreads 10 --memory-limit 1300MB \
  --worker-port 16000:18700 \
  --nanny-port 14000:15999 \
  --dashboard-address 18789 \
  --death-timeout 120 \
  --preload my_project.dask_preload
Extending the Web API #
Custom functionality can also be introduced into the workers' and the scheduler's API by extending the RequestHandler class. Out of the box Dask provides several endpoints, for example an endpoint to check the component's health, usage statistics, dashboards and more. The full list can be found under the distributed.scheduler.http.routes and distributed.worker.http.routes configuration keys.
In case you are running Dask within Kubernetes, the following endpoints might be useful.
Proxying workers’ dashboards #
An endpoint that allows proxying Workers' HTTP servers in the Scheduler's UI. It might be useful for remotely shutting down a specific worker or accessing a worker's metrics, logs and dashboards for quick debugging.
worker_routes.py (add this module to the distributed.worker.http.routes array in your dask_configuration.yaml to load the routes automatically)
from functools import partial
from typing import Any, Dict, List, Optional

from distributed import Scheduler
from distributed.http.proxy import GlobalProxyHandler
from distributed.scheduler import WorkerState
from tornado.web import RequestHandler


class WorkerOnlyProxyHandler(GlobalProxyHandler):
    """
    Endpoint that allows proxying Workers from the Scheduler.
    Useful to remotely shutdown a specific worker or access worker's metrics, logs and dashboards.
    """

    def initialize(self, dask_server: Optional[Scheduler] = None, extra: Optional[Dict[Any, Any]] = None):
        super().initialize(dask_server=dask_server, extra=extra)
        worker_hosts: List[str] = (
            [
                worker_state.host for worker_state in dask_server.workers.values()  # type: WorkerState
            ]
            if dask_server
            else []
        )
        # override the whitelist function to proxy only to workers
        self.host_whitelist = partial(self.whitelist_workers, worker_hosts=worker_hosts)

    @staticmethod
    def whitelist_workers(_handler: RequestHandler, host: str, *, worker_hosts: List[str]):
        return any(host in worker_host for worker_host in worker_hosts)


# Export the HTTP routes
#
# https://docs.dask.org/en/latest/configuration-reference.html#distributed.scheduler.http.routes
# https://distributed.dask.org/en/latest/http_services.html
routes = [(r"proxy/(\d+)/(.*?)/(.*)", WorkerOnlyProxyHandler, {})]
Gracefully shutting down the scheduler #
An endpoint to trigger a graceful shutdown of the scheduler via Kubernetes's Lifecycle Hooks. It's almost mandatory, as there's an outstanding issue with stopping the scheduler gracefully by normal means (see dask/distributed#3332).
scheduler_routes.py (add this module to the distributed.scheduler.http.routes array in your dask_configuration.yaml to load the routes automatically)
import logging

from distributed import Worker
from distributed.http.utils import RequestHandler

logger = logging.getLogger("distributed.request_handler")


class TerminationHandler(RequestHandler):
    """
    Custom HTTP handler to trigger a graceful shutdown via Kubernetes's Lifecycle Hooks
    reference: https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/
    """

    def get(self):
        self.server: Worker  # add typing information (Worker extends distributed.core.Server)
        logger.info(f"Lifecycle hook triggered. Initiating graceful shutdown of {self.server.name}.")
        self.server.io_loop.add_callback(self.server.close_gracefully)
        self.write({'message': 'Shutting down...', 'extra': {'worker_name': self.server.name}})
        self.set_header("Content-Type", "application/json")


# Export the HTTP routes
#
# https://docs.dask.org/en/latest/configuration-reference.html#distributed.worker.http.routes
# https://distributed.dask.org/en/latest/http_services.html
routes = [('graceful-shutdown', TerminationHandler, {})]
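To wire the endpoint up in Kubernetes, a preStop lifecycle hook can call it before the pod receives SIGTERM. The fragment below is only an illustrative sketch: the container name is a placeholder, curl is assumed to be available in the image, and the port must be the HTTP/dashboard port of the component (e.g. 18787 for the scheduler launched in the CLI example above) where the graceful-shutdown route is served.
# Illustrative fragment of a pod spec: call the graceful-shutdown route before SIGTERM
spec:
  terminationGracePeriodSeconds: 120  # give the component time to finish shutting down
  containers:
    - name: scheduler  # placeholder container name
      lifecycle:
        preStop:
          exec:
            command: ["curl", "--silent", "http://localhost:18787/graceful-shutdown"]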
Labels and resources #
When using the rolling update release strategy, some workers will still be running an “older” release for a period of time until they get incrementally updated. In such cases, it might not be desirable to submit tasks to the older version of the application.
Worker resources provide a convenient abstraction that allows for labeling different releases or different workers based on allocated resources (GPU support, large amount of RAM). This can be achieved manually or by baking the label (version) as an environment variable into the Docker image via build arguments, the CI pipeline and Git tags.
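As a rough sketch of the idea (the resource name, the APP_RELEASE environment variable and the scheduler address are illustrative, the latter reused from the CLI example above): start each worker with a resource tag that identifies its release, then have clients pin their tasks to the same tag so they never land on workers from an older rollout.
# Worker side: bake the release tag into the image and pass it as a resource, e.g.
#   dask-worker ... --resources "RELEASE_2021_05=1"
#
# Client side: restrict submitted tasks to workers carrying the same tag.
import os

from distributed import Client


def my_task_function(x):
    # placeholder for your business-logic task
    return x * 2


client = Client('my_project-dask-scheduler-app:18786')
release_tag = os.environ.get('APP_RELEASE', 'RELEASE_2021_05')  # baked in at build time

future = client.submit(my_task_function, 21, resources={release_tag: 1})
print(future.result())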
Issues to watch
- dask/distributed#2156: high CPU usage when the worker is idle