import logging
import math
import operator
import os
from collections import OrderedDict, defaultdict
from datetime import datetime
from numbers import Number
import numpy as np
from bokeh.core.properties import without_property_validation
from bokeh.io import curdoc
from bokeh.layouts import column, row
from bokeh.models import (
AdaptiveTicker,
Arrow,
BasicTicker,
BoxSelectTool,
BoxZoomTool,
CDSView,
ColorBar,
ColumnDataSource,
CustomJSHover,
DataRange1d,
GroupFilter,
HoverTool,
NumberFormatter,
NumeralTickFormatter,
OpenURL,
Panel,
PanTool,
Range1d,
ResetTool,
Tabs,
TapTool,
Title,
VeeHead,
WheelZoomTool,
value,
)
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.models.widgets.markups import Div
from bokeh.palettes import Viridis11
from bokeh.plotting import figure
from bokeh.themes import Theme
from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack
from tlz import curry, pipe, valmap
from tlz.curried import concat, groupby, map
from tornado import escape
import dask
from dask import config
from dask.utils import format_bytes, format_time, key_split, parse_timedelta
from distributed.dashboard.components import add_periodic_callback
from distributed.dashboard.components.shared import (
DashboardComponent,
ProfileServer,
ProfileTimePlot,
SystemMonitor,
)
from distributed.dashboard.utils import BOKEH_VERSION, PROFILING, transpose, update
from distributed.diagnostics.graph_layout import GraphLayout
from distributed.diagnostics.progress import GroupTiming
from distributed.diagnostics.progress_stream import color_of, progress_quads
from distributed.diagnostics.task_stream import TaskStreamPlugin
from distributed.diagnostics.task_stream import color_of as ts_color_of
from distributed.diagnostics.task_stream import colors as ts_color_lookup
from distributed.metrics import time
from distributed.utils import Log, log_errors
if dask.config.get("distributed.dashboard.export-tool"):
from distributed.dashboard.export_tool import ExportTool
else:
ExportTool = None # type: ignore
logger = logging.getLogger(__name__)
from jinja2 import Environment, FileSystemLoader
env = Environment(
loader=FileSystemLoader(
os.path.join(os.path.dirname(__file__), "..", "..", "http", "templates")
)
)
BOKEH_THEME = Theme(
filename=os.path.join(os.path.dirname(__file__), "..", "theme.yaml")
)
TICKS_1024 = {"base": 1024, "mantissas": [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]}
XLABEL_ORIENTATION = -math.pi / 9 # slanted downwards 20 degrees
logos_dict = {
"numpy": "statics/images/numpy.png",
"pandas": "statics/images/pandas.png",
"builtins": "statics/images/python.png",
}
class Occupancy(DashboardComponent):
"""Occupancy (in time) per worker"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"occupancy": [0, 0],
"worker": ["a", "b"],
"x": [0.0, 0.1],
"y": [1, 2],
"ms": [1, 2],
"color": ["red", "blue"],
"escaped_worker": ["a", "b"],
}
)
self.root = figure(
title="Occupancy",
tools="",
toolbar_location="above",
id="bk-occupancy-plot",
x_axis_type="datetime",
min_border_bottom=50,
**kwargs,
)
rect = self.root.rect(
source=self.source, x="x", width="ms", y="y", height=0.9, color="color"
)
rect.nonselection_glyph = None
self.root.xaxis.minor_tick_line_alpha = 0
self.root.yaxis.visible = False
self.root.ygrid.visible = False
# fig.xaxis[0].formatter = NumeralTickFormatter(format='0.0s')
self.root.x_range.start = 0
tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
hover = HoverTool()
hover.tooltips = "@worker : @occupancy s."
hover.point_policy = "follow_mouse"
self.root.add_tools(hover, tap)
@without_property_validation
def update(self):
with log_errors():
workers = self.scheduler.workers.values()
y = list(range(len(workers)))
occupancy = [ws.occupancy for ws in workers]
ms = [occ * 1000 for occ in occupancy]
x = [occ / 500 for occ in occupancy]
total = sum(occupancy)
color = []
for ws in workers:
if ws in self.scheduler.idle:
color.append("red")
elif ws in self.scheduler.saturated:
color.append("green")
else:
color.append("blue")
if total:
self.root.title.text = (
f"Occupancy -- total time: {format_time(total)} "
f"wall time: {format_time(total / self.scheduler.total_nthreads)}"
)
else:
self.root.title.text = "Occupancy"
if occupancy:
result = {
"occupancy": occupancy,
"worker": [ws.address for ws in workers],
"ms": ms,
"color": color,
"escaped_worker": [escape.url_escape(ws.address) for ws in workers],
"x": x,
"y": y,
}
update(self.source, result)
class ProcessingHistogram(DashboardComponent):
"""How many tasks are on each worker"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
self.source = ColumnDataSource(
{"left": [1, 2], "right": [10, 10], "top": [0, 0]}
)
self.root = figure(
title="Tasks Processing (count)",
id="bk-nprocessing-histogram-plot",
name="processing",
y_axis_label="frequency",
tools="",
**kwargs,
)
self.root.xaxis.minor_tick_line_alpha = 0
self.root.ygrid.visible = False
self.root.toolbar_location = None
self.root.quad(
source=self.source,
left="left",
right="right",
bottom=0,
top="top",
color="deepskyblue",
fill_alpha=0.5,
)
@without_property_validation
def update(self):
L = [len(ws.processing) for ws in self.scheduler.workers.values()]
counts, x = np.histogram(L, bins=40)
self.source.data.update({"left": x[:-1], "right": x[1:], "top": counts})
def _memory_color(current: int, limit: int) -> str:
"""Dynamic color used by WorkersMemory and ClusterMemory"""
if limit and current > limit:
return "red"
if limit and current > limit / 2:
return "orange"
return "blue"
class ClusterMemory(DashboardComponent):
"""Total memory usage on the cluster"""
def __init__(self, scheduler, width=600, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"width": [0] * 4,
"x": [0] * 4,
"y": [0] * 4,
"color": ["blue", "blue", "blue", "grey"],
"alpha": [1, 0.7, 0.4, 1],
"proc_memory": [0] * 4,
"managed": [0] * 4,
"unmanaged_old": [0] * 4,
"unmanaged_recent": [0] * 4,
"spilled": [0] * 4,
}
)
self.root = figure(
title="Bytes stored on cluster",
tools="",
id="bk-cluster-memory-plot",
width=int(width / 2),
name="cluster_memory",
min_border_bottom=50,
**kwargs,
)
rect = self.root.rect(
source=self.source,
x="x",
y="y",
width="width",
height=0.9,
color="color",
alpha="alpha",
)
rect.nonselection_glyph = None
self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.xaxis.minor_tick_line_alpha = 0
self.root.x_range = Range1d(start=0)
self.root.yaxis.visible = False
self.root.ygrid.visible = False
self.root.toolbar_location = None
self.root.yaxis.visible = False
hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
Process memory (RSS):
@proc_memory{0.00 b}
Managed:
@managed{0.00 b}
Unmanaged (old):
@unmanaged_old{0.00 b}
Unmanaged (recent):
@unmanaged_recent{0.00 b}
Spilled to disk:
@spilled{0.00 b}
""",
)
self.root.add_tools(hover)
@without_property_validation
def update(self):
with log_errors():
limit = sum(ws.memory_limit for ws in self.scheduler.workers.values())
meminfo = self.scheduler.memory
color = _memory_color(meminfo.process, limit)
width = [
meminfo.managed_in_memory,
meminfo.unmanaged_old,
meminfo.unmanaged_recent,
meminfo.managed_spilled,
]
result = {
"width": width,
"x": [sum(width[:i]) + w / 2 for i, w in enumerate(width)],
"color": [color, color, color, "grey"],
"proc_memory": [meminfo.process] * 4,
"managed": [meminfo.managed_in_memory] * 4,
"unmanaged_old": [meminfo.unmanaged_old] * 4,
"unmanaged_recent": [meminfo.unmanaged_recent] * 4,
"spilled": [meminfo.managed_spilled] * 4,
}
x_end = max(limit, meminfo.process + meminfo.managed_spilled)
self.root.x_range.end = x_end
title = f"Bytes stored: {format_bytes(meminfo.process)}"
if meminfo.managed_spilled:
title += f" + {format_bytes(meminfo.managed_spilled)} spilled to disk"
self.root.title.text = title
update(self.source, result)
class WorkersMemory(DashboardComponent):
"""Memory usage for single workers"""
def __init__(self, scheduler, width=600, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"width": [],
"x": [],
"y": [],
"color": [],
"alpha": [],
"worker": [],
"escaped_worker": [],
"proc_memory": [],
"managed": [],
"unmanaged_old": [],
"unmanaged_recent": [],
"spilled": [],
}
)
self.root = figure(
title="Bytes stored per worker",
tools="",
id="bk-workers-memory-plot",
width=int(width / 2),
name="workers_memory",
min_border_bottom=50,
**kwargs,
)
rect = self.root.rect(
source=self.source,
x="x",
y="y",
width="width",
height=0.9,
color="color",
fill_alpha="alpha",
line_width=0,
)
rect.nonselection_glyph = None
self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.xaxis.minor_tick_line_alpha = 0
self.root.x_range = Range1d(start=0)
self.root.yaxis.visible = False
self.root.ygrid.visible = False
tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
self.root.add_tools(tap)
self.root.toolbar_location = None
self.root.yaxis.visible = False
hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
Worker:
@worker
Process memory (RSS):
@proc_memory{0.00 b}
Managed:
@managed{0.00 b}
Unmanaged (old):
@unmanaged_old{0.00 b}
Unmanaged (recent):
@unmanaged_recent{0.00 b}
Spilled to disk:
@spilled{0.00 b}
""",
)
self.root.add_tools(hover)
@without_property_validation
def update(self):
def quadlist(i) -> list:
out = []
for ii in i:
out += [ii, ii, ii, ii]
return out
with log_errors():
workers = self.scheduler.workers.values()
width = []
x = []
color = []
max_limit = 0
procmemory = []
managed = []
spilled = []
unmanaged_old = []
unmanaged_recent = []
for ws in workers:
meminfo = ws.memory
limit = getattr(ws, "memory_limit", 0)
max_limit = max(
max_limit, limit, meminfo.process + meminfo.managed_spilled
)
color_i = _memory_color(meminfo.process, limit)
width += [
meminfo.managed_in_memory,
meminfo.unmanaged_old,
meminfo.unmanaged_recent,
meminfo.managed_spilled,
]
x += [sum(width[-4:i]) + width[i] / 2 for i in range(-4, 0)]
color += [color_i, color_i, color_i, "grey"]
# memory info
procmemory.append(meminfo.process)
managed.append(meminfo.managed_in_memory)
unmanaged_old.append(meminfo.unmanaged_old)
unmanaged_recent.append(meminfo.unmanaged_recent)
spilled.append(meminfo.managed_spilled)
result = {
"width": width,
"x": x,
"color": color,
"alpha": [1, 0.7, 0.4, 1] * len(workers),
"worker": quadlist(ws.address for ws in workers),
"escaped_worker": quadlist(
escape.url_escape(ws.address) for ws in workers
),
"y": quadlist(range(len(workers))),
"proc_memory": quadlist(procmemory),
"managed": quadlist(managed),
"unmanaged_old": quadlist(unmanaged_old),
"unmanaged_recent": quadlist(unmanaged_recent),
"spilled": quadlist(spilled),
}
# Remove rectangles with width=0
result = {
k: [vi for vi, w in zip(v, width) if w] for k, v in result.items()
}
self.root.x_range.end = max_limit
update(self.source, result)
class WorkersMemoryHistogram(DashboardComponent):
"""Histogram of memory usage, showing how many workers there are in each bucket of
usage. Replaces the per-worker graph when there are >= 50 workers.
"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
self.source = ColumnDataSource(
{"left": [1, 2], "right": [10, 10], "top": [0, 0]}
)
self.root = figure(
title="Bytes stored per worker",
name="workers_memory",
id="bk-workers-memory-histogram-plot",
y_axis_label="frequency",
tools="",
**kwargs,
)
self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.xaxis.ticker = AdaptiveTicker(**TICKS_1024)
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.xaxis.minor_tick_line_alpha = 0
self.root.ygrid.visible = False
self.root.toolbar_location = None
self.root.quad(
source=self.source,
left="left",
right="right",
bottom=0,
top="top",
color="deepskyblue",
fill_alpha=0.5,
)
@without_property_validation
def update(self):
nbytes = np.asarray(
[ws.metrics["memory"] for ws in self.scheduler.workers.values()]
)
counts, x = np.histogram(nbytes, bins=40)
d = {"left": x[:-1], "right": x[1:], "top": counts}
update(self.source, d)
class BandwidthTypes(DashboardComponent):
"""Bar chart showing bandwidth per type"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"bandwidth": [1, 2],
"bandwidth-half": [0.5, 1],
"type": ["a", "b"],
"bandwidth_text": ["1", "2"],
}
)
self.root = figure(
title="Bandwidth by Type",
tools="",
id="bk-bandwidth-type-plot",
name="bandwidth_type_histogram",
y_range=["a", "b"],
**kwargs,
)
self.root.xaxis.major_label_orientation = -0.5
rect = self.root.rect(
source=self.source,
x="bandwidth-half",
y="type",
width="bandwidth",
height=0.9,
color="blue",
)
self.root.x_range.start = 0
self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.xaxis.ticker = AdaptiveTicker(**TICKS_1024)
rect.nonselection_glyph = None
self.root.xaxis.minor_tick_line_alpha = 0
self.root.ygrid.visible = False
self.root.toolbar_location = None
hover = HoverTool()
hover.tooltips = "@type: @bandwidth_text / s"
hover.point_policy = "follow_mouse"
self.root.add_tools(hover)
@without_property_validation
def update(self):
with log_errors():
bw = self.scheduler.bandwidth_types
self.root.y_range.factors = list(sorted(bw))
result = {
"bandwidth": list(bw.values()),
"bandwidth-half": [b / 2 for b in bw.values()],
"type": list(bw.keys()),
"bandwidth_text": [format_bytes(x) for x in bw.values()],
}
self.root.title.text = "Bandwidth: " + format_bytes(
self.scheduler.bandwidth
)
update(self.source, result)
class BandwidthWorkers(DashboardComponent):
"""How many tasks are on each worker"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"bandwidth": [1, 2],
"source": ["a", "b"],
"destination": ["a", "b"],
"bandwidth_text": ["1", "2"],
}
)
values = [hex(x)[2:] for x in range(64, 256)][::-1]
mapper = linear_cmap(
field_name="bandwidth",
palette=["#" + x + x + "FF" for x in values],
low=0,
high=1,
)
self.root = figure(
title="Bandwidth by Worker",
tools="",
id="bk-bandwidth-worker-plot",
name="bandwidth_worker_heatmap",
x_range=["a", "b"],
y_range=["a", "b"],
**kwargs,
)
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.rect(
source=self.source,
x="source",
y="destination",
color=mapper,
height=1,
width=1,
)
self.color_map = mapper["transform"]
color_bar = ColorBar(
color_mapper=self.color_map,
label_standoff=12,
border_line_color=None,
location=(0, 0),
)
color_bar.formatter = NumeralTickFormatter(format="0.0 b")
color_bar.ticker = AdaptiveTicker(**TICKS_1024)
self.root.add_layout(color_bar, "right")
self.root.toolbar_location = None
hover = HoverTool()
hover.tooltips = """
Source: @source
Destination: @destination
Bandwidth: @bandwidth_text / s
"""
hover.point_policy = "follow_mouse"
self.root.add_tools(hover)
@without_property_validation
def update(self):
with log_errors():
bw = self.scheduler.bandwidth_workers
if not bw:
return
def name(address):
try:
ws = self.scheduler.workers[address]
except KeyError:
return address
if ws.name is not None:
return str(ws.name)
return address
x, y, value = zip(*((name(a), name(b), c) for (a, b), c in bw.items()))
self.color_map.high = max(value)
factors = list(sorted(set(x + y)))
self.root.x_range.factors = factors
self.root.y_range.factors = factors[::-1]
result = {
"source": x,
"destination": y,
"bandwidth": value,
"bandwidth_text": list(map(format_bytes, value)),
}
self.root.title.text = "Bandwidth: " + format_bytes(
self.scheduler.bandwidth
)
update(self.source, result)
class WorkerNetworkBandwidth(DashboardComponent):
"""Worker network bandwidth chart
Plots horizontal bars with the read_bytes and write_bytes worker state
"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"y_read": [],
"y_write": [],
"x_read": [],
"x_write": [],
"x_read_disk": [],
"x_write_disk": [],
}
)
self.bandwidth = figure(
title="Worker Network Bandwidth",
tools="",
id="bk-worker-net-bandwidth",
name="worker_network_bandwidth",
**kwargs,
)
# read_bytes
self.bandwidth.hbar(
y="y_read",
right="x_read",
line_color=None,
left=0,
height=0.5,
fill_color="red",
legend_label="read",
source=self.source,
)
# write_bytes
self.bandwidth.hbar(
y="y_write",
right="x_write",
line_color=None,
left=0,
height=0.5,
fill_color="blue",
legend_label="write",
source=self.source,
)
self.bandwidth.axis[0].ticker = BasicTicker(**TICKS_1024)
self.bandwidth.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.bandwidth.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.bandwidth.xaxis.minor_tick_line_alpha = 0
self.bandwidth.x_range = Range1d(start=0)
self.bandwidth.yaxis.visible = False
self.bandwidth.ygrid.visible = False
self.bandwidth.toolbar_location = None
self.disk = figure(
title="Workers Disk",
tools="",
id="bk-workers-disk",
name="worker_disk",
**kwargs,
)
# read_bytes_disk
self.disk.hbar(
y="y_read",
right="x_read_disk",
line_color=None,
left=0,
height=0.5,
fill_color="red",
legend_label="read",
source=self.source,
)
# write_bytes_disk
self.disk.hbar(
y="y_write",
right="x_write_disk",
line_color=None,
left=0,
height=0.5,
fill_color="blue",
legend_label="write",
source=self.source,
)
self.disk.axis[0].ticker = BasicTicker(**TICKS_1024)
self.disk.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.disk.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.disk.xaxis.minor_tick_line_alpha = 0
self.disk.x_range = Range1d(start=0)
self.disk.yaxis.visible = False
self.disk.ygrid.visible = False
self.disk.toolbar_location = None
@without_property_validation
def update(self):
with log_errors():
workers = self.scheduler.workers.values()
h = 0.1
y_read = [i + 0.75 + i * h for i in range(len(workers))]
y_write = [i + 0.25 + i * h for i in range(len(workers))]
x_read = []
x_write = []
x_read_disk = []
x_write_disk = []
for ws in workers:
x_read.append(ws.metrics["read_bytes"])
x_write.append(ws.metrics["write_bytes"])
x_read_disk.append(ws.metrics["read_bytes_disk"])
x_write_disk.append(ws.metrics["write_bytes_disk"])
if self.scheduler.workers:
self.bandwidth.x_range.end = max(
max(x_read),
max(x_write),
100_000_000,
0.95 * self.bandwidth.x_range.end,
)
self.disk.x_range.end = max(
max(x_read_disk),
max(x_write_disk),
100_000_000,
0.95 * self.disk.x_range.end,
)
else:
self.bandwidth.x_range.end = 100_000_000
self.disk.x_range.end = 100_000_000
result = {
"y_read": y_read,
"y_write": y_write,
"x_read": x_read,
"x_write": x_write,
"x_read_disk": x_read_disk,
"x_write_disk": x_write_disk,
}
update(self.source, result)
class SystemTimeseries(DashboardComponent):
"""Timeseries for worker network bandwidth, cpu, memory and disk.
bandwidth: plots the average of read_bytes and write_bytes for the workers
as a function of time.
cpu: plots the average of cpu for the workers as a function of time.
memory: plots the average of memory for the workers as a function of time.
disk: plots the average of read_bytes_disk and write_bytes_disk for the workers
as a function of time.
The metrics plotted come from the aggregation of
from ws.metrics["val"] for ws in scheduler.workers.values() divided by nuber of workers.
"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"time": [],
"read_bytes": [],
"write_bytes": [],
"cpu": [],
"memory": [],
"read_bytes_disk": [],
"write_bytes_disk": [],
}
)
update(self.source, self.get_data())
x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
tools = "reset, xpan, xwheel_zoom"
self.bandwidth = figure(
title="Workers Network Bandwidth",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-network-bandwidth-ts",
name="worker_network_bandwidth-timeseries",
**kwargs,
)
self.bandwidth.line(
source=self.source,
x="time",
y="read_bytes",
color="red",
legend_label="read (mean)",
)
self.bandwidth.line(
source=self.source,
x="time",
y="write_bytes",
color="blue",
legend_label="write (mean)",
)
self.bandwidth.legend.location = "top_left"
self.bandwidth.yaxis.axis_label = "bytes / second"
self.bandwidth.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.bandwidth.y_range.start = 0
self.bandwidth.yaxis.minor_tick_line_alpha = 0
self.bandwidth.xgrid.visible = False
self.cpu = figure(
title="Workers CPU",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-cpu-ts",
name="worker_cpu-timeseries",
**kwargs,
)
self.cpu.line(
source=self.source,
x="time",
y="cpu",
)
self.cpu.yaxis.axis_label = "Utilization"
self.cpu.y_range.start = 0
self.cpu.yaxis.minor_tick_line_alpha = 0
self.cpu.xgrid.visible = False
self.memory = figure(
title="Workers Memory",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-memory-ts",
name="worker_memory-timeseries",
**kwargs,
)
self.memory.line(
source=self.source,
x="time",
y="memory",
)
self.memory.yaxis.axis_label = "Bytes"
self.memory.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.memory.y_range.start = 0
self.memory.yaxis.minor_tick_line_alpha = 0
self.memory.xgrid.visible = False
self.disk = figure(
title="Workers Disk",
x_axis_type="datetime",
tools=tools,
x_range=x_range,
id="bk-worker-disk-ts",
name="worker_disk-timeseries",
**kwargs,
)
self.disk.line(
source=self.source,
x="time",
y="read_bytes_disk",
color="red",
legend_label="read (mean)",
)
self.disk.line(
source=self.source,
x="time",
y="write_bytes_disk",
color="blue",
legend_label="write (mean)",
)
self.disk.legend.location = "top_left"
self.disk.yaxis.axis_label = "bytes / second"
self.disk.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
self.disk.y_range.start = 0
self.disk.yaxis.minor_tick_line_alpha = 0
self.disk.xgrid.visible = False
def get_data(self):
workers = self.scheduler.workers.values()
read_bytes = 0
write_bytes = 0
cpu = 0
memory = 0
read_bytes_disk = 0
write_bytes_disk = 0
time = 0
for ws in workers:
read_bytes += ws.metrics["read_bytes"]
write_bytes += ws.metrics["write_bytes"]
cpu += ws.metrics["cpu"]
memory += ws.metrics["memory"]
read_bytes_disk += ws.metrics["read_bytes_disk"]
write_bytes_disk += ws.metrics["write_bytes_disk"]
time += ws.metrics["time"]
result = {
# use `or` to avoid ZeroDivision when no workers
"time": [time / (len(workers) or 1) * 1000],
"read_bytes": [read_bytes / (len(workers) or 1)],
"write_bytes": [write_bytes / (len(workers) or 1)],
"cpu": [cpu / (len(workers) or 1)],
"memory": [memory / (len(workers) or 1)],
"read_bytes_disk": [read_bytes_disk / (len(workers) or 1)],
"write_bytes_disk": [write_bytes_disk / (len(workers) or 1)],
}
return result
@without_property_validation
def update(self):
with log_errors():
self.source.stream(self.get_data(), 1000)
if self.scheduler.workers:
y_end_cpu = sum(
ws.nthreads or 1 for ws in self.scheduler.workers.values()
) / len(self.scheduler.workers.values())
y_end_mem = sum(
ws.memory_limit for ws in self.scheduler.workers.values()
) / len(self.scheduler.workers.values())
else:
y_end_cpu = 1
y_end_mem = 100_000_000
self.cpu.y_range.end = y_end_cpu * 100
self.memory.y_range.end = y_end_mem
class ComputePerKey(DashboardComponent):
"""Bar chart showing time spend in action by key prefix"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
if TaskStreamPlugin.name not in self.scheduler.plugins:
self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))
compute_data = {
"times": [0.2, 0.1],
"formatted_time": ["0.2 ms", "2.8 us"],
"angles": [3.14, 0.785],
"color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]],
"names": ["sum", "sum_partial"],
}
self.compute_source = ColumnDataSource(data=compute_data)
fig = figure(
title="Compute Time Per Task",
tools="",
id="bk-Compute-by-key-plot",
name="compute_time_per_key",
x_range=["a", "b"],
**kwargs,
)
rect = fig.vbar(
source=self.compute_source,
x="names",
top="times",
width=0.7,
color="color",
)
fig.y_range.start = 0
fig.yaxis.axis_label = "Time (s)"
fig.yaxis[0].formatter = NumeralTickFormatter(format="0")
fig.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
fig.xaxis.major_label_orientation = XLABEL_ORIENTATION
rect.nonselection_glyph = None
fig.xaxis.minor_tick_line_alpha = 0
fig.xgrid.visible = False
fig.toolbar_location = None
hover = HoverTool()
hover.tooltips = """
Name: @names
Time: @formatted_time
"""
hover.point_policy = "follow_mouse"
fig.add_tools(hover)
fig.add_layout(
Title(
text="Note: tasks less than 2% of max are not displayed",
text_font_style="italic",
),
"below",
)
self.fig = fig
tab1 = Panel(child=fig, title="Bar Chart")
fig2 = figure(
title="Compute Time Per Task",
tools="",
id="bk-Compute-by-key-pie",
name="compute_time_per_key-pie",
x_range=(-0.5, 1.0),
**kwargs,
)
fig2.wedge(
x=0,
y=1,
radius=0.4,
start_angle=cumsum("angles", include_zero=True),
end_angle=cumsum("angles"),
line_color="white",
fill_color="color",
legend_field="names",
source=self.compute_source,
)
fig2.axis.axis_label = None
fig2.axis.visible = False
fig2.grid.grid_line_color = None
fig2.add_layout(
Title(
text="Note: tasks less than 2% of max are not displayed",
text_font_style="italic",
),
"below",
)
hover = HoverTool()
hover.tooltips = """
Name: @names
Time: @formatted_time
"""
hover.point_policy = "follow_mouse"
fig2.add_tools(hover)
self.wedge_fig = fig2
tab2 = Panel(child=fig2, title="Pie Chart")
self.root = Tabs(tabs=[tab1, tab2])
@without_property_validation
def update(self):
with log_errors():
compute_times = defaultdict(float)
for key, ts in self.scheduler.task_prefixes.items():
name = key_split(key)
for action, t in ts.all_durations.items():
if action == "compute":
compute_times[name] += t
# order by largest time first
compute_times = sorted(
compute_times.items(), key=lambda x: x[1], reverse=True
)
# keep only time which are 2% of max or greater
if compute_times:
max_time = compute_times[0][1] * 0.02
compute_times = [(n, t) for n, t in compute_times if t > max_time]
compute_colors = list()
compute_names = list()
compute_time = list()
total_time = 0
for name, t in compute_times:
compute_names.append(name)
compute_colors.append(ts_color_of(name))
compute_time.append(t)
total_time += t
angles = [t / total_time * 2 * math.pi for t in compute_time]
self.fig.x_range.factors = compute_names
compute_result = dict(
angles=angles,
times=compute_time,
color=compute_colors,
names=compute_names,
formatted_time=[format_time(t) for t in compute_time],
)
update(self.compute_source, compute_result)
class AggregateAction(DashboardComponent):
"""Bar chart showing time spend in action by key prefix"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
if TaskStreamPlugin.name not in self.scheduler.plugins:
self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))
action_data = {
"times": [0.2, 0.1],
"formatted_time": ["0.2 ms", "2.8 us"],
"color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]],
"names": ["transfer", "compute"],
}
self.action_source = ColumnDataSource(data=action_data)
self.root = figure(
title="Aggregate Per Action",
tools="",
id="bk-aggregate-per-action-plot",
name="aggregate_per_action",
x_range=["a", "b"],
**kwargs,
)
rect = self.root.vbar(
source=self.action_source,
x="names",
top="times",
width=0.7,
color="color",
)
self.root.y_range.start = 0
self.root.yaxis[0].formatter = NumeralTickFormatter(format="0")
self.root.yaxis.axis_label = "Time (s)"
self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.xaxis.major_label_text_font_size = "16px"
rect.nonselection_glyph = None
self.root.xaxis.minor_tick_line_alpha = 0
self.root.xgrid.visible = False
self.root.toolbar_location = None
hover = HoverTool()
hover.tooltips = """
Name: @names
Time: @formatted_time
"""
hover.point_policy = "follow_mouse"
self.root.add_tools(hover)
@without_property_validation
def update(self):
with log_errors():
agg_times = defaultdict(float)
for key, ts in self.scheduler.task_prefixes.items():
for action, t in ts.all_durations.items():
agg_times[action] += t
# order by largest time first
agg_times = sorted(agg_times.items(), key=lambda x: x[1], reverse=True)
agg_colors = list()
agg_names = list()
agg_time = list()
for action, t in agg_times:
agg_names.append(action)
if action == "compute":
agg_colors.append("purple")
else:
agg_colors.append(ts_color_lookup[action])
agg_time.append(t)
self.root.x_range.factors = agg_names
self.root.title.text = "Aggregate Time Per Action"
action_result = dict(
times=agg_time,
color=agg_colors,
names=agg_names,
formatted_time=[format_time(t) for t in agg_time],
)
update(self.action_source, action_result)
class MemoryByKey(DashboardComponent):
"""Bar chart showing memory use by key prefix"""
def __init__(self, scheduler, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"name": ["a", "b"],
"nbytes": [100, 1000],
"count": [1, 2],
"color": ["blue", "blue"],
}
)
self.root = figure(
title="Memory Use",
tools="",
id="bk-memory-by-key-plot",
name="memory_by_key",
x_range=["a", "b"],
**kwargs,
)
rect = self.root.vbar(
source=self.source, x="name", top="nbytes", width=0.9, color="color"
)
self.root.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
rect.nonselection_glyph = None
self.root.xaxis.minor_tick_line_alpha = 0
self.root.ygrid.visible = False
self.root.toolbar_location = None
hover = HoverTool()
hover.tooltips = "@name: @nbytes_text"
hover.tooltips = """
Name: @name
Bytes: @nbytes_text
Count: @count objects
"""
hover.point_policy = "follow_mouse"
self.root.add_tools(hover)
@without_property_validation
def update(self):
with log_errors():
counts = defaultdict(int)
nbytes = defaultdict(int)
for ws in self.scheduler.workers.values():
for ts in ws.has_what:
ks = key_split(ts.key)
counts[ks] += 1
nbytes[ks] += ts.nbytes
names = list(sorted(counts))
self.root.x_range.factors = names
result = {
"name": names,
"count": [counts[name] for name in names],
"nbytes": [nbytes[name] for name in names],
"nbytes_text": [format_bytes(nbytes[name]) for name in names],
"color": [color_of(name) for name in names],
}
self.root.title.text = "Total Use: " + format_bytes(sum(nbytes.values()))
update(self.source, result)
class CurrentLoad(DashboardComponent):
"""Tasks and CPU usage on each worker"""
def __init__(self, scheduler, width=600, **kwargs):
with log_errors():
self.last = 0
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"nprocessing": [],
"nprocessing-half": [],
"nprocessing-color": [],
"cpu": [],
"cpu-half": [],
"y": [],
"worker": [],
"escaped_worker": [],
}
)
processing = figure(
title="Tasks Processing",
tools="",
id="bk-nprocessing-plot",
name="processing",
width=int(width / 2),
min_border_bottom=50,
**kwargs,
)
rect = processing.rect(
source=self.source,
x="nprocessing-half",
y="y",
width="nprocessing",
height=0.9,
color="nprocessing-color",
)
processing.x_range.start = 0
rect.nonselection_glyph = None
cpu = figure(
title="CPU Utilization",
tools="",
id="bk-cpu-worker-plot",
width=int(width / 2),
name="cpu_hist",
x_range=(0, 100),
min_border_bottom=50,
**kwargs,
)
rect = cpu.rect(
source=self.source,
x="cpu-half",
y="y",
width="cpu",
height=0.9,
color="blue",
)
rect.nonselection_glyph = None
for fig in (processing, cpu):
fig.xaxis.minor_tick_line_alpha = 0
fig.yaxis.visible = False
fig.ygrid.visible = False
tap = TapTool(
callback=OpenURL(url="./info/worker/@escaped_worker.html")
)
fig.add_tools(tap)
fig.toolbar_location = None
fig.yaxis.visible = False
hover = HoverTool()
hover.tooltips = "@worker : @nprocessing tasks"
hover.point_policy = "follow_mouse"
processing.add_tools(hover)
hover = HoverTool()
hover.tooltips = "@worker : @cpu %"
hover.point_policy = "follow_mouse"
cpu.add_tools(hover)
self.processing_figure = processing
self.cpu_figure = cpu
@without_property_validation
def update(self):
with log_errors():
workers = self.scheduler.workers.values()
now = time()
if not any(ws.processing for ws in workers) and now < self.last + 1:
return
self.last = now
cpu = [int(ws.metrics["cpu"]) for ws in workers]
nprocessing = [len(ws.processing) for ws in workers]
nprocessing_color = []
for ws in workers:
if ws in self.scheduler.idle:
nprocessing_color.append("red")
elif ws in self.scheduler.saturated:
nprocessing_color.append("green")
else:
nprocessing_color.append("blue")
result = {
"cpu": cpu,
"cpu-half": [c / 2 for c in cpu],
"nprocessing": nprocessing,
"nprocessing-half": [np / 2 for np in nprocessing],
"nprocessing-color": nprocessing_color,
"worker": [ws.address for ws in workers],
"escaped_worker": [escape.url_escape(ws.address) for ws in workers],
"y": list(range(len(workers))),
}
if self.scheduler.workers:
xrange = max(ws.nthreads or 1 for ws in workers)
else:
xrange = 1
self.cpu_figure.x_range.end = xrange * 100
update(self.source, result)
class StealingTimeSeries(DashboardComponent):
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"time": [time() * 1000, time() * 1000 + 1],
"idle": [0, 0],
"saturated": [0, 0],
}
)
x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
self.root = figure(
title="Idle and Saturated Workers Over Time",
x_axis_type="datetime",
tools="",
x_range=x_range,
**kwargs,
)
self.root.line(source=self.source, x="time", y="idle", color="red")
self.root.line(source=self.source, x="time", y="saturated", color="green")
self.root.yaxis.minor_tick_line_color = None
self.root.add_tools(
ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width")
)
@without_property_validation
def update(self):
with log_errors():
result = {
"time": [time() * 1000],
"idle": [len(self.scheduler.idle)],
"saturated": [len(self.scheduler.saturated)],
}
if PROFILING:
curdoc().add_next_tick_callback(
lambda: self.source.stream(result, 10000)
)
else:
self.source.stream(result, 10000)
class StealingEvents(DashboardComponent):
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.steal = scheduler.extensions["stealing"]
self.last = 0
self.source = ColumnDataSource(
{
"time": [time() - 20, time()],
"level": [0, 15],
"color": ["white", "white"],
"duration": [0, 0],
"radius": [1, 1],
"cost_factor": [0, 10],
"count": [1, 1],
}
)
x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
self.root = figure(
title="Stealing Events",
x_axis_type="datetime",
tools="",
x_range=x_range,
**kwargs,
)
self.root.circle(
source=self.source,
x="time",
y="level",
color="color",
size="radius",
alpha=0.5,
)
self.root.yaxis.axis_label = "Level"
hover = HoverTool()
hover.tooltips = "Level: @level, Duration: @duration, Count: @count, Cost factor: @cost_factor"
hover.point_policy = "follow_mouse"
self.root.add_tools(
hover,
ResetTool(),
PanTool(dimensions="width"),
WheelZoomTool(dimensions="width"),
)
def convert(self, msgs):
"""Convert a log message to a glyph"""
total_duration = 0
for msg in msgs:
time, level, key, duration, sat, occ_sat, idl, occ_idl = msg
total_duration += duration
try:
color = Viridis11[level]
except (KeyError, IndexError):
color = "black"
radius = math.sqrt(min(total_duration, 10)) * 30 + 2
d = {
"time": time * 1000,
"level": level,
"count": len(msgs),
"color": color,
"duration": total_duration,
"radius": radius,
"cost_factor": self.steal.cost_multipliers[level],
}
return d
@without_property_validation
def update(self):
with log_errors():
log = self.scheduler.get_events(topic="stealing")
current = len(self.scheduler.events["stealing"])
n = current - self.last
log = [log[-i][1] for i in range(1, n + 1) if isinstance(log[-i][1], list)]
self.last = current
if log:
new = pipe(
log,
map(groupby(1)),
map(dict.values),
concat,
map(self.convert),
list,
transpose,
)
if PROFILING:
curdoc().add_next_tick_callback(
lambda: self.source.stream(new, 10000)
)
else:
self.source.stream(new, 10000)
class Events(DashboardComponent):
def __init__(self, scheduler, name, height=150, **kwargs):
self.scheduler = scheduler
self.action_ys = dict()
self.last = 0
self.name = name
self.source = ColumnDataSource(
{"time": [], "action": [], "hover": [], "y": [], "color": []}
)
x_range = DataRange1d(follow="end", follow_interval=200000)
self.root = figure(
title=name,
x_axis_type="datetime",
height=height,
tools="",
x_range=x_range,
**kwargs,
)
self.root.circle(
source=self.source,
x="time",
y="y",
color="color",
size=50,
alpha=0.5,
legend_field="action",
)
self.root.yaxis.axis_label = "Action"
self.root.legend.location = "top_left"
hover = HoverTool()
hover.tooltips = "@action
@hover"
hover.point_policy = "follow_mouse"
self.root.add_tools(
hover,
ResetTool(),
PanTool(dimensions="width"),
WheelZoomTool(dimensions="width"),
)
@without_property_validation
def update(self):
with log_errors():
log = self.scheduler.events[self.name]
n = self.scheduler.event_counts[self.name] - self.last
if log:
log = [log[-i] for i in range(1, n + 1)]
self.last = self.scheduler.event_counts[self.name]
if log:
actions = []
times = []
hovers = []
ys = []
colors = []
for msg_time, msg in log:
times.append(msg_time * 1000)
action = msg["action"]
actions.append(action)
try:
ys.append(self.action_ys[action])
except KeyError:
self.action_ys[action] = len(self.action_ys)
ys.append(self.action_ys[action])
colors.append(color_of(action))
hovers.append("TODO")
new = {
"time": times,
"action": actions,
"hover": hovers,
"y": ys,
"color": colors,
}
if PROFILING:
curdoc().add_next_tick_callback(
lambda: self.source.stream(new, 10000)
)
else:
self.source.stream(new, 10000)
class TaskStream(DashboardComponent):
def __init__(self, scheduler, n_rectangles=1000, clear_interval="20s", **kwargs):
self.scheduler = scheduler
self.offset = 0
if TaskStreamPlugin.name not in self.scheduler.plugins:
self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))
self.plugin = self.scheduler.plugins[TaskStreamPlugin.name]
self.index = max(0, self.plugin.index - n_rectangles)
self.workers = dict()
self.n_rectangles = n_rectangles
clear_interval = parse_timedelta(clear_interval, default="ms")
self.clear_interval = clear_interval
self.last = 0
self.last_seen = 0
self.source, self.root = task_stream_figure(clear_interval, **kwargs)
# Required for update callback
self.task_stream_index = [0]
@without_property_validation
def update(self):
if self.index == self.plugin.index:
return
with log_errors():
if self.index and len(self.source.data["start"]):
start = min(self.source.data["start"])
duration = max(self.source.data["duration"])
boundary = (self.offset + start - duration) / 1000
else:
boundary = self.offset
rectangles = self.plugin.rectangles(
istart=self.index, workers=self.workers, start_boundary=boundary
)
n = len(rectangles["name"])
self.index = self.plugin.index
if not rectangles["start"]:
return
# If it has been a while since we've updated the plot
if time() > self.last_seen + self.clear_interval:
new_start = min(rectangles["start"]) - self.offset
old_start = min(self.source.data["start"])
old_end = max(
map(
operator.add,
self.source.data["start"],
self.source.data["duration"],
)
)
density = (
sum(self.source.data["duration"])
/ len(self.workers)
/ (old_end - old_start)
)
# If whitespace is more than 3x the old width
if (new_start - old_end) > (old_end - old_start) * 2 or density < 0.05:
self.source.data.update({k: [] for k in rectangles}) # clear
self.offset = min(rectangles["start"]) # redefine offset
rectangles["start"] = [x - self.offset for x in rectangles["start"]]
self.last_seen = time()
# Convert to numpy for serialization speed
if n >= 10 and np:
for k, v in rectangles.items():
if isinstance(v[0], Number):
rectangles[k] = np.array(v)
if PROFILING:
curdoc().add_next_tick_callback(
lambda: self.source.stream(rectangles, self.n_rectangles)
)
else:
self.source.stream(rectangles, self.n_rectangles)
def task_stream_figure(clear_interval="20s", **kwargs):
"""
kwargs are applied to the bokeh.models.plots.Plot constructor
"""
clear_interval = parse_timedelta(clear_interval, default="ms")
source = ColumnDataSource(
data=dict(
start=[time() - clear_interval],
duration=[0.1],
key=["start"],
name=["start"],
color=["white"],
duration_text=["100 ms"],
worker=["foo"],
y=[0],
worker_thread=[1],
alpha=[0.0],
)
)
x_range = DataRange1d(range_padding=0)
y_range = DataRange1d(range_padding=0)
root = figure(
name="task_stream",
title="Task Stream",
id="bk-task-stream-plot",
x_range=x_range,
y_range=y_range,
toolbar_location="above",
x_axis_type="datetime",
y_axis_location=None,
tools="",
min_border_bottom=50,
**kwargs,
)
rect = root.rect(
source=source,
x="start",
y="y",
width="duration",
height=0.4,
fill_color="color",
line_color="color",
line_alpha=0.6,
fill_alpha="alpha",
line_width=3,
)
rect.nonselection_glyph = None
root.yaxis.major_label_text_alpha = 0
root.yaxis.minor_tick_line_alpha = 0
root.yaxis.major_tick_line_alpha = 0
root.xgrid.visible = False
hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
@name:
@duration_text
""",
)
tap = TapTool(callback=OpenURL(url="./profile?key=@name"))
root.add_tools(
hover,
tap,
BoxZoomTool(),
ResetTool(),
PanTool(dimensions="width"),
WheelZoomTool(dimensions="width"),
)
if ExportTool:
export = ExportTool()
export.register_plot(root)
root.add_tools(export)
return source, root
class TaskGraph(DashboardComponent):
"""
A dynamic node-link diagram for the task graph on the scheduler
See also the GraphLayout diagnostic at
distributed/diagnostics/graph_layout.py
"""
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.layout = GraphLayout(scheduler)
scheduler.add_plugin(self.layout)
self.invisible_count = 0 # number of invisible nodes
self.node_source = ColumnDataSource(
{"x": [], "y": [], "name": [], "state": [], "visible": [], "key": []}
)
self.edge_source = ColumnDataSource({"x": [], "y": [], "visible": []})
node_view = CDSView(
filters=[GroupFilter(column_name="visible", group="True")],
)
edge_view = CDSView(
filters=[GroupFilter(column_name="visible", group="True")],
)
# Bokeh >= 3.0 automatically infers the source to use
if BOKEH_VERSION.major < 3:
node_view.source = self.node_source
edge_view.source = self.edge_source
node_colors = factor_cmap(
"state",
factors=["waiting", "processing", "memory", "released", "erred"],
palette=["gray", "green", "red", "blue", "black"],
)
self.root = figure(title="Task Graph", **kwargs)
self.subtitle = Title(text=" ", text_font_style="italic")
self.root.add_layout(self.subtitle, "above")
self.root.multi_line(
xs="x",
ys="y",
source=self.edge_source,
line_width=1,
view=edge_view,
color="black",
alpha=0.3,
)
rect = self.root.square(
x="x",
y="y",
size=10,
color=node_colors,
source=self.node_source,
view=node_view,
legend_field="state",
)
self.root.xgrid.grid_line_color = None
self.root.ygrid.grid_line_color = None
hover = HoverTool(
point_policy="follow_mouse",
tooltips="@name: @state",
renderers=[rect],
)
tap = TapTool(callback=OpenURL(url="info/task/@key.html"), renderers=[rect])
rect.nonselection_glyph = None
self.root.add_tools(hover, tap)
self.max_items = config.get("distributed.dashboard.graph-max-items", 5000)
@without_property_validation
def update(self):
with log_errors():
# If there are too many tasks in the scheduler we'll disable this
# compoonents to not overload scheduler or client. Once we drop
# below the threshold, the data is filled up again as usual
if len(self.scheduler.tasks) > self.max_items:
self.subtitle.text = "Scheduler has too many tasks to display."
for container in [self.node_source, self.edge_source]:
container.data = {col: [] for col in container.column_names}
else:
# occasionally reset the column data source to remove old nodes
if self.invisible_count > len(self.node_source.data["x"]) / 2:
self.layout.reset_index()
self.invisible_count = 0
update = True
else:
update = False
new, self.layout.new = self.layout.new, []
new_edges = self.layout.new_edges
self.layout.new_edges = []
self.add_new_nodes_edges(new, new_edges, update=update)
self.patch_updates()
if len(self.scheduler.tasks) == 0:
self.subtitle.text = "Scheduler is empty."
else:
self.subtitle.text = " "
@without_property_validation
def add_new_nodes_edges(self, new, new_edges, update=False):
if new or update:
node_key = []
node_x = []
node_y = []
node_state = []
node_name = []
edge_x = []
edge_y = []
x = self.layout.x
y = self.layout.y
tasks = self.scheduler.tasks
for key in new:
try:
task = tasks[key]
except KeyError:
continue
xx = x[key]
yy = y[key]
node_key.append(escape.url_escape(key))
node_x.append(xx)
node_y.append(yy)
node_state.append(task.state)
node_name.append(task.prefix.name)
for a, b in new_edges:
try:
edge_x.append([x[a], x[b]])
edge_y.append([y[a], y[b]])
except KeyError:
pass
node = {
"x": node_x,
"y": node_y,
"state": node_state,
"name": node_name,
"key": node_key,
"visible": ["True"] * len(node_x),
}
edge = {"x": edge_x, "y": edge_y, "visible": ["True"] * len(edge_x)}
if update or not len(self.node_source.data["x"]):
# see https://github.com/bokeh/bokeh/issues/7523
self.node_source.data.update(node)
self.edge_source.data.update(edge)
else:
self.node_source.stream(node)
self.edge_source.stream(edge)
@without_property_validation
def patch_updates(self):
"""
Small updates like color changes or lost nodes from task transitions
"""
n = len(self.node_source.data["x"])
m = len(self.edge_source.data["x"])
if self.layout.state_updates:
state_updates = self.layout.state_updates
self.layout.state_updates = []
updates = [(i, c) for i, c in state_updates if i < n]
self.node_source.patch({"state": updates})
if self.layout.visible_updates:
updates = self.layout.visible_updates
updates = [(i, c) for i, c in updates if i < n]
self.layout.visible_updates = []
self.node_source.patch({"visible": updates})
self.invisible_count += len(updates)
if self.layout.visible_edge_updates:
updates = self.layout.visible_edge_updates
updates = [(i, c) for i, c in updates if i < m]
self.layout.visible_edge_updates = []
self.edge_source.patch({"visible": updates})
def __del__(self):
self.scheduler.remove_plugin(name=self.layout.name)
class TaskGroupGraph(DashboardComponent):
"""
Task Group Graph
Creates a graph layout for TaskGroups on the scheduler. It assigns
(x, y) locations to all the TaskGroups and lays them out by according
to their dependencies. The layout gets updated every time that new
TaskGroups are added.
Each task group node incodes information about task progress, memory,
and output type into glyphs, as well as a hover tooltip with more detailed
information on name, computation time, memory, and tasks status.
"""
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.nodes_layout = {}
self.arrows_layout = {}
self.old_counter = -1
self.nodes_source = ColumnDataSource(
{
"x": [],
"y": [],
"w_box": [],
"h_box": [],
"name": [],
"tot_tasks": [],
"color": [],
"x_start": [],
"x_end": [],
"y_start": [],
"y_end": [],
"x_end_progress": [],
"mem_alpha": [],
"node_line_width": [],
"comp_tasks": [],
"url_logo": [],
"x_logo": [],
"y_logo": [],
"w_logo": [],
"h_logo": [],
"in_processing": [],
"in_memory": [],
"in_released": [],
"in_erred": [],
"compute_time": [],
"memory": [],
}
)
self.arrows_source = ColumnDataSource({"xs": [], "ys": [], "xe": [], "ye": []})
self.root = figure(title="Task Groups Graph", match_aspect=True, **kwargs)
self.root.axis.visible = False
self.subtitle = Title(text=" ", text_font_style="italic")
self.root.add_layout(self.subtitle, "above")
rect = self.root.rect(
x="x",
y="y",
width="w_box",
height="h_box",
color="color",
fill_alpha="mem_alpha",
line_color="black",
line_width="node_line_width",
source=self.nodes_source,
)
# plot tg log
self.root.image_url(
url="url_logo",
x="x_logo",
y="y_logo",
w="w_logo",
h="h_logo",
anchor="center",
source=self.nodes_source,
)
# progress bar plain box
self.root.quad(
left="x_start",
right="x_end",
bottom="y_start",
top="y_end",
color=None,
line_color="black",
source=self.nodes_source,
)
# progress bar
self.root.quad(
left="x_start",
right="x_end_progress",
bottom="y_start",
top="y_end",
color="color",
line_color=None,
fill_alpha=0.6,
source=self.nodes_source,
)
self.arrows = Arrow(
end=VeeHead(size=8),
line_color="black",
line_alpha=0.5,
line_width=1,
x_start="xs",
y_start="ys",
x_end="xe",
y_end="ye",
source=self.arrows_source,
)
self.root.add_layout(self.arrows)
self.root.xgrid.grid_line_color = None
self.root.ygrid.grid_line_color = None
self.root.x_range.range_padding = 0.5
self.root.y_range.range_padding = 0.5
hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
Name:
@name
Compute time:
@compute_time
Memory:
@memory
Tasks:
@tot_tasks
Completed:
@comp_tasks
Processing:
@in_processing
In memory:
@in_memory
Erred:
@in_erred
Released:
@in_released
""",
renderers=[rect],
)
self.root.add_tools(hover)
@without_property_validation
def update_layout(self):
with log_errors():
# Get dependecies per task group.
# In some cases there are tg that have themselves as dependencies - we remove those.
dependencies = {
k: {ds.name for ds in ts.dependencies if ds.name != k}
for k, ts in self.scheduler.task_groups.items()
}
import dask
order = dask.order.order(
dsk={group.name: 1 for k, group in self.scheduler.task_groups.items()},
dependencies=dependencies,
)
ordered = sorted(self.scheduler.task_groups, key=order.get)
xs = {}
ys = {}
locations = set()
nodes_layout = {}
arrows_layout = {}
for tg in ordered:
if dependencies[tg]:
x = max(xs[dep] for dep in dependencies[tg]) + 1
y = max(ys[dep] for dep in dependencies[tg])
if (
len(dependencies[tg]) > 1
and len({ys[dep] for dep in dependencies[tg]}) == 1
):
y += 1
else:
x = 0
y = max(ys.values()) + 1 if ys else 0
while (x, y) in locations: # avoid collisions by moving up
y += 1
locations.add((x, y))
xs[tg], ys[tg] = x, y
# info neded for node layout to coulmn data source
nodes_layout[tg] = {"x": xs[tg], "y": ys[tg]}
# info needed for arrow layout
arrows_layout[tg] = {
"nstart": dependencies[tg],
"nend": [tg] * len(dependencies[tg]),
}
return nodes_layout, arrows_layout
def compute_size(self, x, min_box, max_box):
start = 0.4
end = 0.8
y = (end - start) / (max_box - min_box) * (x - min_box) + start
return y
@without_property_validation
def update(self):
if self.scheduler.transition_counter == self.old_counter:
return
self.old_counter = self.scheduler.transition_counter
if not self.scheduler.task_groups:
self.subtitle.text = "Scheduler is empty."
else:
self.subtitle.text = " "
if self.nodes_layout.keys() != self.scheduler.task_groups.keys():
self.nodes_layout, self.arrows_layout = self.update_layout()
nodes_data = {
"x": [],
"y": [],
"w_box": [],
"h_box": [],
"name": [],
"color": [],
"tot_tasks": [],
"x_start": [],
"x_end": [],
"y_start": [],
"y_end": [],
"x_end_progress": [],
"mem_alpha": [],
"node_line_width": [],
"comp_tasks": [],
"url_logo": [],
"x_logo": [],
"y_logo": [],
"w_logo": [],
"h_logo": [],
"in_processing": [],
"in_memory": [],
"in_released": [],
"in_erred": [],
"compute_time": [],
"memory": [],
}
arrows_data = {
"xs": [],
"ys": [],
"xe": [],
"ye": [],
}
durations = set()
nbytes = set()
for key, tg in self.scheduler.task_groups.items():
if tg.duration and tg.nbytes_total:
durations.add(tg.duration)
nbytes.add(tg.nbytes_total)
durations_min = min(durations, default=0)
durations_max = max(durations, default=0)
nbytes_min = min(nbytes, default=0)
nbytes_max = max(nbytes, default=0)
box_dim = {}
for key, tg in self.scheduler.task_groups.items():
comp_tasks = (
tg.states["released"] + tg.states["memory"] + tg.states["erred"]
)
tot_tasks = sum(tg.states.values())
# compute width and height of boxes
if (
tg.duration
and tg.nbytes_total
and comp_tasks
and len(durations) > 1
and len(nbytes) > 1
):
# scale duration (width)
width_box = self.compute_size(
tg.duration / comp_tasks * tot_tasks,
min_box=durations_min / comp_tasks * tot_tasks,
max_box=durations_max / comp_tasks * tot_tasks,
)
# need to scale memory (height)
height_box = self.compute_size(
tg.nbytes_total / comp_tasks * tot_tasks,
min_box=nbytes_min / comp_tasks * tot_tasks,
max_box=nbytes_max / comp_tasks * tot_tasks,
)
else:
width_box = 0.6
height_box = width_box / 2
box_dim[key] = {"width": width_box, "height": height_box}
for key, tg in self.scheduler.task_groups.items():
x = self.nodes_layout[key]["x"]
y = self.nodes_layout[key]["y"]
width = box_dim[key]["width"]
height = box_dim[key]["height"]
# main boxes layout
nodes_data["x"].append(x)
nodes_data["y"].append(y)
nodes_data["w_box"].append(width)
nodes_data["h_box"].append(height)
comp_tasks = (
tg.states["released"] + tg.states["memory"] + tg.states["erred"]
)
tot_tasks = sum(tg.states.values())
nodes_data["name"].append(tg.prefix.name)
nodes_data["color"].append(color_of(tg.prefix.name))
nodes_data["tot_tasks"].append(tot_tasks)
# memory alpha factor by 0.4 if not get's too dark
nodes_data["mem_alpha"].append(
(tg.states["memory"] / sum(tg.states.values())) * 0.4
)
# main box line width
if tg.states["processing"]:
nodes_data["node_line_width"].append(5)
else:
nodes_data["node_line_width"].append(1)
# progress bar data update
nodes_data["x_start"].append(x - width / 2)
nodes_data["x_end"].append(x + width / 2)
nodes_data["y_start"].append(y - height / 2)
nodes_data["y_end"].append(y - height / 2 + height * 0.4)
nodes_data["x_end_progress"].append(
x - width / 2 + width * comp_tasks / tot_tasks
)
# arrows
arrows_data["xs"] += [
self.nodes_layout[k]["x"] + box_dim[k]["width"] / 2
for k in self.arrows_layout[key]["nstart"]
]
arrows_data["ys"] += [
self.nodes_layout[k]["y"] for k in self.arrows_layout[key]["nstart"]
]
arrows_data["xe"] += [
self.nodes_layout[k]["x"] - box_dim[k]["width"] / 2
for k in self.arrows_layout[key]["nend"]
]
arrows_data["ye"] += [
self.nodes_layout[k]["y"] for k in self.arrows_layout[key]["nend"]
]
# LOGOS
if len(tg.types) == 1:
logo_type = next(iter(tg.types)).split(".")[0]
try:
url_logo = logos_dict[logo_type]
except KeyError:
url_logo = ""
else:
url_logo = ""
nodes_data["url_logo"].append(url_logo)
nodes_data["x_logo"].append(x + width / 3)
nodes_data["y_logo"].append(y + height / 3)
ratio = width / height
if ratio > 1:
nodes_data["h_logo"].append(height * 0.3)
nodes_data["w_logo"].append(width * 0.3 / ratio)
else:
nodes_data["h_logo"].append(height * 0.3 * ratio)
nodes_data["w_logo"].append(width * 0.3)
# compute_time and memory
nodes_data["compute_time"].append(format_time(tg.duration))
nodes_data["memory"].append(format_bytes(tg.nbytes_total))
# Add some status to hover
tasks_processing = tg.states["processing"]
tasks_memory = tg.states["memory"]
tasks_relased = tg.states["released"]
tasks_erred = tg.states["erred"]
nodes_data["comp_tasks"].append(
f"{comp_tasks} ({comp_tasks / tot_tasks * 100:.0f} %)"
)
nodes_data["in_processing"].append(
f"{tasks_processing} ({tasks_processing/ tot_tasks * 100:.0f} %)"
)
nodes_data["in_memory"].append(
f"{tasks_memory} ({tasks_memory/ tot_tasks * 100:.0f} %)"
)
nodes_data["in_released"].append(
f"{tasks_relased} ({tasks_relased/ tot_tasks * 100:.0f} %)"
)
nodes_data["in_erred"].append(
f"{ tasks_erred} ({tasks_erred/ tot_tasks * 100:.0f} %)"
)
self.nodes_source.data.update(nodes_data)
self.arrows_source.data.update(arrows_data)
class TaskGroupProgress(DashboardComponent):
"""Stacked area chart showing task groups through time"""
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.source = ColumnDataSource()
# The length of timeseries to chart (in units of plugin.dt)
self.npts = 180
if GroupTiming.name not in scheduler.plugins:
scheduler.add_plugin(plugin=GroupTiming(scheduler))
self.plugin = scheduler.plugins[GroupTiming.name]
self.source.add(np.array(self.plugin.time) * 1000.0, "time")
x_range = DataRange1d(range_padding=0)
y_range = Range1d(0, max(self.plugin.nthreads))
self.root = figure(
id="bk-task-group-progress-plot",
title="Task Group Progress",
name="task_group_progress",
toolbar_location="above",
min_border_bottom=50,
x_range=x_range,
y_range=y_range,
tools="",
x_axis_type="datetime",
y_axis_location=None,
**kwargs,
)
self.root.yaxis.major_label_text_alpha = 0
self.root.yaxis.minor_tick_line_alpha = 0
self.root.yaxis.major_tick_line_alpha = 0
self.root.xgrid.visible = False
self.root.add_tools(
BoxZoomTool(),
ResetTool(),
PanTool(dimensions="width"),
WheelZoomTool(dimensions="width"),
)
self._hover = None
self._last_drawn = None
self._offset = time()
self._last_transition_count = scheduler.transition_counter
# OrderedDict so we can make a reverse iterator later and get the
# most-recently-added glyphs.
self._renderers = OrderedDict()
self._line_renderers = OrderedDict()
def _should_add_new_renderers(self) -> bool:
"""
Whether to add new renderers to the chart.
When a new set of task groups enters the scheduler we'd like to start rendering
them. But it can be expensive to add new glyps, so we do it deliberately,
checking whether we have to do it and whether the scheduler seems busy.
"""
# Always draw if we have not before
if not self._last_drawn:
return True
# Don't draw if there have been no new tasks completed since the last update,
# or if the scheduler CPU is occupied.
if (
self._last_transition_count == self.scheduler.transition_counter
or self.scheduler.proc.cpu_percent() > 50
):
return False
# Only return true if there are new task groups that we have not yet added
# to the ColumnDataSource.
return not set(self.plugin.compute.keys()) <= set(self.source.data.keys())
def _should_update(self) -> bool:
"""
Whether to update the ColumnDataSource. This is cheaper than redrawing,
but still not free, so we check whether we need it and whether the scheudler
is busy.
"""
return (
self._last_transition_count != self.scheduler.transition_counter
and self.scheduler.proc.cpu_percent() < 50
)
def _get_timeseries(self, restrict_to_existing=False):
"""
Update the ColumnDataSource with our time series data.
restrict_to_existing determines whether to add new task groups
which might have been added since the last time we rendered.
This is important as we want to add new stackers very deliberately.
"""
# Get the front/back indices for most recent npts bins out of the timeseries
front = max(len(self.plugin.time) - self.npts, 0)
back = None
# Remove any periods of zero compute at the front or back of the timeseries
if len(self.plugin.compute):
agg = sum([np.array(v[front:]) for v in self.plugin.compute.values()])
front2 = len(agg) - len(np.trim_zeros(agg, trim="f"))
front += front2
back = len(np.trim_zeros(agg, trim="b")) - len(agg) or None
prepend = (
self.plugin.time[front - 1]
if front >= 1
else self.plugin.time[front] - self.plugin.dt
)
timestamps = np.array(self.plugin.time[front:back])
dt = np.diff(timestamps, prepend=prepend)
if restrict_to_existing:
new_data = {
k: np.array(v[front:back]) / dt
for k, v in self.plugin.compute.items()
if k in self.source.data
}
else:
new_data = valmap(
lambda x: np.array(x[front:back]) / dt,
self.plugin.compute,
)
new_data["time"] = (
timestamps - self._offset
) * 1000.0 # bokeh likes milliseconds
new_data["nthreads"] = np.array(self.plugin.nthreads[front:back])
return new_data
@without_property_validation
def update(self):
"""
Maybe update the chart. This is somewhat expensive to draw, so we update
it pretty defensively.
"""
with log_errors():
if self._should_add_new_renderers():
# Update the chart, allowing for new task groups to be added.
new_data = self._get_timeseries(restrict_to_existing=False)
self.source.data = new_data
# Possibly update the y range if the number of threads has increased.
max_nthreads = max(self.plugin.nthreads)
if self.root.y_range.end != max_nthreads:
self.root.y_range.end = max_nthreads
stackers = list(self.plugin.compute.keys())
colors = [color_of(key_split(k)) for k in stackers]
for i, (group, color) in enumerate(zip(stackers, colors)):
# If we have already drawn the group, but it is all zero,
# set it to be invisible.
if group in self._renderers:
if not np.count_nonzero(new_data[group]) > 0:
self._renderers[group].visible = False
self._line_renderers[group].visible = False
else:
self._renderers[group].visible = True
self._line_renderers[group].visible = True
continue
# Draw the new area and line glyphs.
renderer = self.root.varea(
x="time",
y1=stack(*stackers[:i]),
y2=stack(*stackers[: i + 1]),
color=color,
alpha=0.5,
source=self.source,
)
self._renderers[group] = renderer
line_renderer = self.root.line(
x="time",
y=stack(*stackers[: i + 1]),
color=color,
alpha=1.0,
source=self.source,
)
self._line_renderers[group] = line_renderer
# Don't add hover until there is something to show, as bokehjs seems to
# have trouble with custom hovers when there are no renderers.
if self.plugin.compute and self._hover is None:
# Add a hover that will show occupancy for all currently active
# task groups. This is a little tricky, bokeh doesn't (yet) support
# hit tests for stacked area charts: https://github.com/bokeh/bokeh/issues/9182
# Instead, show a single vline hover which lists the currently active task
# groups. A custom formatter in JS-land pulls the relevant data index and
# assembles the tooltip.
formatter = CustomJSHover(code="return '';")
self._hover = HoverTool(
tooltips="""
Worker thread occupancy
$index{custom}
""",
mode="vline",
line_policy="nearest",
attachment="horizontal",
formatters={"$index": formatter},
)
self.root.add_tools(self._hover)
if self._hover:
# Create a custom tooltip that:
# 1. Includes nthreads
# 2. Filters out inactive task groups
# (ones without any compute during the relevant dt)
# 3. Colors the labels appropriately.
formatter = CustomJSHover(
code="""
const colormap = %s;
const divs = [];
for (let k of Object.keys(source.data)) {
const val = source.data[k][value];
const color = colormap[k];
if (k === "time" || k === "nthreads" || val < 1.e-3) {
continue;
}
const label = k.length >= 20 ? k.slice(0, 20) + '…' : k;
// Unshift so that the ordering of the labels is the same as
// the ordering of the stackers.
divs.unshift(
''
+ ''
+ label
+ ''
+ ': '
+ val.toFixed(1)
+ '
'
)
}
divs.unshift(
''
+ 'nthreads: '
+ source.data.nthreads[value]
+ '
'
);
return divs.join('\\n')
"""
% dict(
zip(stackers, colors)
), # sneak the color mapping into the callback
args={"source": self.source},
)
# Add the HoverTool to the top line renderer.
top_line = None
for line in reversed(self._line_renderers.values()):
if line.visible:
top_line = line
break
self._hover.renderers = [top_line]
self._hover.formatters = {"$index": formatter}
self._last_drawn = time()
self._last_transition_count = self.scheduler.transition_counter
elif self._should_update():
# Possibly update the y range if new threads have been added
max_nthreads = max(self.plugin.nthreads)
if self.root.y_range.end != max_nthreads:
self.root.y_range.end = max_nthreads
# Update the data, only including existing columns, rather than redrawing
# the whole chart.
self.source.data = self._get_timeseries(restrict_to_existing=True)
self._last_transition_count = self.scheduler.transition_counter
class TaskProgress(DashboardComponent):
"""Progress bars per task type"""
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
data = progress_quads(
dict(all={}, memory={}, erred={}, released={}, processing={})
)
self.source = ColumnDataSource(data=data)
x_range = DataRange1d(range_padding=0)
y_range = Range1d(-8, 0)
self.root = figure(
id="bk-task-progress-plot",
title="Progress",
name="task_progress",
x_range=x_range,
y_range=y_range,
toolbar_location=None,
tools="",
min_border_bottom=50,
**kwargs,
)
self.root.line( # just to define early ranges
x=[0, 0.9], y=[-1, 0], line_color="#FFFFFF", alpha=0.0
)
self.root.quad(
source=self.source,
top="top",
bottom="bottom",
left="left",
right="right",
fill_color="#aaaaaa",
line_color="#aaaaaa",
fill_alpha=0.1,
line_alpha=0.3,
)
self.root.quad(
source=self.source,
top="top",
bottom="bottom",
left="left",
right="released-loc",
fill_color="color",
line_color="color",
fill_alpha=0.6,
)
self.root.quad(
source=self.source,
top="top",
bottom="bottom",
left="released-loc",
right="memory-loc",
fill_color="color",
line_color="color",
fill_alpha=1.0,
)
self.root.quad(
source=self.source,
top="top",
bottom="bottom",
left="memory-loc",
right="erred-loc",
fill_color="black",
fill_alpha=0.5,
line_alpha=0,
)
self.root.quad(
source=self.source,
top="top",
bottom="bottom",
left="erred-loc",
right="processing-loc",
fill_color="gray",
fill_alpha=0.35,
line_alpha=0,
)
self.root.text(
source=self.source,
text="show-name",
y="bottom",
x="left",
x_offset=5,
text_font_size=value("10pt"),
)
self.root.text(
source=self.source,
text="done",
y="bottom",
x="right",
x_offset=-5,
text_align="right",
text_font_size=value("10pt"),
)
self.root.ygrid.visible = False
self.root.yaxis.minor_tick_line_alpha = 0
self.root.yaxis.visible = False
self.root.xgrid.visible = False
self.root.xaxis.minor_tick_line_alpha = 0
self.root.xaxis.visible = False
hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
Name:
@name
All:
@all
Memory:
@memory
Erred:
@erred
Ready:
@processing
""",
)
self.root.add_tools(hover)
@without_property_validation
def update(self):
with log_errors():
state = {
"memory": {},
"erred": {},
"released": {},
"processing": {},
"waiting": {},
}
for tp in self.scheduler.task_prefixes.values():
active_states = tp.active_states
if any(active_states.get(s) for s in state.keys()):
state["memory"][tp.name] = active_states["memory"]
state["erred"][tp.name] = active_states["erred"]
state["released"][tp.name] = active_states["released"]
state["processing"][tp.name] = active_states["processing"]
state["waiting"][tp.name] = active_states["waiting"]
state["all"] = {
k: sum(v[k] for v in state.values()) for k in state["memory"]
}
if not state["all"] and not len(self.source.data["all"]):
return
d = progress_quads(state)
update(self.source, d)
totals = {
k: sum(state[k].values())
for k in ["all", "memory", "erred", "released", "waiting"]
}
totals["processing"] = totals["all"] - sum(
v for k, v in totals.items() if k != "all"
)
self.root.title.text = (
"Progress -- total: %(all)s, "
"in-memory: %(memory)s, processing: %(processing)s, "
"waiting: %(waiting)s, "
"erred: %(erred)s" % totals
)
class WorkerTable(DashboardComponent):
"""Status of the current workers
This is two plots, a text-based table for each host and a thin horizontal
plot laying out hosts by their current memory use.
"""
excluded_names = {
"executing",
"in_flight",
"in_memory",
"ready",
"time",
"spilled_nbytes",
}
def __init__(self, scheduler, width=800, **kwargs):
self.scheduler = scheduler
self.names = [
"name",
"address",
"nthreads",
"cpu",
"memory",
"memory_limit",
"memory_percent",
"memory_managed_in_memory",
"memory_unmanaged_old",
"memory_unmanaged_recent",
"memory_spilled",
"num_fds",
"read_bytes",
"write_bytes",
"cpu_fraction",
]
workers = self.scheduler.workers.values()
self.extra_names = sorted(
{
m
for ws in workers
for m, v in ws.metrics.items()
if m not in self.names and isinstance(v, (str, int, float))
}
- self.excluded_names
)
table_names = [
"name",
"address",
"nthreads",
"cpu",
"memory",
"memory_limit",
"memory_percent",
"memory_managed_in_memory",
"memory_unmanaged_old",
"memory_unmanaged_recent",
"memory_spilled",
"num_fds",
"read_bytes",
"write_bytes",
]
column_title_renames = {
"memory_limit": "limit",
"memory_percent": "memory %",
"memory_managed_in_memory": "managed",
"memory_unmanaged_old": "unmanaged old",
"memory_unmanaged_recent": "unmanaged recent",
"memory_spilled": "spilled",
"num_fds": "# fds",
"read_bytes": "read",
"write_bytes": "write",
}
self.source = ColumnDataSource({k: [] for k in self.names})
columns = {
name: TableColumn(field=name, title=column_title_renames.get(name, name))
for name in table_names
}
formatters = {
"cpu": NumberFormatter(format="0 %"),
"memory_percent": NumberFormatter(format="0.0 %"),
"memory": NumberFormatter(format="0.0 b"),
"memory_limit": NumberFormatter(format="0.0 b"),
"memory_managed_in_memory": NumberFormatter(format="0.0 b"),
"memory_unmanaged_old": NumberFormatter(format="0.0 b"),
"memory_unmanaged_recent": NumberFormatter(format="0.0 b"),
"memory_spilled": NumberFormatter(format="0.0 b"),
"read_bytes": NumberFormatter(format="0 b"),
"write_bytes": NumberFormatter(format="0 b"),
"num_fds": NumberFormatter(format="0"),
"nthreads": NumberFormatter(format="0"),
}
table = DataTable(
source=self.source,
columns=[columns[n] for n in table_names],
reorderable=True,
sortable=True,
width=width,
index_position=None,
)
for name in table_names:
if name in formatters:
table.columns[table_names.index(name)].formatter = formatters[name]
extra_names = ["name", "address"] + self.extra_names
extra_columns = {
name: TableColumn(field=name, title=column_title_renames.get(name, name))
for name in extra_names
}
extra_table = DataTable(
source=self.source,
columns=[extra_columns[n] for n in extra_names],
reorderable=True,
sortable=True,
width=width,
index_position=None,
)
hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
Worker (@name):
@memory_percent{0.0 %}
""",
)
mem_plot = figure(
title="Memory Use (%)",
toolbar_location=None,
x_range=(0, 1),
y_range=(-0.1, 0.1),
height=60,
width=width,
tools="",
min_border_right=0,
**kwargs,
)
mem_plot.circle(
source=self.source, x="memory_percent", y=0, size=10, fill_alpha=0.5
)
mem_plot.ygrid.visible = False
mem_plot.yaxis.minor_tick_line_alpha = 0
mem_plot.xaxis.visible = False
mem_plot.yaxis.visible = False
mem_plot.add_tools(hover, BoxSelectTool())
hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
Worker (@name):
@cpu_fraction{0 %}
""",
)
cpu_plot = figure(
title="CPU Use (%)",
toolbar_location=None,
x_range=(0, 1),
y_range=(-0.1, 0.1),
height=60,
width=width,
tools="",
min_border_right=0,
**kwargs,
)
cpu_plot.circle(
source=self.source, x="cpu_fraction", y=0, size=10, fill_alpha=0.5
)
cpu_plot.ygrid.visible = False
cpu_plot.yaxis.minor_tick_line_alpha = 0
cpu_plot.xaxis.visible = False
cpu_plot.yaxis.visible = False
cpu_plot.add_tools(hover, BoxSelectTool())
self.cpu_plot = cpu_plot
if "sizing_mode" in kwargs:
sizing_mode = {"sizing_mode": kwargs["sizing_mode"]}
else:
sizing_mode = {}
components = [cpu_plot, mem_plot, table]
if self.extra_names:
components.append(extra_table)
self.root = column(*components, id="bk-worker-table", **sizing_mode)
@without_property_validation
def update(self):
data = {name: [] for name in self.names + self.extra_names}
for i, (addr, ws) in enumerate(
sorted(self.scheduler.workers.items(), key=lambda kv: str(kv[1].name))
):
minfo = ws.memory
for name in self.names + self.extra_names:
data[name].append(ws.metrics.get(name, None))
data["name"][-1] = ws.name if ws.name is not None else i
data["address"][-1] = ws.address
if ws.memory_limit:
data["memory_percent"][-1] = ws.metrics["memory"] / ws.memory_limit
else:
data["memory_percent"][-1] = ""
data["memory_limit"][-1] = ws.memory_limit
data["memory_managed_in_memory"][-1] = minfo.managed_in_memory
data["memory_unmanaged_old"][-1] = minfo.unmanaged_old
data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent
data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent
data["memory_spilled"][-1] = minfo.managed_spilled
data["cpu"][-1] = ws.metrics["cpu"] / 100.0
data["cpu_fraction"][-1] = ws.metrics["cpu"] / 100.0 / ws.nthreads
data["nthreads"][-1] = ws.nthreads
for name in self.names + self.extra_names:
if name == "name":
data[name].insert(0, f"Total ({len(data[name])})")
continue
try:
if len(self.scheduler.workers) == 0:
total_data = None
elif name == "memory_percent":
total_mem = sum(
ws.memory_limit for ws in self.scheduler.workers.values()
)
total_data = (
(
sum(
ws.metrics["memory"]
for ws in self.scheduler.workers.values()
)
/ total_mem
)
if total_mem
else ""
)
elif name == "cpu":
total_data = (
sum(ws.metrics["cpu"] for ws in self.scheduler.workers.values())
/ 100
/ len(self.scheduler.workers.values())
)
elif name == "cpu_fraction":
total_data = (
sum(ws.metrics["cpu"] for ws in self.scheduler.workers.values())
/ 100
/ sum(ws.nthreads for ws in self.scheduler.workers.values())
)
else:
total_data = sum(data[name])
data[name].insert(0, total_data)
except TypeError:
data[name].insert(0, None)
self.source.data.update(data)
class SchedulerLogs:
def __init__(self, scheduler, start=None):
logs = scheduler.get_logs(start=start, timestamps=True)
if not logs:
logs_html = (
'No logs to report
'
)
else:
logs_html = Log(
"\n".join(
"%s - %s"
% (datetime.fromtimestamp(time).strftime("%H:%M:%S.%f"), line)
for time, level, line in logs
)
)._repr_html_()
self.root = Div(
text=logs_html,
style={
"width": "100%",
"height": "100%",
"max-width": "1920px",
"max-height": "1080px",
"padding": "12px",
"border": "1px solid lightgray",
"box-shadow": "inset 1px 0 8px 0 lightgray",
"overflow": "auto",
},
)
def systemmonitor_doc(scheduler, extra, doc):
with log_errors():
sysmon = SystemMonitor(scheduler, sizing_mode="stretch_both")
doc.title = "Dask: Scheduler System Monitor"
add_periodic_callback(doc, sysmon, 500)
doc.add_root(sysmon.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
def stealing_doc(scheduler, extra, doc):
with log_errors():
occupancy = Occupancy(scheduler)
stealing_ts = StealingTimeSeries(scheduler)
stealing_events = StealingEvents(scheduler)
stealing_events.root.x_range = stealing_ts.root.x_range
doc.title = "Dask: Work Stealing"
add_periodic_callback(doc, occupancy, 500)
add_periodic_callback(doc, stealing_ts, 500)
add_periodic_callback(doc, stealing_events, 500)
doc.add_root(
row(
occupancy.root,
column(
stealing_ts.root,
stealing_events.root,
sizing_mode="stretch_both",
),
)
)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
def events_doc(scheduler, extra, doc):
with log_errors():
events = Events(scheduler, "all", height=250)
events.update()
add_periodic_callback(doc, events, 500)
doc.title = "Dask: Scheduler Events"
doc.add_root(column(events.root, sizing_mode="scale_width"))
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
def workers_doc(scheduler, extra, doc):
with log_errors():
table = WorkerTable(scheduler)
table.update()
add_periodic_callback(doc, table, 500)
doc.title = "Dask: Workers"
doc.add_root(table.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
def tasks_doc(scheduler, extra, doc):
with log_errors():
ts = TaskStream(
scheduler,
n_rectangles=dask.config.get(
"distributed.scheduler.dashboard.tasks.task-stream-length"
),
clear_interval="60s",
sizing_mode="stretch_both",
)
ts.update()
add_periodic_callback(doc, ts, 5000)
doc.title = "Dask: Task Stream"
doc.add_root(ts.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
def graph_doc(scheduler, extra, doc):
with log_errors():
graph = TaskGraph(scheduler, sizing_mode="stretch_both")
doc.title = "Dask: Task Graph"
graph.update()
add_periodic_callback(doc, graph, 200)
doc.add_root(graph.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
def tg_graph_doc(scheduler, extra, doc):
with log_errors():
tg_graph = TaskGroupGraph(scheduler, sizing_mode="stretch_both")
doc.title = "Dask: Task Groups Graph"
tg_graph.update()
add_periodic_callback(doc, tg_graph, 200)
doc.add_root(tg_graph.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
def status_doc(scheduler, extra, doc):
with log_errors():
cluster_memory = ClusterMemory(scheduler, sizing_mode="stretch_both")
cluster_memory.update()
add_periodic_callback(doc, cluster_memory, 100)
doc.add_root(cluster_memory.root)
if len(scheduler.workers) <= 100:
workers_memory = WorkersMemory(scheduler, sizing_mode="stretch_both")
processing = CurrentLoad(scheduler, sizing_mode="stretch_both")
processing_root = processing.processing_figure
else:
workers_memory = WorkersMemoryHistogram(
scheduler, sizing_mode="stretch_both"
)
processing = ProcessingHistogram(scheduler, sizing_mode="stretch_both")
processing_root = processing.root
current_load = CurrentLoad(scheduler, sizing_mode="stretch_both")
occupancy = Occupancy(scheduler, sizing_mode="stretch_both")
cpu_root = current_load.cpu_figure
occupancy_root = occupancy.root
workers_memory.update()
processing.update()
current_load.update()
occupancy.update()
add_periodic_callback(doc, workers_memory, 100)
add_periodic_callback(doc, processing, 100)
add_periodic_callback(doc, current_load, 100)
add_periodic_callback(doc, occupancy, 100)
doc.add_root(workers_memory.root)
tab1 = Panel(child=processing_root, title="Processing")
tab2 = Panel(child=cpu_root, title="CPU")
tab3 = Panel(child=occupancy_root, title="Occupancy")
proc_tabs = Tabs(tabs=[tab1, tab2, tab3], name="processing_tabs")
doc.add_root(proc_tabs)
task_stream = TaskStream(
scheduler,
n_rectangles=dask.config.get(
"distributed.scheduler.dashboard.status.task-stream-length"
),
clear_interval="5s",
sizing_mode="stretch_both",
)
task_stream.update()
add_periodic_callback(doc, task_stream, 100)
doc.add_root(task_stream.root)
task_progress = TaskProgress(scheduler, sizing_mode="stretch_both")
task_progress.update()
add_periodic_callback(doc, task_progress, 100)
doc.add_root(task_progress.root)
doc.title = "Dask: Status"
doc.theme = BOKEH_THEME
doc.template = env.get_template("status.html")
doc.template_variables.update(extra)
@curry
def individual_doc(cls, interval, scheduler, extra, doc, fig_attr="root", **kwargs):
with log_errors():
fig = cls(scheduler, sizing_mode="stretch_both", **kwargs)
fig.update()
add_periodic_callback(doc, fig, interval)
doc.add_root(getattr(fig, fig_attr))
doc.theme = BOKEH_THEME
def individual_profile_doc(scheduler, extra, doc):
with log_errors():
prof = ProfileTimePlot(scheduler, sizing_mode="stretch_both", doc=doc)
doc.add_root(prof.root)
prof.trigger_update()
doc.theme = BOKEH_THEME
def individual_profile_server_doc(scheduler, extra, doc):
with log_errors():
prof = ProfileServer(scheduler, sizing_mode="stretch_both", doc=doc)
doc.add_root(prof.root)
prof.trigger_update()
doc.theme = BOKEH_THEME
def profile_doc(scheduler, extra, doc):
with log_errors():
doc.title = "Dask: Profile"
prof = ProfileTimePlot(scheduler, sizing_mode="stretch_both", doc=doc)
doc.add_root(prof.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
prof.trigger_update()
def profile_server_doc(scheduler, extra, doc):
with log_errors():
doc.title = "Dask: Profile of Event Loop"
prof = ProfileServer(scheduler, sizing_mode="stretch_both", doc=doc)
doc.add_root(prof.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME
prof.trigger_update()