Skip to content

Commit

Permalink
Run periodic tasks in app context
Browse files Browse the repository at this point in the history
  • Loading branch information
stchris committed May 21, 2024
1 parent c28afdd commit 0acef04
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions aleph/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __init__(
self.indexing_batches = defaultdict(list)
self.local_queue = queue.Queue()
self.prefetch_count = prefetch_count
self.periodic_timer = threading.Timer(10, self.periodic)

def on_message(self, channel, method, properties, body, args):
connection = args[0]
Expand All @@ -127,34 +128,44 @@ def on_message(self, channel, method, properties, body, args):
task._channel = channel
self.local_queue.put((task, channel, connection))

def on_signal(self, signal, _):
log.debug("Cancelling periodic timer")
self.periodic_timer.cancel()
super().on_signal(signal, _)

def process(self, blocking=True):
if blocking:
log.info(
f"Starting periodic timer (interval={self.periodic_timer.interval}s)"
)
self.periodic_timer.start()
with app.app_context():
self.process_blocking()
else:
self.process_nonblocking()

def periodic(self):
try:
db.session.remove()
if self.often.check():
self.often.update()
log.info("Self-check...")
compute_collections()
Dataset.cleanup_dataset_status(kv)

if self.daily.check():
self.daily.update()
log.info("Running daily tasks...")
update_roles()
check_alerts()
generate_digest()
delete_expired_exports()
delete_old_notifications()

self.run_indexing_batches()
except Exception:
log.exception("Error while executing periodic tasks")
with app.app_context():
try:
db.session.remove()
if self.often.check():
self.often.update()
log.info("Self-check...")
compute_collections()
Dataset.cleanup_dataset_status(kv)

if self.daily.check():
self.daily.update()
log.info("Running daily tasks...")
update_roles()
check_alerts()
generate_digest()
delete_expired_exports()
delete_old_notifications()

self.run_indexing_batches()
except Exception:
log.exception("Error while executing periodic tasks")

def dispatch_task(self, task: Task) -> Task:
log.info(
Expand Down

0 comments on commit 0acef04

Please sign in to comment.