Skip to content

Commit

Permalink
Merge StreamListener into Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Harmon758 committed Jan 24, 2021
1 parent a5af600 commit 39abff4
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 104 deletions.
2 changes: 1 addition & 1 deletion tweepy/__init__.py
Expand Up @@ -15,7 +15,7 @@
from tweepy.cursor import Cursor
from tweepy.error import RateLimitError, TweepError
from tweepy.models import DirectMessage, Friendship, ModelFactory, SavedSearch, SearchResults, Status, User
from tweepy.streaming import Stream, StreamListener
from tweepy.streaming import Stream

# Global, unauthenticated instance of API
api = API()
Expand Down
201 changes: 98 additions & 103 deletions tweepy/streaming.py
Expand Up @@ -21,110 +21,15 @@
log = logging.getLogger(__name__)


class StreamListener:

def on_connect(self):
"""Called once connected to streaming server.
This will be invoked once a successful response
is received from the server. Allows the listener
to perform some work prior to entering the read loop.
"""
log.info("Stream connected")

def on_connection_error(self):
"""Called when stream connection errors or times out"""
log.error("Stream connection has errored or timed out")

def on_exception(self, exception):
"""Called when an unhandled exception occurs."""
log.exception("Stream encountered an exception")

def on_keep_alive(self):
"""Called when a keep-alive arrived"""
log.debug("Received keep-alive signal")

def on_request_error(self, status_code):
"""Called when a non-200 status code is returned"""
log.error("Stream encountered HTTP error: %d", status_code)
return False

def on_data(self, raw_data):
"""Called when raw data is received from connection.
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
data = json.loads(raw_data)

if 'in_reply_to_status_id' in data:
status = Status.parse(None, data)
return self.on_status(status)
if 'delete' in data:
delete = data['delete']['status']
return self.on_delete(delete['id'], delete['user_id'])
if 'disconnect' in data:
return self.on_disconnect(data['disconnect'])
if 'limit' in data:
return self.on_limit(data['limit']['track'])
if 'scrub_geo' in data:
return self.on_scrub_geo(data['scrub_geo'])
if 'status_withheld' in data:
return self.on_status_withheld(data['status_withheld'])
if 'user_withheld' in data:
return self.on_user_withheld(data['user_withheld'])
if 'warning' in data:
return self.on_warning(data['warning'])

log.error("Unknown message type: %s", raw_data)

def on_status(self, status):
"""Called when a new status arrives"""
log.debug("Received status: %d", status.id)

def on_delete(self, status_id, user_id):
"""Called when a delete notice arrives for a status"""
log.debug("Received status deletion notice: %d", status_id)

def on_disconnect(self, notice):
"""Called when twitter sends a disconnect notice
Disconnect codes are listed here:
https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
"""
log.warning("Received disconnect message: %s", notice)

def on_limit(self, track):
"""Called when a limitation notice arrives"""
log.debug("Received limit notice: %d", track)

def on_scrub_geo(self, notice):
"""Called when a location deletion notice arrives"""
log.debug("Received location deletion notice: %s", notice)

def on_status_withheld(self, notice):
"""Called when a status withheld content notice arrives"""
log.debug("Received status withheld content notice: %s", notice)

def on_user_withheld(self, notice):
"""Called when a user withheld content notice arrives"""
log.debug("Received user withheld content notice: %s", notice)

def on_warning(self, notice):
"""Called when a disconnection warning message arrives"""
log.warning("Received stall warning: %s", notice)


class Stream:

def __init__(self, consumer_key, consumer_secret, access_token,
access_token_secret, listener, *, chunk_size=512,
daemon=False, max_retries=inf, proxy=None, verify=True):
access_token_secret, *, chunk_size=512, daemon=False,
max_retries=inf, proxy=None, verify=True):
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = access_token
self.access_token_secret = access_token_secret
self.listener = listener
# The default socket.read size. Default to less than half the size of
# a tweet so that it reads tweets with the minimal latency of 2 reads
# per tweet. Values higher than ~1kb will increase latency by waiting
Expand Down Expand Up @@ -169,7 +74,7 @@ def _connect(self, endpoint, params=None, body=None):
verify=self.verify, proxies=self.proxies
) as resp:
if resp.status_code != 200:
if self.listener.on_request_error(resp.status_code) is False:
if self.on_request_error(resp.status_code) is False:
break
error_count += 1
if resp.status_code == 420:
Expand All @@ -183,16 +88,16 @@ def _connect(self, endpoint, params=None, body=None):
error_count = 0
http_error_wait = http_error_wait_start
network_error_wait = network_error_wait_step
self.listener.on_connect()
self.on_connect()

for line in resp.iter_lines(
chunk_size=self.chunk_size
):
if not self.running:
break
if not line:
self.listener.on_keep_alive()
elif self.listener.on_data(line) is False:
self.on_keep_alive()
elif self.on_data(line) is False:
self.running = False
break

Expand All @@ -207,7 +112,7 @@ def _connect(self, endpoint, params=None, body=None):
if isinstance(exc, ssl.SSLError):
if not (exc.args and 'timed out' in str(exc.args[0])):
raise
if self.listener.on_connection_error() is False:
if self.on_connection_error() is False:
break
if self.running is False:
break
Expand All @@ -217,7 +122,7 @@ def _connect(self, endpoint, params=None, body=None):
network_error_wait_max
)
except Exception as exc:
self.listener.on_exception(exc)
self.on_exception(exc)
raise
finally:
self.session.close()
Expand Down Expand Up @@ -282,3 +187,93 @@ def disconnect(self):
def on_closed(self, resp):
""" Called when the response has been closed by Twitter """
pass

def on_connect(self):
"""Called once connected to streaming server.
This will be invoked once a successful response
is received from the server.
"""
log.info("Stream connected")

def on_connection_error(self):
"""Called when stream connection errors or times out"""
log.error("Stream connection has errored or timed out")

def on_exception(self, exception):
"""Called when an unhandled exception occurs."""
log.exception("Stream encountered an exception")

def on_keep_alive(self):
"""Called when a keep-alive arrived"""
log.debug("Received keep-alive signal")

def on_request_error(self, status_code):
"""Called when a non-200 status code is returned"""
log.error("Stream encountered HTTP error: %d", status_code)
return False

def on_data(self, raw_data):
"""Called when raw data is received from connection.
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
data = json.loads(raw_data)

