Commit bc3c891
Shuffle Service with Scheduler Logic (#6007)
1 parent f779854 commit bc3c891

20 files changed: +2002 -523 lines

.pre-commit-config.yaml (+1)

@@ -49,3 +49,4 @@ repos:
 - dask
 - tornado
 - zict
+- pyarrow
continuous_integration/environment-3.10.yaml (+1)

@@ -24,6 +24,7 @@ dependencies:
 - pre-commit
 - prometheus_client
 - psutil
+- pyarrow=7
 - pytest
 - pytest-cov
 - pytest-faulthandler
continuous_integration/environment-3.8.yaml (+1)

@@ -25,6 +25,7 @@ dependencies:
 - pre-commit
 - prometheus_client
 - psutil
+- pyarrow=7
 - pynvml # Only tested here
 - pytest
 - pytest-cov
continuous_integration/environment-3.9.yaml (+2)

@@ -26,6 +26,8 @@ dependencies:
 - pre-commit
 - prometheus_client
 - psutil
+- pyarrow=7
+- pynvml # Only tested here
 - pytest
 - pytest-cov
 - pytest-faulthandler
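
The CI environments above all gain a pyarrow pin, which the new shuffle service appears to depend on (presumably for serializing dataframe shards; that code is not part of this diff). A minimal, purely illustrative guard that downstream code could use to fail early when the dependency is missing, assuming nothing beyond the standard library:

# Hypothetical pre-flight check; the real import location and error message
# used by the shuffle service are not shown in this commit.
import importlib.util

if importlib.util.find_spec("pyarrow") is None:
    raise ImportError("p2p shuffling requires pyarrow (CI pins pyarrow=7)")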

distributed/dashboard/components/scheduler.py (+294 -2)

@@ -1032,7 +1032,7 @@ class SystemTimeseries(DashboardComponent):
     """

     @log_errors
-    def __init__(self, scheduler, **kwargs):
+    def __init__(self, scheduler, follow_interval=20000, **kwargs):
         self.scheduler = scheduler
         self.source = ColumnDataSource(
             {
@@ -1048,7 +1048,9 @@ def __init__(self, scheduler, **kwargs):

         update(self.source, self.get_data())

-        x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
+        x_range = DataRange1d(
+            follow="end", follow_interval=follow_interval, range_padding=0
+        )
         tools = "reset, xpan, xwheel_zoom"

         self.bandwidth = figure(
@@ -3465,6 +3467,261 @@ def update(self):
         self.source.data.update(data)


+class Shuffling(DashboardComponent):
+    """Occupancy (in time) per worker"""
+
+    def __init__(self, scheduler, **kwargs):
+        with log_errors():
+            self.scheduler = scheduler
+            self.source = ColumnDataSource(
+                {
+                    "worker": [],
+                    "y": [],
+                    "comm_memory": [],
+                    "comm_memory_limit": [],
+                    "comm_buckets": [],
+                    "comm_active": [],
+                    "comm_avg_duration": [],
+                    "comm_avg_size": [],
+                    "comm_read": [],
+                    "comm_written": [],
+                    "comm_color": [],
+                    "disk_memory": [],
+                    "disk_memory_limit": [],
+                    "disk_buckets": [],
+                    "disk_active": [],
+                    "disk_avg_duration": [],
+                    "disk_avg_size": [],
+                    "disk_read": [],
+                    "disk_written": [],
+                    "disk_color": [],
+                }
+            )
+            self.totals_source = ColumnDataSource(
+                {
+                    "x": ["Network Send", "Network Receive", "Disk Write", "Disk Read"],
+                    "values": [0, 0, 0, 0],
+                }
+            )
+
+            self.comm_memory = figure(
+                title="Comms Buffer",
+                tools="",
+                toolbar_location="above",
+                x_range=Range1d(0, 100_000_000),
+                **kwargs,
+            )
+            self.comm_memory.hbar(
+                source=self.source,
+                right="comm_memory",
+                y="y",
+                height=0.9,
+                color="comm_color",
+            )
+            hover = HoverTool(
+                tooltips=[
+                    ("Memory Used", "@comm_memory{0.00 b}"),
+                    ("Average Write", "@comm_avg_size{0.00 b}"),
+                    ("# Buckets", "@comm_buckets"),
+                    ("Average Duration", "@comm_avg_duration"),
+                ],
+                formatters={"@comm_avg_duration": "datetime"},
+                mode="hline",
+            )
+            self.comm_memory.add_tools(hover)
+            self.comm_memory.x_range.start = 0
+            self.comm_memory.x_range.end = 1
+            self.comm_memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
+
+            self.disk_memory = figure(
+                title="Disk Buffer",
+                tools="",
+                toolbar_location="above",
+                x_range=Range1d(0, 100_000_000),
+                **kwargs,
+            )
+            self.disk_memory.yaxis.visible = False
+
+            self.disk_memory.hbar(
+                source=self.source,
+                right="disk_memory",
+                y="y",
+                height=0.9,
+                color="disk_color",
+            )
+
+            hover = HoverTool(
+                tooltips=[
+                    ("Memory Used", "@disk_memory{0.00 b}"),
+                    ("Average Write", "@disk_avg_size{0.00 b}"),
+                    ("# Buckets", "@disk_buckets"),
+                    ("Average Duration", "@disk_avg_duration"),
+                ],
+                formatters={"@disk_avg_duration": "datetime"},
+                mode="hline",
+            )
+            self.disk_memory.add_tools(hover)
+            self.disk_memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
+
+            self.totals = figure(
+                title="Total movement",
+                tools="",
+                toolbar_location="above",
+                **kwargs,
+            )
+            titles = ["Network Send", "Network Receive", "Disk Write", "Disk Read"]
+            self.totals = figure(
+                x_range=titles,
+                title="Totals",
+                toolbar_location=None,
+                tools="",
+                **kwargs,
+            )
+
+            self.totals.vbar(
+                x="x",
+                top="values",
+                width=0.9,
+                source=self.totals_source,
+            )
+
+            self.totals.xgrid.grid_line_color = None
+            self.totals.y_range.start = 0
+            self.totals.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
+
+            hover = HoverTool(
+                tooltips=[("Total", "@values{0.00b}")],
+                mode="vline",
+            )
+            self.totals.add_tools(hover)
+
+            self.root = row(self.comm_memory, self.disk_memory)
+
+    @without_property_validation
+    def update(self):
+        with log_errors():
+            input = self.scheduler.extensions["shuffle"].heartbeats
+            if not input:
+                return
+
+            input = list(input.values())[-1]  # TODO: multiple concurrent shuffles
+
+            data = {
+                "worker": [],
+                "y": [],
+                "comm_memory": [],
+                "comm_memory_limit": [],
+                "comm_buckets": [],
+                "comm_active": [],
+                "comm_avg_duration": [],
+                "comm_avg_size": [],
+                "comm_read": [],
+                "comm_written": [],
+                "comm_color": [],
+                "disk_memory": [],
+                "disk_memory_limit": [],
+                "disk_buckets": [],
+                "disk_active": [],
+                "disk_avg_duration": [],
+                "disk_avg_size": [],
+                "disk_read": [],
+                "disk_written": [],
+                "disk_color": [],
+            }
+            now = time()
+
+            for i, (worker, d) in enumerate(input.items()):
+                data["y"].append(i)
+                data["worker"].append(worker)
+                data["comm_memory"].append(d["comms"]["memory"])
+                data["comm_memory_limit"].append(d["comms"]["memory_limit"])
+                data["comm_buckets"].append(d["comms"]["buckets"])
+                data["comm_active"].append(d["comms"]["active"])
+                data["comm_avg_duration"].append(
+                    d["comms"]["diagnostics"].get("avg_duration", 0)
+                )
+                data["comm_avg_size"].append(
+                    d["comms"]["diagnostics"].get("avg_size", 0)
+                )
+                data["comm_read"].append(d["comms"]["read"])
+                data["comm_written"].append(d["comms"]["written"])
+                try:
+                    if self.scheduler.workers[worker].last_seen < now - 5:
+                        data["comm_color"].append("gray")
+                    elif d["comms"]["active"]:
+                        data["comm_color"].append("green")
+                    elif d["comms"]["memory"] > d["comms"]["memory_limit"]:
+                        data["comm_color"].append("red")
+                    else:
+                        data["comm_color"].append("blue")
+                except KeyError:
+                    data["comm_color"].append("black")
+
+                data["disk_memory"].append(d["disk"]["memory"])
+                data["disk_memory_limit"].append(d["disk"]["memory_limit"])
+                data["disk_buckets"].append(d["disk"]["buckets"])
+                data["disk_active"].append(d["disk"]["active"])
+                data["disk_avg_duration"].append(
+                    d["disk"]["diagnostics"].get("avg_duration", 0)
+                )
+                data["disk_avg_size"].append(
+                    d["disk"]["diagnostics"].get("avg_size", 0)
+                )
+                data["disk_read"].append(d["disk"]["read"])
+                data["disk_written"].append(d["disk"]["written"])
+                try:
+                    if self.scheduler.workers[worker].last_seen < now - 5:
+                        data["disk_color"].append("gray")
+                    elif d["disk"]["active"]:
+                        data["disk_color"].append("green")
+                    elif d["disk"]["memory"] > d["disk"]["memory_limit"]:
+                        data["disk_color"].append("red")
+                    else:
+                        data["disk_color"].append("blue")
+                except KeyError:
+                    data["disk_color"].append("black")
+
+            """
+            singletons = {
+                "comm_avg_duration": [
+                    sum(data["comm_avg_duration"]) / len(data["comm_avg_duration"])
+                ],
+                "comm_avg_size": [
+                    sum(data["comm_avg_size"]) / len(data["comm_avg_size"])
+                ],
+                "disk_avg_duration": [
+                    sum(data["disk_avg_duration"]) / len(data["disk_avg_duration"])
+                ],
+                "disk_avg_size": [
+                    sum(data["disk_avg_size"]) / len(data["disk_avg_size"])
+                ],
+            }
+            singletons["comm_avg_bandwidth"] = [
+                singletons["comm_avg_size"][0] / singletons["comm_avg_duration"][0]
+            ]
+            singletons["disk_avg_bandwidth"] = [
+                singletons["disk_avg_size"][0] / singletons["disk_avg_duration"][0]
+            ]
+            singletons["y"] = [data["y"][-1] / 2]
+            """
+
+            totals = {
+                "x": ["Network Send", "Network Receive", "Disk Write", "Disk Read"],
+                "values": [
+                    sum(data["comm_written"]),
+                    sum(data["comm_read"]),
+                    sum(data["disk_written"]),
+                    sum(data["disk_read"]),
+                ],
+            }
+            update(self.totals_source, totals)
+
+            update(self.source, data)
+            limit = max(data["comm_memory_limit"] + data["disk_memory_limit"]) * 1.2
+            self.comm_memory.x_range.end = limit
+            self.disk_memory.x_range.end = limit
+
+
 class SchedulerLogs:
     def __init__(self, scheduler, start=None):
         logs = scheduler.get_logs(start=start, timestamps=True)
@@ -3509,6 +3766,41 @@ def systemmonitor_doc(scheduler, extra, doc):
     doc.theme = BOKEH_THEME


+@log_errors
+def shuffling_doc(scheduler, extra, doc):
+    doc.title = "Dask: Shuffling"
+
+    shuffling = Shuffling(scheduler, width=400, height=400)
+    workers_memory = WorkersMemory(scheduler, width=400, height=400)
+    timeseries = SystemTimeseries(
+        scheduler, width=1600, height=200, follow_interval=3000
+    )
+    event_loop = EventLoop(scheduler, width=200, height=400)
+
+    add_periodic_callback(doc, shuffling, 200)
+    add_periodic_callback(doc, workers_memory, 200)
+    add_periodic_callback(doc, timeseries, 500)
+    add_periodic_callback(doc, event_loop, 500)
+
+    timeseries.bandwidth.y_range = timeseries.disk.y_range
+
+    doc.add_root(
+        column(
+            row(
+                workers_memory.root,
+                shuffling.comm_memory,
+                shuffling.disk_memory,
+                shuffling.totals,
+                event_loop.root,
+            ),
+            row(column(timeseries.bandwidth, timeseries.disk)),
+        )
+    )
+    doc.template = env.get_template("simple.html")
+    doc.template_variables.update(extra)
+    doc.theme = BOKEH_THEME
+
+
 @log_errors
 def stealing_doc(scheduler, extra, doc):
     occupancy = Occupancy(scheduler)
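
For reference, Shuffling.update() above reads per-worker entries from scheduler.extensions["shuffle"].heartbeats and expects each entry to carry "comms" and "disk" sections. A sketch of one such entry, reconstructed from the keys accessed in the diff; the numeric values and anything beyond the avg_duration/avg_size diagnostics fields are illustrative only:

# Shape of one per-worker heartbeat entry consumed by Shuffling.update().
# Keys mirror the lookups in the diff; values are made up for illustration.
heartbeat_entry = {
    "comms": {
        "memory": 25_000_000,         # bytes currently held in the comm buffer
        "memory_limit": 100_000_000,  # limit used for the red/blue bar coloring
        "buckets": 32,
        "active": 4,
        "read": 1_500_000_000,
        "written": 1_450_000_000,
        "diagnostics": {"avg_duration": 0.12, "avg_size": 2_000_000},
    },
    "disk": {
        "memory": 40_000_000,
        "memory_limit": 100_000_000,
        "buckets": 32,
        "active": 2,
        "read": 900_000_000,
        "written": 950_000_000,
        "diagnostics": {"avg_duration": 0.08, "avg_size": 4_000_000},
    },
}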

distributed/dashboard/scheduler.py (+2)

@@ -37,6 +37,7 @@
     individual_profile_server_doc,
     profile_doc,
     profile_server_doc,
+    shuffling_doc,
     status_doc,
     stealing_doc,
     systemmonitor_doc,
@@ -49,6 +50,7 @@

 applications = {
     "/system": systemmonitor_doc,
+    "/shuffle": shuffling_doc,
     "/stealing": stealing_doc,
     "/workers": workers_doc,
     "/events": events_doc,

distributed/dashboard/tests/test_scheduler_bokeh.py (+16)

@@ -30,6 +30,7 @@
     Occupancy,
     ProcessingHistogram,
     ProfileServer,
+    Shuffling,
     StealingEvents,
     StealingTimeSeries,
     SystemMonitor,
@@ -1007,6 +1008,21 @@ async def test_prefix_bokeh(s, a, b):
     assert bokeh_app.prefix == f"/{prefix}"


+@gen_cluster(client=True, worker_kwargs={"dashboard": True})
+async def test_shuffling(c, s, a, b):
+    dd = pytest.importorskip("dask.dataframe")
+    ss = Shuffling(s)
+
+    df = dask.datasets.timeseries()
+    df2 = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist()
+
+    start = time()
+    while not ss.source.data["disk_read"]:
+        ss.update()
+        await asyncio.sleep(0.1)
+        assert time() < start + 5
+
+
 @gen_cluster(client=True, nthreads=[], scheduler_kwargs={"dashboard": True})
 async def test_hardware(c, s):
     plot = Hardware(s)
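
The test above drives the component by running a peer-to-peer shuffle; the same pattern works from a user session to populate the dashboard. A minimal sketch mirroring the test, assuming a local client and the synthetic timeseries dataset (the column name "x" is just the test's choice):

import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # local cluster whose dashboard now exposes the /shuffle page
df = dask.datasets.timeseries()
# shuffle="p2p" selects the shuffle service exercised by test_shuffling above
shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist()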
