Async Client Doesn't Handle Internet Disconnection #555

Open
Amindv1 opened this issue Apr 11, 2023 · 4 comments

Amindv1 commented Apr 11, 2023

Description

It seems like the xrpl.py async client doesn't handle wifi disconnections:

    def is_open(self: WebsocketBase) -> bool:
        """
        Returns whether the client is currently open.

        Returns:
            True if the client is currently open, False otherwise.
        """
        return (
            self._handler_task is not None
            and self._messages is not None
            and self._websocket is not None
            and self._websocket.open
        )

This is the logic it uses to see if a connection is open. Please correct me if I'm wrong: at first glance it looks correct, since it checks self._websocket.open. But while we're only waiting for data from our peer, TCP can't tell a healthy connection from a broken one when the peer isn't sending us any data. So this field never updates and the websocket doesn't close, and in turn I can't attempt to reconnect.

Oddly enough, even after reconnecting the wifi the websockets don't reconnect (they only reconnect if I disconnect and reconnect the wifi quickly, within ~2s).

Reproduction

You can just paste the following code into a Python file and run it (you need the imports). Once you're seeing logs, disconnect the wifi, wait about a minute, then reconnect.

If you don't wait around a minute it'll reconnect on its own, but if you wait longer it doesn't handle reconnection.

from aiohttp import web, web_app
import logging

from websockets.exceptions import ConnectionClosedError
from xrpl.asyncio.clients import (
    AsyncWebsocketClient,
)
from xrpl.models import StreamParameter, Subscribe

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


async def start(app: web_app.Application):
    stream_type = StreamParameter.LEDGER
    ledger_update_sub_req = Subscribe(
        streams = [stream_type]
    )

    async with AsyncWebsocketClient("wss://s1.ripple.com") as client:
        # one time subscription
        logger.info("[LedgerCreationDataSource] Sending subscribe request")
        await client.send(ledger_update_sub_req)

        try:
            async for message in client:
                logger.info("[LedgerCreationDataSource] received message: " + str(message))
        except ConnectionClosedError:
            logger.error("[LedgerCreationDataSource] Connection closed - connection closed error")

    logger.warning("[LedgerCreationDataSource] Connection closed - server kicked us off")

if __name__ == "__main__":
    app = web.Application()
    app.on_startup.append(start)

    web.run_app(app)

@ledhed2222
Collaborator

ledhed2222 commented Apr 12, 2023

i'm not sure this is the right solution, but one idea would be to start a second asyncio.Task when the client opens that periodically sends a ping to the server and, if a pong is not received, closes the client.

https://websockets.readthedocs.io/en/stable/reference/sansio/client.html#websockets.client.ClientProtocol.send_ping

it would be something like this:

class WebsocketBase():
  def __init__(self, timeout):
    ....
    self.timeout = timeout

  async def _ping_sender(self):
    while self.is_open():
      async with asyncio.TaskGroup() as tg:
        x = tg.create_task(self._do_request_impl(<<<SEND PING>>>))
        y = tg.create_task(asyncio.sleep(self.timeout))
      # now either both those tasks completed or one errored
      if x.exception() is not None or y.exception() is not None:
        await self._do_close()
      else:
        await asyncio.sleep(<<some amount of time between pings>>)

then this task needs to be started/stopped with the client
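
A minimal sketch of the ping idea above, written against plain asyncio so it runs on its own. The function name `keepalive` and its parameters (`send_ping`, `close`, `is_open`, `interval`, `timeout`) are hypothetical and are not part of xrpl-py. It uses `asyncio.wait_for` rather than a `TaskGroup`, because a `TaskGroup` block only exits once every task in it has finished, so it cannot by itself cut a slow ping short.

```python
import asyncio
from typing import Awaitable, Callable


async def keepalive(
    send_ping: Callable[[], Awaitable[None]],  # assumed to return once the pong arrives
    close: Callable[[], Awaitable[None]],      # assumed to close the client
    is_open: Callable[[], bool],
    interval: float = 20.0,
    timeout: float = 10.0,
) -> None:
    """Ping every `interval` seconds; close the client if no pong within `timeout`."""
    while is_open():
        try:
            await asyncio.wait_for(send_ping(), timeout)
        except (asyncio.TimeoutError, OSError):
            # No pong in time, or the send itself failed: treat the link as dead.
            # A real client would also catch its library's "connection closed" error here.
            await close()
            return
        await asyncio.sleep(interval)
```

With the `websockets` library, `send_ping` could plausibly be a small coroutine that does `pong_waiter = await ws.ping()` followed by `await pong_waiter`. The task would be created when the client opens and cancelled when it closes. `websockets.connect()` also accepts `ping_interval` and `ping_timeout` arguments; whether xrpl-py's client passes them through is not established in this thread.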

@justinr1234
Collaborator

@ledhed2222 Thanks for the insight! Would you feel comfortable implementing this?

@Amindv1
Author

Amindv1 commented Apr 12, 2023

Based on my research, what @ledhed2222 mentioned is the correct and most widely accepted solution. Right now my workaround is to wrap the iterator in a wrapper that takes a timeout, and to reconnect the websocket if we hit that timeout.

import asyncio


class AsyncTimedIterable:
    """Wraps an async iterable so that each item must arrive within `timeout` seconds."""

    # Pass a positive timeout: with the default of 0, wait_for gives up immediately.
    def __init__(self, iterable, timeout=0):
        class AsyncTimedIterator:
            def __init__(self):
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                # asyncio.TimeoutError raised by wait_for propagates to the caller,
                # which can then close the client and reconnect.
                result = await asyncio.wait_for(self._iterator.__anext__(), int(timeout))
                # if you want to stop the iteration just raise StopAsyncIteration using some conditions (when the
                # last chunk arrives, for example)
                if not result:
                    raise StopAsyncIteration
                return result

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()

which would be used like so:

            try:
                timed_message_iterator = AsyncTimedIterable(client, 15)
                async for message in timed_message_iterator:
                    self.async_queue.put_nowait(message)
            except asyncio.TimeoutError as e:
                logger.error("[LedgerCreationDataSource] Connection closed: " + str(e))
                await client.close()

It would be nice to have this functionality within the client as well.

resource: https://medium.com/@dmitry8912/implementing-timeouts-in-pythons-asynchronous-generators-f7cbaa6dc1e9
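
The "reconnect if we hit that timeout" step described above could be sketched roughly as follows. Everything here is an assumption for illustration: `consume_with_reconnect`, `open_stream`, `handle`, and the backoff parameters are hypothetical names, and `open_stream` stands in for whatever opens a fresh client, subscribes, and yields messages.

```python
import asyncio
from typing import AsyncIterator, Callable


async def consume_with_reconnect(
    open_stream: Callable[[], AsyncIterator[object]],  # hypothetical: opens a fresh subscription
    handle: Callable[[object], None],
    idle_timeout: float = 15.0,
    max_attempts: int = 5,
    base_delay: float = 1.0,
) -> None:
    """Consume messages; reopen the stream whenever it goes silent or errors."""
    attempts = 0
    while attempts < max_attempts:
        stream = open_stream()
        try:
            while True:
                # Give up on this connection if nothing arrives in time.
                message = await asyncio.wait_for(stream.__anext__(), idle_timeout)
                attempts = 0  # a message arrived, so the link is healthy again
                handle(message)
        except StopAsyncIteration:
            return  # the stream ended on its own
        except (asyncio.TimeoutError, OSError):
            attempts += 1
            # Exponential backoff before the next attempt.
            await asyncio.sleep(base_delay * 2 ** (attempts - 1))
    raise ConnectionError("gave up after repeated reconnect attempts")
```

If `open_stream` is an async generator that wraps `async with AsyncWebsocketClient(...)`, cancelling the pending `__anext__()` on timeout should unwind that `async with` block and close the old client before the next attempt.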

@ledhed2222
Collaborator

ledhed2222 commented Apr 14, 2023

nah @justinr1234 - even though i wrote this i'm working on other stuff now. plus i want to pass the baton on this code to someone else. what i'd recommend is two different features:

  1. some feature within AsyncWebsocketClient that allows users to specify a timeout for waiting for the next message via async for iteration. I would personally prefer this to exposing a public method that just gets the next message, since I think we should encourage async iteration
  2. someone needs to do testing around what happens when the server drops a connection. our client should signal that that has happened, somehow. i don't think it does. it could raise an error, or do something else.
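
One way the two features above might look from the caller's side, as a sketch only. `ConnectionLostError` and `watched_messages` are hypothetical names, not xrpl-py APIs, and treating a stream that simply ends as an error is just one of the options mentioned in item 2.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, TypeVar

T = TypeVar("T")


class ConnectionLostError(Exception):
    """Hypothetical error: the stream went silent or ended unexpectedly."""


async def watched_messages(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
    """Yield items from `source`, raising ConnectionLostError on silence or early end."""
    iterator = source.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except asyncio.TimeoutError:
            raise ConnectionLostError(f"no message within {timeout}s") from None
        except StopAsyncIteration:
            raise ConnectionLostError("stream ended without an explicit close") from None
        yield item
```

Callers keep the `async for message in watched_messages(client, 15):` shape and handle a single exception type for both failure modes.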

I also drafted this PR that I hope will better explain how to use the current implementation. I'm hoping it will stop people from asking about hooks :) #545
