Skip to content

Handle PONG messages before closing connection due to heartbeat timeout #10544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/10544.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Check for PONG before closing connection for missing PONG -- by :user:`mstegmaier`.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ Matvey Tingaev
Meet Mangukiya
Meshya
Michael Ihnatenko
Michael Stegmaier
Michał Górny
Mikhail Burshteyn
Mikhail Kashkin
Expand Down
57 changes: 54 additions & 3 deletions aiohttp/web_ws.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear this would have a noticeable benefit to the user. They are likely to still get the same result regardless, this just seems to catch a (probably rare) edge case that might reduce the frequency of connection closes very slightly.

Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class WebSocketResponse(StreamResponse):
_heartbeat_cb: Optional[asyncio.TimerHandle] = None
_pong_response_cb: Optional[asyncio.TimerHandle] = None
_ping_task: Optional[asyncio.Task[None]] = None
_pong_not_received_task: Optional[asyncio.Task[None]] = None

def __init__(
self,
Expand Down Expand Up @@ -185,12 +186,62 @@ def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
self._ping_task = None

def _pong_not_received(self) -> None:
"""Callback for when no PONG was received after self._pong_heartbeat seconds"""
if self._req is not None and self._req.transport is not None:
self._handle_ping_pong_exception(
asyncio.TimeoutError(
f"No PONG received after {self._pong_heartbeat} seconds"
loop = self._loop
if loop is not None:
pong_not_received_task = loop.create_task(
self._pong_not_received_coro()
)
if not pong_not_received_task.done():
self._pong_not_received_task = pong_not_received_task
pong_not_received_task.add_done_callback(
self._pong_not_received_done
)
else:
self._pong_not_received_done(pong_not_received_task)
else:
self._handle_ping_pong_exception(
asyncio.TimeoutError(
f"No PONG received after {self._pong_heartbeat} seconds"
)
)

async def _pong_not_received_coro(self) -> None:
"""Coroutine to check for pending PONG when no PONG was received after self._pong_heartbeat seconds"""
reader = self._reader
assert reader is not None
try:
async with async_timeout.timeout(self._pong_heartbeat / 10.0):
msg = await reader.read()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for a user to catch the exception or anything and avoid the connection close? If it is, then this would break their code, as we've now read a message that they should be processing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the same point as #10544 (review)

self._reset_heartbeat()
Comment on lines +217 to +219
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is adding a race condition where we would violate the concurrent calls to reading requirement:

raise RuntimeError("Concurrent call to receive() is not allowed")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be solved by the latest commit.

if msg.type is not WSMsgType.PONG:
ws_logger.warning(
f"Received {msg} while waiting for PONG. It seems like you haven't called `receive` within {self._pong_heartbeat} seconds."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't exactly accurate. The user's code could be reading a message with receive() every second, but if there are more than pong_heartbeat messages between PONGs, then it would still get tripped up here.

)
return
except asyncio.TimeoutError: # We still did not receive a PONG
pass
except (
AssertionError
): # In the test, an AssertionError seems to occur before the timeout
pass
except Exception as exc:
self._exception = exc
self._set_closing(WSCloseCode.ABNORMAL_CLOSURE)
await self.close()
return
self._handle_ping_pong_exception(
asyncio.TimeoutError(
f"No PONG received after {self._pong_heartbeat} seconds"
)
)

def _pong_not_received_done(self, task: "asyncio.Task[None]") -> None:
"""Callback for when the pong not received task completes."""
if not task.cancelled() and (exc := task.exception()):
self._handle_ping_pong_exception(exc)
self._pong_not_received_task = None

def _handle_ping_pong_exception(self, exc: BaseException) -> None:
"""Handle exceptions raised during ping/pong processing."""
Expand Down
Loading