From 39abff4520e291180425ac2219d1d8597ac5da96 Mon Sep 17 00:00:00 2001 From: Harmon Date: Sun, 24 Jan 2021 07:43:50 -0600 Subject: [PATCH] Merge StreamListener into Stream --- tweepy/__init__.py | 2 +- tweepy/streaming.py | 201 +++++++++++++++++++++----------------------- 2 files changed, 99 insertions(+), 104 deletions(-) diff --git a/tweepy/__init__.py b/tweepy/__init__.py index b7d0a485f..3f94ac464 100644 --- a/tweepy/__init__.py +++ b/tweepy/__init__.py @@ -15,7 +15,7 @@ from tweepy.cursor import Cursor from tweepy.error import RateLimitError, TweepError from tweepy.models import DirectMessage, Friendship, ModelFactory, SavedSearch, SearchResults, Status, User -from tweepy.streaming import Stream, StreamListener +from tweepy.streaming import Stream # Global, unauthenticated instance of API api = API() diff --git a/tweepy/streaming.py b/tweepy/streaming.py index 48dd2c8a1..357300196 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -21,110 +21,15 @@ log = logging.getLogger(__name__) -class StreamListener: - - def on_connect(self): - """Called once connected to streaming server. - - This will be invoked once a successful response - is received from the server. Allows the listener - to perform some work prior to entering the read loop. - """ - log.info("Stream connected") - - def on_connection_error(self): - """Called when stream connection errors or times out""" - log.error("Stream connection has errored or timed out") - - def on_exception(self, exception): - """Called when an unhandled exception occurs.""" - log.exception("Stream encountered an exception") - - def on_keep_alive(self): - """Called when a keep-alive arrived""" - log.debug("Received keep-alive signal") - - def on_request_error(self, status_code): - """Called when a non-200 status code is returned""" - log.error("Stream encountered HTTP error: %d", status_code) - return False - - def on_data(self, raw_data): - """Called when raw data is received from connection. - - Override this method if you wish to manually handle - the stream data. Return False to stop stream and close connection. - """ - data = json.loads(raw_data) - - if 'in_reply_to_status_id' in data: - status = Status.parse(None, data) - return self.on_status(status) - if 'delete' in data: - delete = data['delete']['status'] - return self.on_delete(delete['id'], delete['user_id']) - if 'disconnect' in data: - return self.on_disconnect(data['disconnect']) - if 'limit' in data: - return self.on_limit(data['limit']['track']) - if 'scrub_geo' in data: - return self.on_scrub_geo(data['scrub_geo']) - if 'status_withheld' in data: - return self.on_status_withheld(data['status_withheld']) - if 'user_withheld' in data: - return self.on_user_withheld(data['user_withheld']) - if 'warning' in data: - return self.on_warning(data['warning']) - - log.error("Unknown message type: %s", raw_data) - - def on_status(self, status): - """Called when a new status arrives""" - log.debug("Received status: %d", status.id) - - def on_delete(self, status_id, user_id): - """Called when a delete notice arrives for a status""" - log.debug("Received status deletion notice: %d", status_id) - - def on_disconnect(self, notice): - """Called when twitter sends a disconnect notice - - Disconnect codes are listed here: - https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types - """ - log.warning("Received disconnect message: %s", notice) - - def on_limit(self, track): - """Called when a limitation notice arrives""" - log.debug("Received limit notice: %d", track) - - def on_scrub_geo(self, notice): - """Called when a location deletion notice arrives""" - log.debug("Received location deletion notice: %s", notice) - - def on_status_withheld(self, notice): - """Called when a status withheld content notice arrives""" - log.debug("Received status withheld content notice: %s", notice) - - def on_user_withheld(self, notice): - """Called when a user withheld content notice arrives""" - log.debug("Received user withheld content notice: %s", notice) - - def on_warning(self, notice): - """Called when a disconnection warning message arrives""" - log.warning("Received stall warning: %s", notice) - - class Stream: def __init__(self, consumer_key, consumer_secret, access_token, - access_token_secret, listener, *, chunk_size=512, - daemon=False, max_retries=inf, proxy=None, verify=True): + access_token_secret, *, chunk_size=512, daemon=False, + max_retries=inf, proxy=None, verify=True): self.consumer_key = consumer_key self.consumer_secret = consumer_secret self.access_token = access_token self.access_token_secret = access_token_secret - self.listener = listener # The default socket.read size. Default to less than half the size of # a tweet so that it reads tweets with the minimal latency of 2 reads # per tweet. Values higher than ~1kb will increase latency by waiting @@ -169,7 +74,7 @@ def _connect(self, endpoint, params=None, body=None): verify=self.verify, proxies=self.proxies ) as resp: if resp.status_code != 200: - if self.listener.on_request_error(resp.status_code) is False: + if self.on_request_error(resp.status_code) is False: break error_count += 1 if resp.status_code == 420: @@ -183,7 +88,7 @@ def _connect(self, endpoint, params=None, body=None): error_count = 0 http_error_wait = http_error_wait_start network_error_wait = network_error_wait_step - self.listener.on_connect() + self.on_connect() for line in resp.iter_lines( chunk_size=self.chunk_size @@ -191,8 +96,8 @@ def _connect(self, endpoint, params=None, body=None): if not self.running: break if not line: - self.listener.on_keep_alive() - elif self.listener.on_data(line) is False: + self.on_keep_alive() + elif self.on_data(line) is False: self.running = False break @@ -207,7 +112,7 @@ def _connect(self, endpoint, params=None, body=None): if isinstance(exc, ssl.SSLError): if not (exc.args and 'timed out' in str(exc.args[0])): raise - if self.listener.on_connection_error() is False: + if self.on_connection_error() is False: break if self.running is False: break @@ -217,7 +122,7 @@ def _connect(self, endpoint, params=None, body=None): network_error_wait_max ) except Exception as exc: - self.listener.on_exception(exc) + self.on_exception(exc) raise finally: self.session.close() @@ -282,3 +187,93 @@ def disconnect(self): def on_closed(self, resp): """ Called when the response has been closed by Twitter """ pass + + def on_connect(self): + """Called once connected to streaming server. + + This will be invoked once a successful response + is received from the server. + """ + log.info("Stream connected") + + def on_connection_error(self): + """Called when stream connection errors or times out""" + log.error("Stream connection has errored or timed out") + + def on_exception(self, exception): + """Called when an unhandled exception occurs.""" + log.exception("Stream encountered an exception") + + def on_keep_alive(self): + """Called when a keep-alive arrived""" + log.debug("Received keep-alive signal") + + def on_request_error(self, status_code): + """Called when a non-200 status code is returned""" + log.error("Stream encountered HTTP error: %d", status_code) + return False + + def on_data(self, raw_data): + """Called when raw data is received from connection. + + Override this method if you wish to manually handle + the stream data. Return False to stop stream and close connection. + """ + data = json.loads(raw_data) + + if 'in_reply_to_status_id' in data: + status = Status.parse(None, data) + return self.on_status(status) + if 'delete' in data: + delete = data['delete']['status'] + return self.on_delete(delete['id'], delete['user_id']) + if 'disconnect' in data: + return self.on_disconnect(data['disconnect']) + if 'limit' in data: + return self.on_limit(data['limit']['track']) + if 'scrub_geo' in data: + return self.on_scrub_geo(data['scrub_geo']) + if 'status_withheld' in data: + return self.on_status_withheld(data['status_withheld']) + if 'user_withheld' in data: + return self.on_user_withheld(data['user_withheld']) + if 'warning' in data: + return self.on_warning(data['warning']) + + log.error("Unknown message type: %s", raw_data) + + def on_status(self, status): + """Called when a new status arrives""" + log.debug("Received status: %d", status.id) + + def on_delete(self, status_id, user_id): + """Called when a delete notice arrives for a status""" + log.debug("Received status deletion notice: %d", status_id) + + def on_disconnect(self, notice): + """Called when twitter sends a disconnect notice + + Disconnect codes are listed here: + https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types + """ + log.warning("Received disconnect message: %s", notice) + + def on_limit(self, track): + """Called when a limitation notice arrives""" + log.debug("Received limit notice: %d", track) + + def on_scrub_geo(self, notice): + """Called when a location deletion notice arrives""" + log.debug("Received location deletion notice: %s", notice) + + def on_status_withheld(self, notice): + """Called when a status withheld content notice arrives""" + log.debug("Received status withheld content notice: %s", notice) + + def on_user_withheld(self, notice): + """Called when a user withheld content notice arrives""" + log.debug("Received user withheld content notice: %s", notice) + + def on_warning(self, notice): + """Called when a disconnection warning message arrives""" + log.warning("Received stall warning: %s", notice)