Fix batchedsend restart #6272
Conversation
Unit Test Results: 16 files ±0, 16 suites ±0, 7h 17m 26s ⏱️ -16m 6s. For more details on these failures, see this check. Results for commit c93894f. ± Comparison against base commit 2286896. ♻️ This comment has been updated with latest results.
Locally it looks like the failing cases with
I think this PR will be an improvement, as a hotfix for a current deadlock #6228.
However, it introduces a new potential bug. (And BatchedSend already has many existing bugs and foot-guns, enumerated in my review on #5457.)
Are we okay treating this as a hotfix and leaving this multiple-start bug for future work, or do we want to fix it here?
def start(self, comm):
    self.please_stop = False
If the `BatchedSend` isn't fully stopped, it's possible for restarting to create a second `_background_send` coroutine racing against the first. I would imagine this is a problem, not certain though.
- If you call `BatchedSend.abort()` and then `BatchedSend.start()` without a (long enough) `await` in between.
- If you call `BatchedSend.start()` and the `BatchedSend` has just aborted, but the `_background_send` coroutine is currently waiting on `self.waker.wait` or `self.comm.write`.
- If you just (accidentally) call `BatchedSend.start()` multiple times in a row when it hasn't been stopped already.
Again, this is the sort of thing I'd like to be checking for and validating against. Even if we don't currently have code that can trigger this error condition, it's all too easy for future maintainers to use the API in ways we don't currently expect, and then subtly break things in ways that are hard to debug and cause significant pain for users (just like this issue was). See #5481 (comment).
I don't think we can validate this properly without having a handle on the `_background_send` coroutine to check whether it's still running. We also need to decide what the desired behavior is if you call `start()` while the coroutine is still running and the `BatchedSend` isn't closed yet, or if it's currently trying to close. Do we just error? Or do we need `start()` to be idempotent as long as it's passed the same `comm` each time?
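For illustration, a minimal sketch of what a guarded, restart-aware `start()` could look like. The `_background_task` handle matches the diff below, but the idempotency policy and the use of `asyncio.ensure_future` are assumptions, not what this PR actually does (method-level sketch on `BatchedSend`; assumes `asyncio` is imported):

```python
def start(self, comm):
    # Sketch only: keep a handle on the background coroutine so a second
    # start() can detect that the first one is still alive.
    if self._background_task is not None and not self._background_task.done():
        if comm is self.comm:
            # One possible policy: restarting with the same comm is a no-op.
            return
        raise RuntimeError(f"Background task still running for {self!r}")
    self.comm = comm
    self.please_stop = False
    self._background_task = asyncio.ensure_future(self._background_send())
```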
yield self.stopped.wait(timeout=timeout)
if not self.comm.closed():
if self._background_task:
await self._background_task
If the background task failed, we'll raise an exception here instead of trying to flush the buffer. I assume that's okay, but should be documented.
The background task doesn't raise any exceptions.
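For context, the shape that makes this true is roughly the following. This is a hypothetical sketch of the pattern, not the actual `_background_send` body, though the attribute names (`interval`, `buffer`, `please_stop`, `stopped`, `comm`) are the existing `BatchedSend` ones:

```python
async def _background_send(self):
    try:
        while not self.please_stop:
            await asyncio.sleep(self.interval)
            if not self.buffer:
                continue
            # Hand the whole buffer to the comm in one write.
            payload, self.buffer = self.buffer, []
            await self.comm.write(payload)
    except CommClosedError:
        # The comm died underneath us; whatever was in flight is lost.
        # Swallow the error so awaiting this task never re-raises.
        logger.info("Batched comm closed while sending")
    finally:
        self.stopped.set()
```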
This currently deadlocks
14ac9f1 to 86a36d1 (Compare)
A couple race conditions I'd like to fix. I find `WorkerStatusPlugin` weird, but can't decide if it's problematic enough to justify delaying this PR if everything else is otherwise fixed.
Also, can you remove this now-irrelevant block:
distributed/distributed/worker.py
Lines 3429 to 3431 in 2286896
except CommClosedError:
    # Batched stream send might raise if it was already closed
    pass
# To propagate exceptions, we rely on subsequent `BatchedSend.send`
# calls to raise CommClosedErrors.
# To propagate exceptions, we rely on subsequent `BatchedSend.send`
# calls to raise CommClosedErrors.
# The exception will not be propagated. Instead, users of `BatchedSend` are expected
# to be implementing explicit reconnection logic when the comm closes. Reconnection often
# involves application logic reconciling state (because messages buffered on the
# `BatchedSend` may have been lost), then calling `start` again with a new comm object.
This is no longer true.
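To spell out what that suggested comment expects of callers, here is a rough sketch. The `reconnect`/`reconcile` names are made up for illustration; `distributed.comm.connect` is the real low-level connect, but the flow shown is only an assumption about how a caller might do this:

```python
from distributed.comm import connect


async def reconnect(batched_send, address, reconcile):
    # Caller-side reconnection sketch: messages buffered on the BatchedSend
    # may have been lost when the old comm died, so the application reconciles
    # its own state first, then resumes batching on a fresh comm.
    comm = await connect(address)
    await reconcile()            # application-specific state reconciliation
    batched_send.start(comm)     # hand the BatchedSend the new comm
```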
@@ -125,50 +135,43 @@ def _background_send(self):
    # This means that any messages in our buffer our lost.
    # To propagate exceptions, we rely on subsequent `BatchedSend.send`
    # calls to raise CommClosedErrors.
Also, are we confident nothing was relying on this behavior? Some places where we might be:
Probably important:
distributed/distributed/scheduler.py
Lines 5002 to 5016 in 2286896
def worker_send(self, worker, msg):
    """Send message to worker

    This also handles connection failures by adding a callback to remove
    the worker on the next cycle.
    """
    stream_comms: dict = self.stream_comms
    try:
        stream_comms[worker].send(msg)
    except (CommClosedError, AttributeError):
        self.loop.add_callback(
            self.remove_worker,
            address=worker,
            stimulus_id=f"worker-send-comm-fail-{time()}",
        )
distributed/distributed/scheduler.py
Lines 5053 to 5065 in 2286896
for worker, msgs in worker_msgs.items():
    try:
        w = stream_comms[worker]
        w.send(*msgs)
    except KeyError:
        # worker already gone
        pass
    except (CommClosedError, AttributeError):
        self.loop.add_callback(
            self.remove_worker,
            address=worker,
            stimulus_id=f"send-all-comm-fail-{time()}",
        )
distributed/distributed/pubsub.py
Lines 104 to 109 in 2286896
try:
    self.scheduler.client_comms[c].send(
        {"op": "pubsub-msg", "name": name, "msg": msg}
    )
except (KeyError, CommClosedError):
    self.remove_subscriber(name=name, client=c)
Less important:
distributed/distributed/scheduler.py
Lines 4647 to 4654 in 2286896
try:
    c.send(msg)
    # logger.debug("Scheduler sends message to client %s", msg)
except CommClosedError:
    if self.status == Status.running:
        logger.critical(
            "Closed comm %r while trying to write %s", c, msg, exc_info=True
        )
distributed/distributed/scheduler.py
Lines 5018 to 5030 in 2286896
def client_send(self, client, msg):
    """Send message to client"""
    client_comms: dict = self.client_comms
    c = client_comms.get(client)
    if c is None:
        return
    try:
        c.send(msg)
    except CommClosedError:
        if self.status == Status.running:
            logger.critical(
                "Closed comm %r while trying to write %s", c, msg, exc_info=True
            )
distributed/distributed/scheduler.py
Lines 5042 to 5051 in 2286896
try:
    c.send(*msgs)
except CommClosedError:
    if self.status == Status.running:
        logger.critical(
            "Closed comm %r while trying to write %s",
            c,
            msgs,
            exc_info=True,
        )
distributed/distributed/scheduler.py
Lines 7710 to 7717 in 2286896
def add_worker(self, worker=None, **kwargs):
    ident = self.scheduler.workers[worker].identity()
    del ident["metrics"]
    del ident["last_seen"]
    try:
        self.bcomm.send(["add", {"workers": {worker: ident}}])
    except CommClosedError:
        self.scheduler.remove_plugin(name=self.name)
distributed/distributed/stealing.py
Lines 257 to 259 in 2286896
except CommClosedError:
    logger.info("Worker comm %r closed while stealing: %r", victim, ts)
    return "comm-closed"
I have a feeling that removing these "probably important" code paths is actually a good thing (besides PubSub). It centralizes the logic for handling worker disconnects into `handle_worker`. If any of these places were previously raising `CommClosedError`, `handle_worker` should already have been calling `remove_worker` in its `finally` statement. Now we're not duplicating that.
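Roughly, the centralized path looks like this (simplified from memory, not the scheduler's exact code; the stimulus_id string is made up):

```python
async def handle_worker(self, comm, worker):
    # All stream traffic for this worker funnels through handle_stream; the
    # disconnect cleanup then happens exactly once, in the finally block,
    # instead of at every individual send site.
    try:
        await self.handle_stream(comm=comm, extra={"worker": worker})
    finally:
        await self.remove_worker(
            address=worker, stimulus_id=f"handle-worker-cleanup-{time()}"
        )
```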
@@ -72,13 +75,15 @@ def __repr__(self):

    __str__ = __repr__
Can you update `__repr__` to include `self.name`?
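Something along these lines, purely as an illustration (the exact fields shown are an assumption):

```python
def __repr__(self):
    # Include the new name so log lines can tell multiple BatchedSends apart.
    return f"<BatchedSend {self.name!r}: {len(self.buffer)} in buffer>"

__str__ = __repr__
```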
def start(self, comm):
    if self._background_task and not self._background_task.done():
        raise RuntimeError("Background task still running")
raise RuntimeError("Background task still running") | |
raise RuntimeError("Background task still running for {self!r}") |
for comm in self.client_comms.values():
    comm.abort()
    await comm.close()
Should this be an `asyncio.gather` instead of serial?
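i.e., something like the following inside the same coroutine, assuming the per-client comms can be closed independently:

```python
# Close all client comms concurrently rather than one after another.
await asyncio.gather(
    *(comm.close() for comm in self.client_comms.values()),
    return_exceptions=True,  # one failing close shouldn't block the rest
)
```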
def teardown(self):
    self.bcomm.close()

async def close(self) -> None:
    await self.bcomm.close()
I believe `teardown` was never called before (it's not part of the `SchedulerPlugin` interface). `bcomm.close()` is going to close the underlying comm object. I believe that's a comm to a client? Is that something `WorkerStatusPlugin` should actually be closing unilaterally, or should `add_client` for example be responsible for that here:
distributed/distributed/scheduler.py
Lines 4693 to 4701 in 2286896
finally:
    if not comm.closed():
        self.client_comms[client].send({"op": "stream-closed"})
    try:
        if not sys.is_finalizing():
            await self.client_comms[client].close()
            del self.client_comms[client]
            if self.status == Status.running:
                logger.info("Close client connection: %s", client)
Just want to point out that you're exposing a previously-dead codepath, which has some questionable behavior regardless of your changes in this PR.
I think it'll be okay as things stand right now (only because `close` won't be called in practice until the cluster is shutting down), but it feels a little weird. I don't get the sense that multiple `BatchedSend`s are meant to be multiplexed onto a single comm object, which is what `WorkerStatusPlugin` is doing. Multiplexing the `send` side is probably okay, it's just odd to multiplex the `close` side.
@@ -2448,7 +2444,8 @@ def fast_on_a(lock):

    await s.stream_comms[a.address].close()

    assert len(s.workers) == 1
    while len(s.workers) == 1:
while len(s.workers) == 1:
while len(s.workers) != 1:
Is this what you meant? I feel like we do want to confirm that from the scheduler's perspective, the worker is gone at this point. Since `a.heartbeat_active` is True, the worker shouldn't try to reconnect to the scheduler yet. So I'm surprised `len(s.workers)` ever becomes >1 right now.
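i.e., the usual wait-until idiom the suggestion is aiming for, where `s` is the scheduler from the test:

```python
# Block until the scheduler has actually dropped the worker whose comm we closed.
while len(s.workers) != 1:
    await asyncio.sleep(0.01)
```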
@@ -201,8 +200,7 @@ def wrapper(self, *args, **kwargs):
        },
    )
    logger.exception(e)
else:
    self.loop.add_callback(_force_close, self)
self.loop.add_callback(_force_close, self)
xref: also handled in #6269
("", 1), | ||
], | ||
) | ||
async def test_worker_batched_send_broken( |
xref #6268
@@ -2399,10 +2399,6 @@ async def test_hold_on_to_replicas(c, s, *workers):
        await asyncio.sleep(0.01)


@pytest.mark.xfail(
Exciting if this actually fixes these flaky tests! I'm not surprised they were failing before.
Supersedes #6266