Hera is a Python framework for constructing and submitting Argo Workflows.

Overview

Hera (hera-workflows)

The Argo was constructed by the shipwright Argus, and its crew were specially protected by the goddess Hera.

(https://en.wikipedia.org/wiki/Argo)

License: MIT

Hera is a Python framework for constructing and submitting Argo Workflows. The main goal of Hera is to make Argo Workflows more accessible by abstracting away some setup that is typically necessary for constructing Argo workflows.

Python functions are first-class citizens in Hera - they are the atomic units (execution payload) that are submitted for remote execution. The framework makes it easy to wrap execution payloads into Argo Workflow tasks, set dependencies, resources, etc.

You can watch the introductory Hera presentation at the "Argo Workflows and Events Community Meeting 20 Oct 2021" here!

Assumptions

Hera is exclusively dedicated to remote workflow submission and execution. Therefore, it requires an Argo server to be deployed to a Kubernetes cluster. Currently, Hera assumes that the Argo server sits behind an authentication layer that can authenticate workflow submission requests by using the Bearer token on the request. To learn how to deploy Argo to your own Kubernetes cluster you can follow the Argo Workflows guide!

Another option for workflow submission without the authentication layer is to port forward to your Argo server deployment and submit workflows to localhost:2746 (2746 is the default port, but you are free to use your own). Please refer to the Argo Workflows documentation for the port forward command!
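
For reference, the port forward command typically looks like the following (assuming the Argo server Deployment is named argo-server and runs in the argo namespace - adjust the names and ports to your setup):

kubectl -n argo port-forward deployment/argo-server 2746:2746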

In the future, some of these assumptions may be added to or relaxed depending on the direction of the project. Hera is mostly designed for practical data science purposes, which assumes the presence of a DevOps team to set up an Argo server for workflow submission.

Installation

Hera can currently be installed directly from this repository using:

python3 -m pip install git+https://github.com/argoproj-labs/hera-workflows --ignore-installed

Alternatively, you can clone this repository and then run the following to install:

python setup.py install

Contributing

If you plan to submit contributions to Hera you can install Hera in a virtual environment managed by pipenv:

pipenv shell
pipenv sync --dev --pre

Also, see the contributing guide!

Concepts

Currently, Hera is centered around two core concepts. These concepts are also used by Argo, which Hera aims to stay consistent with:

  • Task - the object that holds the Python function for remote execution/the atomic unit of execution;
  • Workflow - the higher level representation of a collection of tasks.

Examples

A very primitive example of submitting a task within a workflow through Hera is:

from hera.v1.task import Task
from hera.v1.workflow import Workflow
from hera.v1.workflow_service import WorkflowService


def say(message: str):
    """
    This can be anything as long as the Docker image satisfies the dependencies. You can import anything Python
    that is in your container, e.g. torch, tensorflow, scipy, biopython, etc. - just provide an image to the task!
    """
    print(message)


ws = WorkflowService('my-argo-domain.com', 'my-argo-server-token')
w = Workflow('my-workflow', ws)
t = Task('say', say, [{'message': 'Hello, world!'}])
w.add_task(t)
w.submit()

See the examples directory for a collection of Argo workflow construction and submission examples via Hera!

Comparison

There are other libraries currently available for structuring and submitting Argo Workflows:

  • Couler, which aims to provide a unified interface for constructing and managing workflows on different workflow engines;
  • Argo Python DSL, which allows you to programmatically define Argo workflows using Python.

While the aforementioned libraries provide amazing functionality for Argo workflow construction and submission, they require an advanced understanding of Argo concepts. When Dyno Therapeutics started using Argo Workflows, it was challenging to construct and submit experimental machine learning workflows. Scientists and engineers at Dyno Therapeutics spent a lot of time on workflow definition rather than on the implementation of the atomic unit of execution - the Python function - that performed, for instance, model training.

Hera presents a much simpler interface for task and workflow construction, empowering users to focus on their own executable payloads rather than on workflow setup. Here's a side-by-side comparison of Hera, Argo Python DSL, and Couler:

Hera

from hera.v1.task import Task
from hera.v1.workflow import Workflow
from hera.v1.workflow_service import WorkflowService


def say(message: str):
    print(message)


ws = WorkflowService('my-argo-server.com', 'my-auth-token')
w = Workflow('diamond', ws)
a = Task('A', say, [{'message': 'This is task A!'}])
b = Task('B', say, [{'message': 'This is task B!'}])
c = Task('C', say, [{'message': 'This is task C!'}])
d = Task('D', say, [{'message': 'This is task D!'}])

a.next(b).next(d)  # a >> b >> d
a.next(c).next(d)  # a >> c >> d

w.add_tasks(a, b, c, d)
w.submit()

Couler

import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def job(name):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[name],
        step_name=name,
    )


def diamond():
    couler.dag(
        [
            [lambda: job(name="A")],
            [lambda: job(name="A"), lambda: job(name="B")],  # A -> B
            [lambda: job(name="A"), lambda: job(name="C")],  # A -> C
            [lambda: job(name="B"), lambda: job(name="D")],  # B -> D
            [lambda: job(name="C"), lambda: job(name="D")],  # C -> D
        ]
    )


diamond()
submitter = ArgoSubmitter()
couler.run(submitter=submitter)

Argo Python DSL

from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container

Comments
  • CICD Improvements

    CICD Improvements

    About

    this is a work in progress; it aims to check feasibility for further discussion

    Following #93, further improve CI + CD

    Tasks

    • [x] add coverage reports tasks and ci
      • [x] store all py vers X OS coverage files
      • [x] combine all py vers X OS coverage files
      • [x] create html cov
      • [x] store coverage report artifact
    • [x] add optional binary distribution to wheels via setup.py
      • [x] add a none-any build just in case and publish it
      • [x] add sdist build ( .tar.gz ) build for publishing
    • [x] store built wheels by tox
      • rationale: publish the exact wheels that were tested
    • [x] build wheels for all distributions ( matrix = py ver + os )
    • [x] build py3-none-any wheel + sdist ( .tar.gz )
    • [x] consolidate Hera CICD into a single GitHub workflow
      • motivation: re-use the test/build job as a pre-publishing job
      • motivation: leverage artifact passing between jobs rather than between different workflows
      • [x] add publish job ( similar to the existing one )
      • [x] change event triggers
      • [x] add if: to publish job
      • [x] delete file .github/workflows/hera_build_and_publish.yaml
      • [x] rename file .github/workflows/hera_pr.yaml -> .github/workflows/cicd.yaml ( or .. )
      • [x] fix jobs badge workflow name hera_build_and_publish.yaml -> cicd.yaml ( or .. ) in README.md
    • [x] enable upload to pypi-test for the purpose of validating the entire publish job ( see publish job failure )
    • [x] replace upload to pypi-test with upload to pypi in publish job
    • [ ] setup codecov if we want a coverage badge to be introduced in this PR ( or propose a different solution )

    inspiration credit https://github.com/samuelcolvin/pydantic/blob/master/.github/workflows/ci.yml

    opened by harelwa 21
  • Add context management for workflow submission

    Add context management for workflow submission

    Currently, users have to add tasks to a workflow independently. After a task is defined, it is necessary to call workflow.add_tasks(t1, t2, ...), as otherwise the tasks are not added to the DAG template of the main workflow. To save some effort here, a context manager can be introduced to auto-insert tasks into a workflow and submit the workflow during the exit phase of the context. This would provide two ways of adding tasks to a workflow.

    The current, supported way of adding tasks and submitting a workflow:

    wf = Workflow('test')
    t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo'])
    t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
    t1 >> t2
    wf.add(t1, t2)
    wf.submit()
    

    Potential future (second) way of adding tasks and submitting a workflow:

    with Workflow('test') as wf:
        Task('t1', image='docker/whalesay', command=['cowsay', 'foo']) \
            >> Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
    

    Alternatively:

    with Workflow('test') as wf:
        t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo']) 
        t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
        t1 >> t2
    

    This will require the implementation of a context manager on Workflow and a container for Task, which tasks insert themselves into during __init__:

    class Workflow:
    
        def __enter__(self) -> 'Workflow':
            ...
            return self
    
        def __exit__(self, exc_type, exc_value, traceback):
            ...
            return self.submit()
    
    class _Container:
        tasks: list = []  # class-level registry of tasks created inside a workflow context
    
        @classmethod
        def add(cls, t: 'Task') -> None:
            cls.tasks.append(t)
    
    class Task:
    
        def __init__(self, *args, **kwargs):
            ...
            _Container.add(self)
    

    Questions to consider:

    • Does this solve a real problem or would it cause confusion because of the implicit behavior of task additions?
    • Is this an interface that abides by Hera's principle of simplicity? Is the _Container usage a bit too abstract?
    • Is it safe to assume that exiting a context should indeed submit a workflow? If not, but the design is something desirable, should users be provided with the feature along with a parameter on Workflow such as auto_submit? Does that pollute the interface of Workflow unnecessarily with QOL features rather than focusing on exposing Argo-specific parameters?

    Would love to discuss this @bchalk101 and @bobh66! Any thoughts? Would this be useful to your use case?

    enhancement 
    opened by flaviuvadan 20
  • Multiple Python Versions CI

    Multiple Python Versions CI

    Purpose of PR

    related issues and PRs #73 #92 #94

    Provide CI for multiple Python versions ( CD will be handled in a different PR )

    To achieve this, introduce usage of tox for testing built wheels in "isolated envs" of multiple python versions.

    Notes regarding similar open PRs

    1. The difference between this PR and #92 is that it does not run pipenv in tox, but rather lets tox handle dependencies and venvs
    2. The difference between this PR and #94 is that it keeps pipenv as the project's dev env manager

    Tasks

    • [x] add tox.ini with python{3.7,3.8,3.9,3.10} test envs
    • [x] add build matrix of python versions to ci
      • ref docs: https://docs.github.com/en/actions/using-workflows/advanced-workflow-features#using-a-build-matrix
      • ref proj: https://github.com/spotify/luigi/
      • ref proj: https://github.com/samuelcolvin/pydantic/
    • [x] remove caching of Pipfile.lock
      • Pipfile.lock is a py37 lock file
    • [x] for dev purposes, update Pipfile and Pipfile.lock
      • for ci, tox is used directly as Pipfile.lock is a py37 lock file

    Notes for future changes

    1. Evaluate usage of pipenv in tox
       1.1 See https://pipenv-fork.readthedocs.io/en/latest/advanced.html#tox-automation-project
       1.2 Evaluate tox-pipenv
    2. Evaluate poetry to replace pipenv
       2.1 See #94
       2.2 See the related discussion of "who does what and why" regarding tox and poetry: https://github.com/python-poetry/poetry/discussions/4307

    Changes to Pipfile

    To use tox, it was added to the Pipfile as a dev dependency with the following command:

    pipenv install pytest-cov "tox<4.0" tox-wheel --dev
    

    tox-wheel was added as well to make tox build wheels rather than zip archives, so these can be used for publishing [ more below ].

    After running tox, you'll find the wheels under .tox/dist:

    ❯ ls .tox/dist
    hera_workflows-1.8.0rc7-py3-none-any.whl
    

    pytest-cov was added as well so we can add coverage reports and tests, and perhaps add a coverage check.

    Even though this might belong in a different PR, I think it's relevant to this one as well, as it concerns major changes to how Hera is tested. In this context, I've added the relevant pytest options in setup.cfg.

    More on added tox.ini

    To use tox, update / recreate your pipenv virtual env according to the updated Pipfile.

    An initial tox.ini was created with a few tox envs.

    The "main" env - testenv -- provides the first desired functionality discussed in #73 and tests 4 python versions, including 3.10 ( latest ).

    To list all tox envs you can run:

    ❯ tox -a
    python3.7
    python3.8
    python3.9
    python3.10
    lint
    typecheck
    

    And to run a specific one, e.g. lint, run:

    ❯ tox -e lint
    
    opened by harelwa 15
  • Unable to resolve methods outside script

    Unable to resolve methods outside script

    Hi there, I'm new to Hera and Argo so bear with me if the question is rookie:

    Here is my code snippet:

    from app.common.image.image_utils import read_image
    from app.common.models.thumbnail_metadata import ThumbnailMetadata
    from app.common.s3_utils import resolve_s3_url, upload
    from app.common.tasks.argo_task import ArgoTask
    from app.services.argo_client import ArgoWorkflowService
    
    class ThumbnailTask(ArgoTask):
        def __init__(self, metadata: ThumbnailMetadata):
            # .... some more code
    
        def create_thumbnails(self, image_url: str, thumbnail_sizes: list[dict]):
            blob = read_image(self.image_url)
    
            # .... some more code
    
        def create(self):
            ArgoWorkflowService.create_task('create_thumbnails', ThumbnailTask.create_thumbnails, [{
                'image_url': self.image_url,
                'thumbnail_sizes': self.thumbnail_sizes
            }])
    
    from typing import Callable
    from hera.retry import Retry
    from hera.task import Task
    from hera.workflow import Workflow
    from hera.workflow_service import WorkflowService
    
    from app.core.config import get_app_settings
    
    class ArgoWorkflowService(object):
        workflow_service = None
        workflow = None
    
        @staticmethod
        def create_task(name: str, func: Callable, func_params: list[dict]):
            # .... some more code
    
            task = Task(name, func, func_params, image='my-repo.dkr.ecr.us-east-2.amazonaws.com/my-app:latest', retry=Retry(duration=3, max_duration=60))
            ArgoWorkflowService.workflow.add_task(task)
            ArgoWorkflowService.workflow.create()
    

    The error I received in Argo is:

    │ main Traceback (most recent call last):                                                                                                                                                                                         │
    │ main   File "/argo/staging/script", line 5, in <module>                                                                                                                                                                         │
    │ main     blob = read_image(self.image_url)                                                                                                                                                                                      │
    │ main NameError: name 'read_image' is not defined                                                                                                                                                                                │
    │
    

    The method read_image comes from another package.

    In addition, Argo was unable to import 3rd party Python libraries. I used the same application image, but it still did not work.

    Any help would be appreciated!

    question 
    opened by linyaoli 14
  • How to create a workflow with 10k tasks

    How to create a workflow with 10k tasks

    I'd like to have a workflow with many tasks, but I'm running into the 1.5MB k8s/etcd file limit. The workflow isn't complicated, it's basically 10k very-short bash commands that all run on the same image with the same resources etc.

    I think the solution here is to use a WorkflowTemplate, but I haven't figured out how to use input parameters with hera.

    I have something like this:

    # set up the WorkflowTemplate
    wt = WorkflowTemplate(...)
    
    # is this the right way to pass input in?
    t = Task("cmd", lambda: _, command=["bash", "-c", "{{inputs.parameters.cmd}}"], ...)
    wt.add_task(t)
    wt.create()
    
    # how do I get these commands into the workflow as parameters?
    commands = ["echo foo", "echo bar"]
    
    # create the Workflow
    workflow = Workflow(..., workflow_template_ref=wt.name)
    workflow.create()
    
    question stale 
    opened by dmerrick 13
  • How to ignore certificate errors?

    How to ignore certificate errors?

    When I attempt to port-forward Argo running in GKE, I get a certificate error. I tried the suggestion from the first comment here, but I still get the certificate error.

    C:\Users\ryanr\Code\hera-workflows\examples [main ≡ +0 ~1 -0 !]> python .\diamond.py
    Traceback (most recent call last):
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 706, in urlopen      
        chunked=chunked,
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 382, in _make_request
        self._validate_conn(conn)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 1010, in _validate_conn
        conn.connect()
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connection.py", line 426, in connect
        tls_in_tls=tls_in_tls,
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 450, in ssl_wrap_socket
        sock, context, tls_in_tls, server_hostname=server_hostname
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 493, in _ssl_wrap_socket_impl
        return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 423, in wrap_socket
        session=session
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 870, in _create
        self.do_handshake()
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 1139, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File ".\diamond.py", line 32, in <module>
        w.submit()
      File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow.py", line 129, in submit
        self.service.submit(self.workflow, namespace)
      File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow_service.py", line 48, in submit
        return self.service.create_workflow(namespace, V1alpha1WorkflowCreateRequest(workflow=workflow))
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 62, in create_workflow
        return self.create_workflow_with_http_info(namespace, body, **kwargs)  # noqa: E501
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 162, in create_workflow_with_http_info
        collection_formats=collection_formats)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 369, in call_api
        _preload_content, _request_timeout, _host)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 185, in __call_api
        _request_timeout=_request_timeout)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 413, in request
        body=body)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 271, in POST
        body=body)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 168, in request
        headers=headers)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 79, in request
        method, url, fields=fields, headers=headers, **urlopen_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 170, in request_encode_body
        return self.urlopen(method, url, **extra_kw)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\poolmanager.py", line 375, in urlopen
        response = conn.urlopen(method, u.request_uri, **kw)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 756, in urlopen
        method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\retry.py", line 574, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='localhost', port=2746): Max retries exceeded with url: /api/v1/workflows/default (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)')))
    
    opened by tachyus-ryan 13
  • Support latest Python version

    Support latest Python version

    Hi! Thank you for developing such a great tool! Currently, Hera supports only Python 3.7. That version is now in security-fixes-only status and will reach end of life soon.

    reference: https://www.python.org/downloads/

    So it would be better to support Python 3.8+, IMO. I'm happy to create a PR for supporting newer versions of Python, if that is OK.

    Thank you.

    enhancement 
    opened by hirosassa 12
  • Add task template mechanism

    Add task template mechanism

    Description

    This MR provides additional functionality for templating tasks with a class TaskTemplate.

    Example

    task_template = TaskTemplate('myTemplate', say,
        [{
            'message': "default value for param 'message'",
            "other_message": {"default": {"value": {"for": "other_message"}}},
        }],
    )
    a = task_template.task("taskWithDefaultValues")
    b = task_template.task("taskWithNonDefaultValues", [{"other_message": "hello again"}])
    
    
    opened by kromanow94 10
  • How about supporting onExit?

    How about supporting onExit?

    Argo Workflows provides a function, onExit, to execute a template after the workflow has finished. https://github.com/argoproj/argo-workflows/blob/master/examples/exit-handlers.yaml

    How about supporting onExit for hera-workflows as well? I have tested the following implementation.

    class WorkflowWithOnExit(Workflow):
        def __init__(self, name: str, service: WorkflowService, **kw):
            super().__init__(name, service, **kw)
    
        def add_tasks_on_exit(self, on_exit_task: "OnExitTask", *ts: Task) -> None:
            template_on_exit_name = self.name + "-onExit"
            setattr(self.spec, "on_exit", template_on_exit_name)
    
            # prepare template for onExit
            dag_template_on_exit = IoArgoprojWorkflowV1alpha1DAGTemplate(tasks=[])
            template_on_exit = IoArgoprojWorkflowV1alpha1Template(
                name=template_on_exit_name,
                steps=[],
                dag=dag_template_on_exit,
                parallelism=self.parallelism,
            )
            self.spec.templates.append(template_on_exit)
    
            # HACK: Not using workflow_editors.add_tasks to add to dag_template_on_exit.tasks
            for t in (on_exit_task,) + ts:
                self.spec.templates.append(t.argo_template)
                if t.resources.volumes:
                    for vol in t.resources.volumes:
                        if isinstance(vol, Volume):
                            self.spec.volume_claim_templates.append(vol.get_claim_spec())
                        else:
                            self.spec.volumes.append(vol.get_volume())
                dag_template_on_exit.tasks.append(t.argo_task)
    
    
    class OnExitTask(Task):
        def when_by_workflow_status(
            self, operator: Operator, workflow_status: WorkflowStatus
        ):
            self.argo_task.when = (
                f"{{{{workflow.status}}}} {operator.value} {workflow_status.value}"
            )
    
    def f_with_error():
        raise RuntimeError("Err!")
    
    
    def exit_handler():
        print("exit_handler")
    
    
    def exit_handler_next():
        print("exit_handler_next")
    
    ws = WorkflowService(
        host=host,
        token=token,
        namespace=namespace,
    )
    
    w = WorkflowWithOnExit(
        "test_exit_handler",
        ws,
        namespace=namespace,
        service_account_name=service_account,
    )
    
    task_exit_handler = OnExitTask("exit_handler", exit_handler, image=image)
    task_exit_handler.when_by_workflow_status(Operator.not_equal, WorkflowStatus.Succeeded)
    task_exit_handler_next = Task("exit_handler_next", exit_handler_next, image=image)
    task_exit_handler >> task_exit_handler_next
    
    task_with_error = Task("task_with_error", f_with_error, image=image)
    
    w.add_tasks(task_with_error)
    w.add_tasks_on_exit(task_exit_handler, task_exit_handler_next)
    
    w.create()
    
    STEP                         TEMPLATE                  PODNAME                       DURATION  MESSAGE
     ✖ test-exit-handler         test-exit-handler
     └─✖ task-with-error         task-with-error           test-exit-handler-153195892   12s       Error (exit code 1)
    
     ✔ test-exit-handler.onExit  test-exit-handler-onExit
     ├─✔ exit-handler            exit-handler              test-exit-handler-2402822171  10s
     └─✔ exit-handler-next       exit-handler-next         test-exit-handler-2592397439  10s
    
    enhancement 
    opened by szdr 10
  • Support multiple python using tox with Poetry

    Support multiple python using tox with Poetry

    fix #73

    What is this PR?

    Related to #93, I implemented support for multiple Python versions using tox. Different from #93, this PR uses Poetry as a package manager to simplify the configuration of linters, tests, and builds.

    opened by hirosassa 10
  • Dynamic fanout fanin fails with error

    Dynamic fanout fanin fails with error

    Hi, I was trying to use the dynamic fanout fanin example, https://github.com/argoproj-labs/hera-workflows/blob/main/examples/dynamic_fanout_fanin.py, and I get the following error message when trying to run it.

        raise ServiceException(http_resp=r)
    argo_workflows.exceptions.ServiceException: (500)
    Reason: Internal Server Error
    HTTP response headers: HTTPHeaderDict({'Date': 'Wed, 15 Jun 2022 15:15:35 GMT', 'Content-Type': 'application/json', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Trailer': 'Grpc-Trailer-Content-Type'})
    HTTP response body: {"code":2,"message":"templates.dynamic-fanout.tasks.fanin failed to resolve {{tasks.fanout.outputs.parameters}}"}
    

    Any ideas on what might be happening? Does it require a newer Argo installation?

    EDIT: The following Argo workflow works, and is similar to what I wanted to do. Is it then a Hera issue?

    apiVersion: argoproj.io/v1alpha1
    
    kind: Workflow
    metadata:
      namespace: argo-data
      generateName: fanin-example
      labels:
        workflows.argoproj.io/archive-strategy: "false"
      annotations:
        workflows.argoproj.io/description: |
          Test fanin
    
    spec:
      serviceAccountName: argo
      parallelism: 5
    
      entrypoint: loop-param-result-example
      templates:
        - name: loop-param-result-example
          steps:
            - - name: generate
                template: gen-number-list
            - - name: write
                template: output-number
                arguments:
                  parameters:
                    - name: number
                      value: "{{item}}"
                withParam: "{{steps.generate.outputs.result}}"
            - - name: fan-in
                template: fan-in
                arguments:
                  parameters:
                    - name: numbers
                      value: "{{steps.write.outputs.parameters}}"
    
        - name: gen-number-list
          script:
            image: python:alpine3.6
            command: [python]
            source: |
              import json
              import sys
              json.dump([i for i in range(20, 31)], sys.stdout)
    
        - name: output-number
          inputs:
            parameters:
              - name: number
          container:
            image: alpine:latest
            command: [sh, -c]
            args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
          outputs:
            parameters:
              - name: number
                valueFrom:
                  path: /tmp/number.txt
    
        - name: fan-in
          inputs:
            parameters:
              - name: numbers
          container:
            image: alpine:latest
            command: [sh, -c]
            args: ["echo received {{inputs.parameters.numbers}}"]
    
    opened by vikramsg 9
  • Return argo result on submission

    Return argo result on submission

    If we're using concepts like generate_name, we won't know the final name of the workflow until we submit it. The user might want to fetch this via the returned object upon creation, which holds this information. Typically: result.metadata['name']
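
    A minimal sketch of the proposed usage (assuming submit() were changed to return the created workflow object from the Argo API):

    result = w.submit()
    print(result.metadata['name'])  # the resolved name when generate_name was used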

    opened by Trollgeir 0
  • Support for Task/Workflow hooks or modifiers

    Support for Task/Workflow hooks or modifiers

    Hi, I'd like to be able to set some "global" hooks that will be called during Task or Workflow creation that can modify them. For example, my company has:

    • custom GKE selectors and tolerations required to use high mem instances that must be set in any code that may provision high mem jobs
    • custom GKE volume storage classes that set WaitForFirstConsumer to prevent volumes from being provisioned in 1 zone, but the pod in another (leading to unschedulable jobs).
    • labels that we apply to most jobs for cost and other attribution; we'd like to either add defaults if not set OR error to catch missing ones

    For all tasks/workflows submitted within our projects, we'd like these to be handled automatically, rather than every user needing the extra logic. Additionally, I've been thinking about writing some code that wraps hera and may not expose Tasks/Workflows directly (ie: it will create those for the user) - this layer would make it a bit more challenging to apply these customizations, but hooks might be an easy way to bypass it.

    --

    A couple design points / questions:

    • I think it'd be useful for these to run on Task/Workflow creation (ie: at the end of __init__), but another option might be to call them on .build (just before generating the spec). I think on __init__ would be beneficial because:
      • it would allow folks to inspect the generated object and tweak further if desired
      • any hook errors, intentional or not, will show tracebacks right at the source, not later where .build is finally called
    • the hooks could probably be registered to a specific WorkflowService instance, rather than be "true" globals. However, setting them on a WorkflowService instance would force us to use the .build approach from the item above, otherwise the Task won't have the service available to identify the hooks. If they were true global hooks, then Task.__init__ could just find and call them at the end.
    • What is the right name for this concept, "hook", "modifier", etc?
      • I somewhat like "hook" because nothing forces the implementation to modify (though I don't know what else they'd do 😅) and it hints at the mechanism (that things are being "intercepted")

    --

    Spitballing the updates for WorkflowService, maybe something like:

    TaskHook = Callable[[Task], None]
    WorkflowHook = Callable[[Workflow], None]
    
    # Add instance attributes tracking the hooks (pretending this is a pydantic model, so setting fields this way). dicts instead of lists may be useful in case we want to de-register or override.
    class WorkflowService(...):
        task_hooks: list[TaskHook] = []  # An ordered set of hooks that will be called on Task creation in FIFO order.
        workflow_hooks: list[WorkflowHook] = []  # An ordered set of hooks that will be called on Workflow creation in FIFO order.
    
        def register_{task,workflow}_hook(self, hook: {Task,Workflow}Hook) -> {Task,Workflow}Hook:
            self.{task,workflow}_hooks.append(hook)
            return hook # Allow use as a @decorator
    

    Then, perhaps Workflow.build would look up the hooks for the provided (or defaulted) WorkflowService and call all the appropriate hooks.

    enhancement 
    opened by JacobHayes 0
  • Use token from ~/.kube/config if no token provided

    Use token from ~/.kube/config if no token provided

    Currently, in the Python SDK of argo_workflows, I do not need to pass a token explicitly and can just do:

    import argo_workflows
    argo_config = argo_workflows.Configuration(host="....")
    api_client = argo_workflows.ApiClient(argo_config)
    

    which then uses the token in ~/.kube/config if I'm not mistaken.

    In Hera, I believe this is currently not possible.

    Is this on purpose? It seems to me that using the token from ~/.kube/config is more convenient and secure than copying/pasting tokens.
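
    For illustration, here is a minimal sketch of reading a static token out of ~/.kube/config directly (an assumption for discussion: it only works when the user entry for the current context carries a plain token field; kubeconfigs based on exec plugins or auth providers will not have one):

    import os
    import yaml

    with open(os.path.expanduser("~/.kube/config")) as f:
        cfg = yaml.safe_load(f)

    # Resolve current-context -> context -> user, then read its static token.
    context_name = cfg["current-context"]
    context = next(c for c in cfg["contexts"] if c["name"] == context_name)["context"]
    user = next(u for u in cfg["users"] if u["name"] == context["user"])["user"]
    token = user.get("token")  # None for exec/auth-provider based kubeconfigs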

    question 
    opened by rubenvdg 3
  • Hera-SDK failing on custom service account

    Hera-SDK failing on custom service account

    Facing an issue while using custom roles in k8s. All access-related privileges seem okay, as I am able to create, update, and delete workflows using the Argo Workflows UI, but using the Hera SDK we are facing an auth issue.

    UnauthorizedException: (401) Reason: Unauthorized HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Trailer': 'Grpc-Trailer-Content-Type', 'Date': 'Wed, 20 Jul 2022 08:00:59 GMT', 'Transfer-Encoding': 'chunked'}) HTTP response body: {"code":16,"message":"Unauthorized"}

    The same token works perfectly in the Argo Workflows UI. Not sure what access level the Hera SDK actually requires.
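
    One way to narrow this down (a debugging suggestion, not part of the original report) is to replay the call outside of Hera against the Argo server REST endpoint the SDK uses (visible in the tracebacks above as /api/v1/workflows/{namespace}):

    curl -H "Authorization: Bearer $TOKEN" https://<argo-server-host>/api/v1/workflows/<namespace>

    If this also returns 401, the problem lies with the token or RBAC setup rather than with Hera itself.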

    bug 
    opened by smetal1 2