Skip to content

Commit

Permalink
Add an endpoint to view basic stats (#865)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjohnson31 committed Dec 7, 2021
1 parent 2162c4f commit 456de60
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 7 deletions.
13 changes: 11 additions & 2 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
Transport,
)
from asyncio import Future, AbstractEventLoop
from functools import partial
from socket import gaierror
from typing import TypedDict, Optional, Any, Dict, cast
from typing import TypedDict, Optional, Any, Dict, cast, Callable, Awaitable
from uuid import uuid4

import aiobotocore
Expand All @@ -24,8 +25,11 @@
from botocore.exceptions import ClientError
from scout_apm.api import Config as ScoutConfig
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response

from src.api.wsmanager import WebsocketManager
from src.api.stats_endpoint import stats_endpoint
from src.compaction import Compactor
from src.config import config, Environment
from src.game_state_server import GameStateServer
Expand All @@ -36,6 +40,7 @@
from src.room_store.redis_room_store import create_redis_room_store
from src.room_store.s3_room_archive import S3RoomArchive
from src.routes import routes
from src.usage_stats import get_usage_stats
from src.util.async_util import end_task
from src.util.lazy_asgi import LazyASGI

Expand Down Expand Up @@ -104,6 +109,10 @@ def exception_handler(_: AbstractEventLoop, context: Dict[str, Any]) -> None:
merged_room_store = MergedRoomStore(redis_room_store, room_archive)
gss = GameStateServer(merged_room_store, rate_limiter, NoopRateLimiter())
ws = WebsocketManager(gss, rate_limiter, config.bypass_rate_limit_key)
stat_getter = partial(get_usage_stats, redis_room_store, rate_limiter)
stats_view: Callable[[Request], Awaitable[Response]] = partial(
stats_endpoint, stat_getter
)

liveness_task = asyncio.create_task(
ws.maintain_liveness(), name='maintain_liveness'
Expand All @@ -125,7 +134,7 @@ async def shutdown() -> None:
await s3_client_context.__aexit__(None, None, None)

return Starlette(
routes=routes(ws),
routes=routes(stats_view, ws),
on_shutdown=[shutdown],
debug=config.environment == Environment.DEV,
)
Expand Down
14 changes: 14 additions & 0 deletions api/src/api/stats_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import json
from typing import Callable, Awaitable
from dataclasses import asdict

from starlette.requests import Request
from starlette.responses import Response

from src.usage_stats import UsageStats


async def stats_endpoint(
get_usage_info: Callable[[], Awaitable[UsageStats]], request: Request
) -> Response:
return Response(json.dumps(asdict(await get_usage_info())))
6 changes: 6 additions & 0 deletions api/src/rate_limit/memory_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,9 @@ async def acquire_new_room(self, user_id: str) -> None:

user.room_creation_times.append(now)
self._storage.users_by_id[user_id] = user

async def get_total_num_connections(self) -> int:
total_connections = 0
for _, connections in self._storage.room_connections_by_server_id.items():
total_connections += len(connections)
return total_connections
6 changes: 6 additions & 0 deletions api/src/rate_limit/rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,9 @@ async def acquire_new_room(self, user_id: str) -> None:
many rooms recently
"""
...

async def get_total_num_connections(self) -> int:
"""
:return: Total number of active user connections
"""
...
8 changes: 8 additions & 0 deletions api/src/rate_limit/redis_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ async def acquire_new_room(self, user_id: str) -> None:
if recent_room_creation_count > MAX_ROOMS_PER_TEN_MINUTES:
raise TooManyRoomsCreatedException

@instrument
async def get_total_num_connections(self) -> int:
num_connections = 0
async for entry in self._redis.scan_iter(match='user-connections:*'):
async for _, count in self._redis.hscan_iter(entry):
num_connections += int(count)
return num_connections


async def create_redis_rate_limiter(server_id: str, redis: Redis) -> RedisRateLimiter:
server_key = f'api-server:{server_id}'
Expand Down
9 changes: 9 additions & 0 deletions api/src/room_store/memory_room_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,12 @@ async def get_room_idle_seconds(self, room_id: str) -> int:
if room_id not in self.storage.rooms_by_id:
raise NoSuchRoomError
return int(time.time()) - self.storage.last_room_activity_by_id[room_id]

async def seconds_since_last_activity(self) -> Optional[int]:
most_recent_activity = 0
for _, last_activity_time in self.storage.last_room_activity_by_id.items():
if last_activity_time > most_recent_activity:
most_recent_activity = last_activity_time
if most_recent_activity == 0:
return None
return int(time.time() - most_recent_activity)
4 changes: 4 additions & 0 deletions api/src/room_store/merged_room_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
AsyncIterator,
Iterable,
Any,
Optional,
)

from src.api.api_structures import Request, Action
Expand Down Expand Up @@ -84,3 +85,6 @@ async def delete(self, room_id: str, replacer_id: str, replace_token: Any) -> No

async def get_room_idle_seconds(self, room_id: str) -> int:
return await self._room_store.get_room_idle_seconds(room_id)

async def seconds_since_last_activity(self) -> Optional[int]:
return await self._room_store.seconds_since_last_activity()
13 changes: 13 additions & 0 deletions api/src/room_store/redis_room_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Iterable,
Any,
Union,
Optional,
)

from aioredis import Redis, ResponseError
Expand Down Expand Up @@ -246,6 +247,7 @@ async def delete(self, room_id: str, replacer_id: str, replace_token: Any) -> No
else:
raise

@instrument
async def get_room_idle_seconds(self, room_id: str) -> int:
async with self._redis.pipeline() as pipeline:
await pipeline.exists(_room_key(room_id))
Expand All @@ -266,6 +268,17 @@ async def get_room_idle_seconds(self, room_id: str) -> int:
else:
return int(time.time()) - int(last_edited)

@instrument
async def seconds_since_last_activity(self) -> Optional[int]:
most_recent_activity = 0
async for entry in self._redis.scan_iter(match='last-room-activity:*'):
last_activity_time = int(await self._redis.get(entry))
if last_activity_time > most_recent_activity:
most_recent_activity = last_activity_time
if most_recent_activity == 0:
return None
return int(time.time() - most_recent_activity)


@asynccontextmanager
async def create_redis_room_store(redis: Redis) -> AsyncIterator[RedisRoomStore]:
Expand Down
7 changes: 7 additions & 0 deletions api/src/room_store/room_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,10 @@ async def get_room_idle_seconds(self, room_id: str) -> int:
:return: Number of seconds since the room was read or written to by a user
"""
...

async def seconds_since_last_activity(self) -> Optional[int]:
"""
:return: How many seconds have passed since the last room update,
or None if there is no recorded activity in Redis
"""
...
13 changes: 9 additions & 4 deletions api/src/routes.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
from typing import List, Any
from typing import List, Any, Callable, Awaitable

from starlette.routing import WebSocketRoute
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import WebSocketRoute, Route

from src.api.wsmanager import WebsocketManager
from src.ws.starlette_ws_client import StarletteWebsocketClient


def routes(ws: WebsocketManager) -> List[Any]:
def routes(
stats_endpoint: Callable[[Request], Awaitable[Response]], ws: WebsocketManager
) -> List[Any]:
return [
Route('/stats', stats_endpoint),
WebSocketRoute(
'/{room_id}',
lambda websocket: ws.connection_handler(
StarletteWebsocketClient(websocket)
),
)
),
]
20 changes: 20 additions & 0 deletions api/src/usage_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dataclasses import dataclass
from typing import Optional

