Add a local runner for better testing and local development #992

Open
zschumacher opened this issue Mar 5, 2024 · 0 comments

Is your feature request related to a problem? Please describe.
It would be nice to test an entire workflow without running a Kubernetes cluster and Argo locally. With a tool like Prefect, you can simply call the flow outside of the context and it executes with a separate, local runner in your interpreter. Even if this only supported a subset of the library's features (like script tasks in a DAG), it would be super useful.

Describe the solution you'd like
It would be great to be able to just do something like:

with Workflow(), DAG() as dag:
    ...

if __name__ == "__main__":
    dag() # runs in local interpreter
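
To make the request concrete, here is a minimal sketch of what "runs in local interpreter" could mean, written without Hera at all. It assumes tasks are plain Python callables and dependencies are given as a mapping of task name to upstream names. `run_local`, `tasks`, and `depends` are hypothetical names for illustration, not Hera API. It uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to pick an execution order.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def run_local(tasks: Dict[str, Callable[[], object]],
              depends: Dict[str, Set[str]]) -> List[str]:
    """Run each callable once, after all of its dependencies.

    Hypothetical helper, not part of Hera. Returns the execution order.
    """
    # Every task is a node; a task with no entry in `depends` has no predecessors.
    graph = {name: depends.get(name, set()) for name in tasks}
    order: List[str] = []
    # static_order() raises graphlib.CycleError if the graph contains a cycle.
    for name in TopologicalSorter(graph).static_order():
        tasks[name]()
        order.append(name)
    return order


if __name__ == "__main__":
    log: List[str] = []
    demo_tasks = {
        "a": lambda: log.append("a"),
        "b": lambda: log.append("b"),
        "c": lambda: log.append("c"),
    }
    # "b" needs "a"; "c" needs both "a" and "b".
    print(run_local(demo_tasks, {"b": {"a"}, "c": {"a", "b"}}))
```

This version is sequential and ignores parameters entirely. It only illustrates the dependency-ordering part of a local runner.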

Describe alternatives you've considered
I wrote my own local runner with a lot of custom code. Here is an example below, although I still need an elegant solution for parameters/arguments.

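import asyncio
import functools
import inspect

from hera.workflows import DAG, Task
from hera.workflows._context import _context  # private module; import path assumed, may differ between Hera versions


class Dag(DAG):

    def __call__(self, *args, **kwargs):
        # Inside an active Hera context, keep the normal behavior.
        if _context.active:
            return super().__call__(*args, **kwargs)
        # Otherwise, run the DAG in the local interpreter.
        executor = DagExecutor(self)
        asyncio.run(executor.run())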

class DagExecutor:

    def __init__(self, dag: DAG):
        self.dag = dag
        self.done = set()  # names of tasks that have finished

    async def _fill_queue(self, queue: asyncio.Queue):
        for task in self.dag.tasks:
            await queue.put(task)

    async def _run_task(self, task: Task):
        args = {k: v for arg in task.arguments for k, v in arg.items()} if task.arguments else {}
        if inspect.iscoroutinefunction(task.template.source):
            await task.template.source(**args)
        else:
            f = functools.partial(task.template.source, **args)
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, f)

    async def _worker(self, queue: asyncio.Queue):
        task = await queue.get()
        if task.depends:
            # Only "a && b" style dependency expressions are handled here.
            for dep in task.depends.split(" && "):
                # Yield to the event loop until the upstream task has finished.
                while dep not in self.done:
                    await asyncio.sleep(0)
        await self._run_task(task)
        self.done.add(task.name)

    async def run(self):
        queue = asyncio.Queue()
        fill_queue_task = self._fill_queue(queue)
        workers = [self._worker(queue) for _ in range(len(self.dag.tasks))]
        await fill_queue_task
        await asyncio.gather(*workers)
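
The polling loop above (`await asyncio.sleep(0)`) works, but it keeps the event loop spinning while a task waits. A possible alternative, sketched here with plain callables rather than Hera objects, is to give each task an `asyncio.Event` that its dependents wait on. `LocalTask` and `run_dag` are hypothetical names, not Hera API. The sketch also shows one possible convention for parameters/arguments: each task receives the return values of its upstream tasks as keyword arguments named after those tasks. That convention is an assumption for illustration and is not how Argo parameters work. It needs Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class LocalTask:
    """Hypothetical stand-in for a Hera Task: a name, a callable, upstream names."""
    name: str
    func: Callable[..., Any]
    depends: List[str] = field(default_factory=list)


async def run_dag(tasks: List[LocalTask]) -> Dict[str, Any]:
    """Run tasks concurrently; each one waits on an Event per dependency."""
    finished = {t.name: asyncio.Event() for t in tasks}
    results: Dict[str, Any] = {}

    async def run_one(task: LocalTask) -> None:
        # Suspend (without polling) until every upstream task has completed.
        for dep in task.depends:
            await finished[dep].wait()
        # Assumed convention: upstream results become keyword arguments.
        kwargs = {dep: results[dep] for dep in task.depends}
        if inspect.iscoroutinefunction(task.func):
            results[task.name] = await task.func(**kwargs)
        else:
            # Run blocking callables in the default thread pool.
            results[task.name] = await asyncio.to_thread(task.func, **kwargs)
        finished[task.name].set()

    await asyncio.gather(*(run_one(t) for t in tasks))
    return results


if __name__ == "__main__":
    async def extract():
        return [1, 2, 3]

    def total(extract):
        return sum(extract)

    # Declaration order does not matter; "total" waits for "extract".
    print(asyncio.run(run_dag([
        LocalTask("total", total, depends=["extract"]),
        LocalTask("extract", extract),
    ])))
```

A cyclic dependency would make this sketch wait forever, so a real runner would want a cycle check up front.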