Skip to content

Commit

Permalink
Merge pull request #139 from kbakk/138-watcher-consecutive-error
Browse files Browse the repository at this point in the history
Change watcher to check for consecutive errors per thread
  • Loading branch information
JonatanMartens committed Mar 15, 2021
2 parents 93ddb67 + 6fc1de8 commit f7d5f64
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 12 deletions.
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
author = 'Jonatan Martens'

# The full version, including alpha/beta/rc tags
release = '2.3.0'
release = '2.3.1'

# -- General configuration ---------------------------------------------------

Expand Down Expand Up @@ -59,6 +59,6 @@
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']

version = "2.3.0"
version = "2.3.1"

master_doc = 'index'
2 changes: 1 addition & 1 deletion pyzeebe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.3.0"
__version__ = "2.3.1"

from pyzeebe import exceptions
from pyzeebe.client.client import ZeebeClient
Expand Down
18 changes: 10 additions & 8 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N
before (List[TaskDecorator]): Decorators to be performed before each task
after (List[TaskDecorator]): Decorators to be performed after each task
max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1
watcher_max_errors_factor (int): Number of consequtive errors for a task watcher will accept before raising MaxConsecutiveTaskThreadError
"""
super().__init__(before, after)
self.zeebe_adapter = ZeebeAdapter(hostname=hostname, port=port, credentials=credentials,
Expand Down Expand Up @@ -120,20 +121,21 @@ def _should_watch_threads(self) -> bool:
return not self.stop_event.is_set() and bool(self._task_threads)

def _watch_task_threads_runner(self, frequency: int = 10) -> None:
consecutive_errors = 0
consecutive_errors = {}
while self._should_watch_threads():
logger.debug("Checking task thread status")
# converting to list to avoid "RuntimeError: dictionary changed size during iteration"
for task_type in list(self._task_threads.keys()):
consecutive_errors.setdefault(task_type, 0)
# thread might be none, if dict changed size, in that case we'll consider it
# an error, and check if we should handle it
thread = self._task_threads.get(task_type)
if not thread or not thread.is_alive():
consecutive_errors += 1
self._check_max_errors(consecutive_errors)
consecutive_errors[task_type] += 1
self._check_max_errors(consecutive_errors[task_type], task_type)
self._handle_not_alive_thread(task_type)
else:
consecutive_errors = 0
consecutive_errors[task_type] = 0
time.sleep(frequency)

def _handle_not_alive_thread(self, task_type: str):
Expand All @@ -143,11 +145,11 @@ def _handle_not_alive_thread(self, task_type: str):
else:
logger.warning(f"Task thread {task_type} is not alive, but condition not met for restarting")

def _check_max_errors(self, consecutive_errors: int):
max_errors = self.watcher_max_errors_factor * len(self.tasks)
if consecutive_errors >= max_errors:
def _check_max_errors(self, consecutive_errors: int, task_type: str):
if consecutive_errors >= self.watcher_max_errors_factor:
raise MaxConsecutiveTaskThreadError(f"Number of consecutive errors ({consecutive_errors}) exceeded "
f"max allowed number of errors ({max_errors})")
f"max allowed number of errors ({self.watcher_max_errors_factor}) "
f" for task {task_type}", task_type)

def _restart_task_thread(self, task_type: str) -> None:
task = self.get_task(task_type)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pyzeebe",
version="2.3.0",
version="2.3.1",
author="Jonatan Martens",
author_email="jonatanmartenstav@gmail.com",
description="Zeebe client api",
Expand Down

0 comments on commit f7d5f64

Please sign in to comment.