
Reducing memory usage of Dask Scheduler

posted in Dask

Users of dask.distributed often run a single scheduler with multiple workers, even for production workloads. However, in some cases running multiple "clusters" can be beneficial, especially to partially eliminate a single point of failure, or to constrain resources in a predictable manner (e.g. by sharding workloads). Furthermore, some workloads might require strict isolation, which is usually solved by running a "single node" cluster in a Kubernetes Job/Pod.

At a certain scale one might first start by optimizing the resources given to the Dask workers, which naturally yields a higher return on investment as there are N workers compared to a single scheduler. Afterwards, the scheduler is in the spotlight. Let's look at how changing a few configuration options can dramatically reduce its memory footprint.

As a starting point, the recommended configuration is extended with the entries described in the following sections.

Log retention

Dask takes a somewhat different logging approach than one might be used to in the modern Python development ecosystem. Instead of leveraging stdout and structured logging, a lot of events are stored in a deque as variably sized tuples.

Python's deque (short for double-ended queue) supports thread-safe appends and "pops" from both ends and supports setting a maximum capacity. Dask usually sets a pretty generous capacity out of the box, so memory usage slowly accumulates as the deques fill up.
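As a quick illustration of the bounded behaviour described above, the following standard-library sketch shows how a deque with a maximum capacity discards its oldest entries once it is full. The capacity of 3 and the entry contents are arbitrary values chosen for the example, not what Dask uses.

```python
from collections import deque

# A bounded deque: once it is full, appending on the right
# silently drops the leftmost (oldest) item.
log = deque(maxlen=3)
for i in range(5):
    log.append(("event", i))

print(list(log))   # only the three most recent entries remain
print(log.maxlen)  # the configured capacity
```

The memory held by such a deque is therefore capped by its capacity multiplied by the size of an entry, which is why the capacity is the main lever in the sections that follow.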

Therefore, significant memory savings can be achieved by reducing the maximum capacity of the internal deques, either by tweaking the configuration options or by overriding some properties of the scheduler during the preloading phase.
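For the preloading route, a minimal sketch of a preload script could look like the one below. It relies on the `dask_setup` hook that preload scripts can define. The helper `shrink_log` is a hypothetical name introduced for this example, and the attribute name `transition_log` and the capacity of 200 are assumptions that should be checked against the installed version of distributed.

```python
from collections import deque


def shrink_log(obj, attr, maxlen):
    """Replace the deque stored at obj.<attr> with a smaller bounded copy."""
    current = getattr(obj, attr, None)
    if isinstance(current, deque):
        # deque(iterable, maxlen=n) keeps only the n most recent
        # (rightmost) items of the iterable.
        setattr(obj, attr, deque(current, maxlen=maxlen))


def dask_setup(scheduler):
    # Assumption: the scheduler keeps its transition log in a deque
    # attribute named `transition_log`; verify this for your version.
    shrink_log(scheduler, "transition_log", 200)
```

The helper does nothing if the attribute is missing or is not a deque, so the script degrades to a no-op rather than failing at scheduler startup. Where a configuration key exists for a given log, setting the key is the simpler option.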

Note that the maintainers of the Dask project are well aware of the non-standard approach and its shortcomings. Recommended issues to watch are:

Until these are resolved, it is recommended to change the following configuration keys.

Task state transition log

The transition log gains a new entry every time a task transitions to a new state. Its size can be controlled via the distributed.scheduler.transition-log-length key, which defaults to 100000.

A single transition log entry consists of a tuple of five elements:

  1. Task key (string, at least 40 bytes + content)
  2. Start state (string, most likely interned)
  3. Finish state (string, most likely interned)
  4. Recommendations (dictionary, at least 240 bytes when empty)
  5. Time (float, 8 bytes for reference count, 8 bytes for pointer to the type object, 8 bytes for double = 24 bytes)

An empty tuple's size is 24 bytes and an additional 8 bytes are needed per element pointer. Ignoring the interned state strings, the elements themselves contribute at least 40+240+24=304 bytes, so a single log entry takes at least 24+5*8+304=368 bytes, which sums to ~36.8 MB for a full deque of the default size.
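The estimate above can be reproduced with a few lines of arithmetic. The per-object sizes below are the figures quoted in the text rather than measurements, and the constant and function names are made up for this sketch. Actual sizes vary between Python versions and can be checked with `sys.getsizeof`.

```python
# Per-object sizes in bytes, taken from the estimate in the text.
TUPLE_HEADER = 24
POINTER = 8
KEY_STR_MIN = 40
EMPTY_DICT = 240
FLOAT = 24

# Interned state strings are shared between entries, so only the
# task key, the recommendations dict and the timestamp are counted.
payload = KEY_STR_MIN + EMPTY_DICT + FLOAT
entry_bytes = TUPLE_HEADER + 5 * POINTER + payload


def log_size_mb(entries, per_entry=entry_bytes):
    """Lower bound on the size of a full log, in megabytes (10**6 bytes)."""
    return entries * per_entry / 1_000_000


print(entry_bytes)           # bytes per transition log entry
print(log_size_mb(100_000))  # full deque at the default length
```

Calling `log_size_mb` with a smaller length shows how directly the lower bound scales with the configured capacity.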

💡 Recommendation

Reduce distributed.scheduler.transition-log-length to 100 or 200 (~36.8 to 73.6 kB at minimum)

Events log

The event log stores all events as a mapping from an event's name to a buffer, which holds up to distributed.scheduler.events-log-length entries (defaults to 100000).

This means that memory grows even while the scheduler is idle, as long as some workers are connected, because the workers produce a heartbeat event every so often. Many other event types also occur throughout the life of a distributed system. To illustrate the overhead, let's look at a single element of the mapping (defaultdict[deque]), that is, a single deque whose entries are tuples of:

  1. Time (float, 24 bytes)
  2. Message (dictionary with action key and optionally some structured content, at least 240 bytes)

Again, the tuple takes 24+2*8 + 24+240 = 304 bytes, resulting in 30.4 MB usage for a single event type if all 100000 entries are filled.
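The structure described above, one bounded buffer per event name, can be sketched with the standard library alone. This is an illustration of the data structure, not Dask's actual implementation. The names `events`, `log_event` and `EVENTS_LOG_LENGTH`, as well as the event names and messages, are hypothetical.

```python
from collections import defaultdict, deque
from functools import partial
import time

EVENTS_LOG_LENGTH = 3  # tiny illustrative cap; the default discussed above is 100000

# One bounded buffer per event name, created lazily on first use.
events = defaultdict(partial(deque, maxlen=EVENTS_LOG_LENGTH))


def log_event(name, msg):
    # Each entry is a (timestamp, message) pair, as in the estimate above.
    events[name].append((time.time(), msg))


for i in range(10):
    log_event("worker-a", {"action": "heartbeat", "seq": i})
log_event("worker-b", {"action": "heartbeat", "seq": 0})

# Each buffer is capped individually, so the total grows with the
# number of distinct event names times the cap.
print({name: len(buf) for name, buf in events.items()})
```

Because every event name gets its own buffer, the per-buffer figure from the estimate has to be multiplied by the number of distinct event names to get the overall footprint.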

💡 Recommendation

Reduce distributed.scheduler.events-log-length to 100 or 200 (~30.4 to 60.8 kB at minimum per event type)

Other

The aforementioned changes yielded the highest memory savings; however, there are plenty of other options that can be adjusted as well.

Below is an excerpt that can be merged with an existing configuration (see configuration reference for more details).

distributed:
  # Diagnostics
  diagnostics:
    computations:
      max-history: 20
  # Scheduler (dashboard)
  scheduler:
    dashboard:
      status:
        task-stream-length: 100
      tasks:
        task-stream-length: 500
  # Administrative
  admin:
    max-error-length: 250
    log-length: 250