Skip to content

Commit

Permalink
Add post-maintenance reconnect helper for GMO Coin
Browse files Browse the repository at this point in the history
  • Loading branch information
MtkN1 committed May 12, 2024
1 parent ed28570 commit c6df6da
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pybotters/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from __future__ import annotations

from .gmocoin import GMOCoinHelper, GMOCoinResponseError

__all__: tuple[str, ...] = (
"GMOCoinHelper",
"GMOCoinResponseError",
)
74 changes: 74 additions & 0 deletions pybotters/helpers/gmocoin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import asyncio
import logging
from typing import NoReturn

from ..client import Client
from ..ws import WebSocketApp

logger = logging.getLogger(__name__)


class GMOCoinHelper:
def __init__(
self,
client: Client,
) -> None:
self._client = client
self._url = removeprefix(
"https://api.coin.z.com/private/v1/ws-auth", self._client._base_url
)

async def create_access_token(self) -> str:
r = await self._client.fetch(
"POST",
self._url,
)

if isinstance(r.data, dict) and r.data.get("status") == 0:
token = r.data["data"]
return token
else:
raise GMOCoinResponseError(r.text)

async def extend_access_token(self, token: str) -> None:
r = await self._client.fetch(
"PUT",
self._url,
data={"token": token},
)

if isinstance(r.data, dict) and r.data.get("status") == 0:
return
else:
raise GMOCoinResponseError(r.text)

async def manage_ws_token(
self,
ws: WebSocketApp,
token: str,
delay: float = 1800.0, # 30 minutes
) -> NoReturn:
while True:
try:
await self.extend_access_token(token)
except GMOCoinResponseError as e1:
try:
token = await self.create_access_token()
except GMOCoinResponseError as e2:
logger.debug(
f"GMO Coin access token could not be extended or created: {e1} {e2}"
)
else:
ws.url = f"wss://api.coin.z.com/ws/private/v1/{token}"
logger.debug("GMO Coin Access Token has been created")
else:
logger.debug("GMO Coin Access Token has been extended")

await asyncio.sleep(delay)


class GMOCoinResponseError(Exception): ...


def removeprefix(self: str, prefix: str, /) -> str:
return self[len(prefix) :]
165 changes: 165 additions & 0 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import asyncio
from contextlib import suppress
from typing import Awaitable, Callable, Type
from unittest.mock import AsyncMock

import aiohttp
import pytest
import pytest_asyncio
from aiohttp import web
from aiohttp.test_utils import TestServer
from aiohttp.typedefs import StrOrURL
from yarl import URL

import pybotters
from pybotters.helpers.gmocoin import GMOCoinHelper
from pybotters.request import ClientRequest


@pytest_asyncio.fixture
async def pybotters_client(
aiohttp_client_cls: Type[aiohttp.ClientSession],
monkeypatch: pytest.MonkeyPatch,
):
# From https://github.com/aio-libs/pytest-aiohttp/blob/v1.0.4/pytest_aiohttp/plugin.py#L139

clients = []

async def go(app, **kwargs):
server = TestServer(app)
aiohttp_client = aiohttp_client_cls(server)
aiohttp_client._session._request_class = ClientRequest

await aiohttp_client.start_server()
clients.append(aiohttp_client)

def dummy_request(method: str, str_or_url: StrOrURL, **kwargs):
return aiohttp_client._request(method, URL(str_or_url).path_qs, **kwargs)

_pybotters_client = pybotters.Client(**kwargs)
monkeypatch.setattr(
_pybotters_client._session,
_pybotters_client._session._request.__name__,
dummy_request,
)
return _pybotters_client

yield go

while clients:
await clients.pop().close()


@pytest.fixture
def sleep_cancel(monkeypatch):
async def sleep_canceller(delay):
raise asyncio.CancelledError()

monkeypatch.setattr(asyncio, asyncio.sleep.__name__, sleep_canceller)


async def create_access_token(request: web.Request):
return web.json_response(
{
"status": 0,
"data": "xxxxxxxxxxxxxxxxxxxx",
"responsetime": "2019-03-19T02:15:06.102Z",
}
)


async def extend_access_token(request: web.Request):
return web.json_response(
{
"status": 0,
"responsetime": "2019-03-19T02:15:06.102Z",
}
)


async def create_access_token_error(request: web.Request):
return web.json_response(
{
"status": 5,
"messages": [
{
"message_code": "ERR-5201",
"message_string": "MAINTENANCE. Please wait for a while",
}
],
}
)


async def extend_access_token_error(request: web.Request):
return web.json_response(
{
"status": 1,
"messages": [
{
"message_code": "ERR-5106",
"message_string": "Invalid request parameter.",
}
],
"responsetime": "2024-05-11T02:02:40.501Z",
}
)


@pytest.mark.asyncio
@pytest.mark.parametrize(
"base_url, extend_access_handler, create_access_handler, expected_token",
[
(
"",
extend_access_token,
create_access_token,
"aaaaaaaaaa",
),
(
"https://api.coin.z.com",
extend_access_token,
create_access_token,
"aaaaaaaaaa",
),
(
"https://api.coin.z.com",
extend_access_token_error,
create_access_token,
"xxxxxxxxxxxxxxxxxxxx",
),
(
"https://api.coin.z.com",
extend_access_token_error,
create_access_token_error,
"aaaaaaaaaa",
),
],
)
async def test_gmo_manage_ws_token(
pybotters_client: Callable[..., Awaitable[pybotters.Client]],
base_url: str,
extend_access_handler: Callable[..., Awaitable[web.Response]],
create_access_handler: Callable[..., Awaitable[web.Response]],
expected_token: str,
sleep_cancel: None,
):
app = web.Application()
app.router.add_put("/private/v1/ws-auth", extend_access_handler)
app.router.add_post("/private/v1/ws-auth", create_access_handler)
client = await pybotters_client(app, base_url=base_url)

token = "aaaaaaaaaa"
url = f"wss://api.coin.z.com/ws/private/v1/{token}"
m_ws = AsyncMock()
m_ws.url = url

helper = GMOCoinHelper(client)
with suppress(asyncio.CancelledError):
await helper.manage_ws_token(
m_ws,
token,
300.0,
)

assert m_ws.url == f"wss://api.coin.z.com/ws/private/v1/{expected_token}"

0 comments on commit c6df6da

Please sign in to comment.