from src.rate_limit.rate_limit import RateLimiter
from src.room_store.room_store import RoomStore


@dataclass
class UsageStats:
seconds_since_last_activity: Optional[int]
num_connections: int


async def get_usage_stats(
room_store: RoomStore, rate_limiter: RateLimiter
) -> UsageStats:
return UsageStats(
await room_store.seconds_since_last_activity(),
await rate_limiter.get_total_num_connections(),
)
8 changes: 7 additions & 1 deletion api/tests/api/test_wsmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import pytest
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response

from src.api.api_structures import BYPASS_RATE_LIMIT_HEADER
from src.api.ws_close_codes import (
Expand Down Expand Up @@ -53,6 +55,10 @@
TEST_BYPASS_RATE_LIMIT_KEY = '1234'


async def noop_endpoint(request: Request) -> Response:
return Response('')


@pytest.fixture
async def app() -> Starlette:
room_store = MemoryRoomStore(MemoryRoomStorage())
Expand All @@ -62,7 +68,7 @@ async def app() -> Starlette:
)
gss = GameStateServer(room_store, rate_limiter, NoopRateLimiter())
ws = WebsocketManager(gss, rate_limiter, TEST_BYPASS_RATE_LIMIT_KEY)
return Starlette(routes=routes(ws), debug=True)
return Starlette(routes=routes(noop_endpoint, ws), debug=True)


pytestmark = pytest.mark.asyncio
Expand Down
23 changes: 23 additions & 0 deletions api/tests/test_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,26 @@ async def test_release_connection_on_exception(rate_limiter: RateLimiter) -> Non

# Should be allowed because the connection should be freed after the exception
await rate_limiter.acquire_connection('user-last', 'room-1')


@any_rate_limiter
@time_machine.travel('1970-01-01', tick=False)
async def test_get_num_connections(rate_limiter: RateLimiter) -> None:
assert await rate_limiter.get_total_num_connections() == 0
for i in range(0, MAX_CONNECTIONS_PER_USER):
await rate_limiter.acquire_connection('user-1', 'room-1')
assert await rate_limiter.get_total_num_connections() == MAX_CONNECTIONS_PER_USER
for i in range(0, MAX_CONNECTIONS_PER_USER):
await rate_limiter.acquire_connection('user-2', 'room-2')
assert (
await rate_limiter.get_total_num_connections() == 2 * MAX_CONNECTIONS_PER_USER
)


@any_rate_limiter
@time_machine.travel('1970-01-01', tick=False)
async def test_no_connections(rate_limiter: RateLimiter) -> None:
assert await rate_limiter.get_total_num_connections() == 0
await rate_limiter.acquire_connection('user-1', 'room-1')
await rate_limiter.release_connection('user-1', 'room-1')
assert await rate_limiter.get_total_num_connections() == 0
15 changes: 15 additions & 0 deletions api/tests/test_room_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,18 @@ async def test_write_if_missing_does_not_overwrite(room_store: RoomStore) -> Non
await room_store.add_request(TEST_ROOM_ID, VALID_REQUEST)
await room_store.write_if_missing(TEST_ROOM_ID, [ANOTHER_VALID_ACTION])
assert list(await room_store.read(TEST_ROOM_ID)) == [VALID_ACTION]


@any_room_store
async def test_get_last_activity_time(room_store: RoomStore) -> None:
with time_machine.travel('1970-01-01', tick=False) as traveller:
await room_store.add_request(TEST_ROOM_ID, VALID_REQUEST)
traveller.shift(timedelta(seconds=100))
await room_store.add_request(TEST_ROOM_ID, VALID_MOVE_REQUEST)
traveller.shift(timedelta(seconds=500))
assert await room_store.seconds_since_last_activity() == 500


@any_room_store
async def test_unknown_last_activity(room_store: RoomStore) -> None:
assert await room_store.seconds_since_last_activity() is None

0 comments on commit 456de60

Please sign in to comment.