if 'in_reply_to_status_id' in data:
status = Status.parse(None, data)
return self.on_status(status)
if 'delete' in data:
delete = data['delete']['status']
return self.on_delete(delete['id'], delete['user_id'])
if 'disconnect' in data:
return self.on_disconnect(data['disconnect'])
if 'limit' in data:
return self.on_limit(data['limit']['track'])
if 'scrub_geo' in data:
return self.on_scrub_geo(data['scrub_geo'])
if 'status_withheld' in data:
return self.on_status_withheld(data['status_withheld'])
if 'user_withheld' in data:
return self.on_user_withheld(data['user_withheld'])
if 'warning' in data:
return self.on_warning(data['warning'])

log.error("Unknown message type: %s", raw_data)

def on_status(self, status):
"""Called when a new status arrives"""
log.debug("Received status: %d", status.id)

def on_delete(self, status_id, user_id):
"""Called when a delete notice arrives for a status"""
log.debug("Received status deletion notice: %d", status_id)

def on_disconnect(self, notice):
"""Called when twitter sends a disconnect notice
Disconnect codes are listed here:
https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
"""
log.warning("Received disconnect message: %s", notice)

def on_limit(self, track):
"""Called when a limitation notice arrives"""
log.debug("Received limit notice: %d", track)

def on_scrub_geo(self, notice):
"""Called when a location deletion notice arrives"""
log.debug("Received location deletion notice: %s", notice)

def on_status_withheld(self, notice):
"""Called when a status withheld content notice arrives"""
log.debug("Received status withheld content notice: %s", notice)

def on_user_withheld(self, notice):
"""Called when a user withheld content notice arrives"""
log.debug("Received user withheld content notice: %s", notice)

def on_warning(self, notice):
"""Called when a disconnection warning message arrives"""
log.warning("Received stall warning: %s", notice)

1 comment on commit 39abff4

@hxr10601
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very useful. I met the same problem with you and under your guidance, I successfully dealt with it. Thank you so much.

Please sign in to comment.