Skip to content

Commit

Permalink
Fix redis room pubsub shutdown (#892)
Browse files Browse the repository at this point in the history
task.exception will throw if the task was cancelled, so we have to handle that exception differently
  • Loading branch information
sburba committed Dec 15, 2022
1 parent 8678927 commit c2e6e73
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
17 changes: 13 additions & 4 deletions api/src/room_store/redis_room_listener.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import contextlib
import json
from asyncio import Future, Task
from asyncio import Future, Task, CancelledError
from collections import defaultdict
from dataclasses import dataclass, asdict
from json import JSONDecodeError
Expand Down Expand Up @@ -63,9 +63,18 @@ async def _keep_connection_alive(self) -> NoReturn:
await self._pubsub.ping()

def _on_pubsub_task_finished(self, task: Task) -> None:
exc = task.exception() or ValueError(
f'{task.get_name()} finished without throwing an exception'
)
try:
exc = task.exception()
except CancelledError as e:
exc = e

# In theory, it's impossible to trigger this branch because pubsub
# tasks should never finish without being cancelled or throwing an exception
if exc is None: # pragma: no cover
exc = ValueError(
f'{task.get_name()} finished without throwing an exception'
)

for queues in self._queues_by_room_id.values():
for q in queues:
# put_nowait will not throw here because we use unbounded queues
Expand Down
12 changes: 12 additions & 0 deletions api/tests/test_room_store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from asyncio import CancelledError
from datetime import timedelta
from typing import Callable, List

Expand All @@ -9,6 +10,7 @@

from src.api.api_structures import Request, Action
from src.room_store.common import NoSuchRoomError
from src.room_store.redis_room_store import create_redis_room_store
from src.room_store.room_store import (
RoomStore,
UnexpectedReplacementId,
Expand Down Expand Up @@ -257,3 +259,13 @@ async def test_get_last_activity_time(room_store: RoomStore) -> None:
@any_room_store
async def test_unknown_last_activity(room_store: RoomStore) -> None:
assert await room_store.seconds_since_last_activity() is None


async def test_change_subscription_throws_when_closed(redis: Redis) -> None:
"""Verify that attempting to listen for changes after redis room store has been
closed will throw an exception"""
async with create_redis_room_store(redis) as room_store:
test = await room_store.changes('room-id-1')

with pytest.raises(CancelledError):
await async_collect(test)

0 comments on commit c2e6e73

Please sign in to comment.