Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sqlite-based memory store for live workflows. Fixes #12025 #12736

Merged
merged 42 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
8793b7a
feat: add sqlite-based memory store for live workflows
jiachengxu Mar 4, 2024
55cd632
fix: lint
jiachengxu Mar 4, 2024
3f76d4f
chore(test): update the test
jiachengxu Mar 7, 2024
bdb010d
chore(test): try to set cgo enabled for go sqlite
jiachengxu Mar 7, 2024
a451490
chore: remove direct use of go-sqlite3
jiachengxu Mar 7, 2024
e6d3d25
chore: add build-base to use cgo in builder image
jiachengxu Mar 7, 2024
3eac5a4
chore: small fix for test
jiachengxu Mar 7, 2024
abc06d3
chore: get rid of cgo
jiachengxu Mar 11, 2024
ae50e97
chore: revert the makefile change
jiachengxu Mar 11, 2024
05e4029
chore: revert the makefile change
jiachengxu Mar 11, 2024
c944a61
chore: update
jiachengxu Mar 11, 2024
f322316
fix: lint
jiachengxu Mar 11, 2024
f00720d
chore(stage): add test
jiachengxu Mar 19, 2024
2f0e02e
Merge branch 'main' into list-workflow
jiachengxu Mar 19, 2024
9647006
chore(stage): fix lint
jiachengxu Mar 19, 2024
85665e3
feat: implement pagination
jiachengxu Mar 25, 2024
e0df0ff
fix: unit test
jiachengxu Mar 25, 2024
bafa4f1
fix: lint
jiachengxu Mar 25, 2024
9883a3c
Merge branch 'main' into list-workflow
jiachengxu Mar 25, 2024
b2b4fe2
fix: e2e test
jiachengxu Mar 25, 2024
c0b8d84
Merge branch 'main' into list-workflow
jiachengxu Mar 28, 2024
ee14a1b
Merge branch 'main' into list-workflow
jiachengxu Mar 28, 2024
fe68611
Merge remote-tracking branch 'upstream/main' into list-workflow
jiachengxu Apr 2, 2024
06fb1c7
Merge remote-tracking branch 'refs/remotes/origin/list-workflow' into…
jiachengxu Apr 2, 2024
9873620
Merge branch 'main' into list-workflow
jiachengxu Apr 2, 2024
c245e69
fix: imports
jiachengxu Apr 3, 2024
fd5f9da
fix: load entire archived workflow into memory in list APIs
jiachengxu Apr 8, 2024
8dc8d1a
fix: add support for mysql
jiachengxu Apr 8, 2024
f2552ad
Merge branch 'main' into list-workflow
jiachengxu Apr 8, 2024
de457f6
Merge branch 'select-archived-workflow-fix' into list-workflow
jiachengxu Apr 8, 2024
5ae57ac
fix: handle null value from archived store
jiachengxu Apr 9, 2024
42831fb
fix: cursor pagination
jiachengxu Apr 11, 2024
fc431ed
Merge remote-tracking branch 'upstream/main' into list-workflow
jiachengxu Apr 11, 2024
67ae8f9
fix: import
jiachengxu Apr 11, 2024
c8b850c
Merge branch 'main' into list-workflow
jiachengxu Apr 12, 2024
df6fdf5
Merge branch 'main' into list-workflow
jiachengxu Apr 12, 2024
68b33f4
Merge remote-tracking branch 'upstream/main' into list-workflow
jiachengxu Apr 17, 2024
9c8149b
fix: import
jiachengxu Apr 17, 2024
604d689
chore: empty commit
jiachengxu Apr 17, 2024
d1f0ace
fix: address PR comment
jiachengxu Apr 25, 2024
1001279
Merge remote-tracking branch 'upstream/main' into list-workflow
jiachengxu Apr 25, 2024
66bcc6e
Merge remote-tracking branch 'upstream/main' into list-workflow
jiachengxu Apr 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ RUN apk update && apk add --no-cache \
curl \
gcc \
bash \
mailcap
mailcap \
build-base
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

WORKDIR /go/src/github.com/argoproj/argo-workflows
COPY go.mod .
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,20 @@ dist/argo-windows-%.gz: dist/argo-windows-%
gzip --force --keep dist/argo-windows-$*.exe

dist/argo-windows-%: server/static/files.go $(CLI_PKGS) go.sum
CGO_ENABLED=0 $(GOARGS) go build -v -gcflags '${GCFLAGS}' -ldflags '${LDFLAGS} -extldflags -static' -o $@.exe ./cmd/argo
CGO_ENABLED=1 $(GOARGS) go build -v -gcflags '${GCFLAGS}' -ldflags '${LDFLAGS} -extldflags -static' -o $@.exe ./cmd/argo
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

dist/argo-%.gz: dist/argo-%
gzip --force --keep dist/argo-$*

dist/argo-%: server/static/files.go $(CLI_PKGS) go.sum
CGO_ENABLED=0 $(GOARGS) go build -v -gcflags '${GCFLAGS}' -ldflags '${LDFLAGS} -extldflags -static' -o $@ ./cmd/argo
CGO_ENABLED=1 $(GOARGS) go build -v -gcflags '${GCFLAGS}' -ldflags '${LDFLAGS} -extldflags -static' -o $@ ./cmd/argo

dist/argo: server/static/files.go $(CLI_PKGS) go.sum
ifeq ($(shell uname -s),Darwin)
# if local, then build fast: use CGO and dynamic-linking
go build -v -gcflags '${GCFLAGS}' -ldflags '${LDFLAGS}' -o $@ ./cmd/argo
else
CGO_ENABLED=0 go build -gcflags '${GCFLAGS}' -v -ldflags '${LDFLAGS} -extldflags -static' -o $@ ./cmd/argo
CGO_ENABLED=1 go build -gcflags '${GCFLAGS}' -v -ldflags '${LDFLAGS} -extldflags -static' -o $@ ./cmd/argo
endif

argocli-image:
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ require (
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
Expand All @@ -114,8 +115,8 @@ require (
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
)
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down Expand Up @@ -1170,8 +1172,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1466,8 +1468,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.10-0.20220218145154-897bd77cd717/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
26 changes: 15 additions & 11 deletions persist/sqldb/archived_workflow_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func (r *workflowArchive) ListWorkflowsLabelValues(key string) (*wfv1.LabelValue
return &wfv1.LabelValues{Items: labels}, nil
}

func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements) (db.Selector, error) {
func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements, tableName, labelTableName string, hasClusterName bool) (db.Selector, error) {
for _, req := range requirements {
cond, err := requirementToCondition(t, req)
cond, err := requirementToCondition(t, req, tableName, labelTableName, hasClusterName)
if err != nil {
return nil, err
}
Expand All @@ -63,36 +63,40 @@ func labelsClause(selector db.Selector, t dbType, requirements labels.Requiremen
return selector, nil
}

func requirementToCondition(t dbType, r labels.Requirement) (*db.RawExpr, error) {
func requirementToCondition(t dbType, r labels.Requirement, tableName, labelTableName string, hasClusterName bool) (*db.RawExpr, error) {
clusterNameSelector := ""
if hasClusterName {
clusterNameSelector = fmt.Sprintf("clustername = %s.clustername and", tableName)
}
// Should we "sanitize our inputs"? No.
// https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
// Valid label values must be 63 characters or less and must be empty or begin and end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and alphanumerics between.
// https://kb.objectrocket.com/postgresql/casting-in-postgresql-570#string+to+integer+casting
switch r.Operator() {
case selection.DoesNotExist:
return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil
return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil
case selection.Equals, selection.DoubleEquals:
return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil
return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil
case selection.In:
return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil
return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil
case selection.NotEquals:
return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil
return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil
case selection.NotIn:
return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil
return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil
case selection.Exists:
return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil
return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil
case selection.GreaterThan:
i, err := strconv.Atoi(r.Values().List()[0])
if err != nil {
return nil, err
}
return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) > %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil
return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) > %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil
case selection.LessThan:
i, err := strconv.Atoi(r.Values().List()[0])
if err != nil {
return nil, err
}
return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) < %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil
return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) < %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil
}
return nil, fmt.Errorf("operation %v is not supported", r.Operator())
}
2 changes: 1 addition & 1 deletion persist/sqldb/archived_workflow_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Test_labelsClause(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, req := range tt.requirements {
got, err := requirementToCondition(tt.dbType, req)
got, err := requirementToCondition(tt.dbType, req, archiveTableName, archiveLabelsTableName, true)
if assert.NoError(t, err) {
assert.Equal(t, tt.want, *got)
}
Expand Down
1 change: 1 addition & 0 deletions persist/sqldb/db_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type dbType string
const (
MySQL dbType = "mysql"
Postgres dbType = "postgres"
SQLite dbType = "sqlite"
)

func dbTypeFor(session db.Session) dbType {
Expand Down
33 changes: 33 additions & 0 deletions persist/sqldb/selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sqldb

import (
"time"

"github.com/upper/db/v4"
"k8s.io/apimachinery/pkg/labels"
)

func BuildWorkflowSelector(selector db.Selector, tableName, labelTableName string, hasClusterName bool, t dbType, namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit, offset int) (db.Selector, error) {
// If we were passed 0 as the limit, then we should load all available archived workflows
// to match the behavior of the `List` operations in the Kubernetes API
if limit == 0 {
limit = -1
offset = -1
}

selector = selector.
And(namespaceEqual(namespace)).
And(nameEqual(name)).
And(namePrefixClause(namePrefix)).
And(startedAtFromClause(minStartedAt)).
And(startedAtToClause(maxStartedAt))

selector, err := labelsClause(selector, t, labelRequirements, tableName, labelTableName, hasClusterName)
if err != nil {
return nil, err
}
return selector.
OrderBy("-startedat").
Limit(limit).
Offset(offset), nil
}
32 changes: 6 additions & 26 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,32 +144,17 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowRecord

// If we were passed 0 as the limit, then we should load all available archived workflows
// to match the behavior of the `List` operations in the Kubernetes API
if limit == 0 {
limit = -1
offset = -1
}

selector := r.session.SQL().
Select("workflow").
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
And(namespaceEqual(namespace)).
And(nameEqual(name)).
And(namePrefixClause(namePrefix)).
And(startedAtFromClause(minStartedAt)).
And(startedAtToClause(maxStartedAt))
Where(r.clusterManagedNamespaceAndInstanceID())

selector, err := labelsClause(selector, r.dbType, labelRequirements)
selector, err := BuildWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, true, r.dbType, namespace, name, namePrefix, minStartedAt, maxStartedAt, labelRequirements, limit, offset)
if err != nil {
return nil, err
}
err = selector.
OrderBy("-startedat").
Limit(limit).
Offset(offset).
All(&archivedWfs)

err = selector.All(&archivedWfs)
if err != nil {
return nil, err
}
Expand All @@ -195,14 +180,9 @@ func (r *workflowArchive) CountWorkflows(namespace string, name string, namePref
selector := r.session.SQL().
Select(db.Raw("count(*) as total")).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
And(namespaceEqual(namespace)).
And(nameEqual(name)).
And(namePrefixClause(namePrefix)).
And(startedAtFromClause(minStartedAt)).
And(startedAtToClause(maxStartedAt))
Where(r.clusterManagedNamespaceAndInstanceID())

selector, err := labelsClause(selector, r.dbType, labelRequirements)
selector, err := BuildWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, true, r.dbType, namespace, name, namePrefix, minStartedAt, maxStartedAt, labelRequirements, 0, 0)
if err != nil {
return 0, err
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/apiclient/argo-kube-client.go
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow"
"github.com/argoproj/argo-workflows/v3/server/types"
workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow"
"github.com/argoproj/argo-workflows/v3/server/workflow/store"
"github.com/argoproj/argo-workflows/v3/server/workflowarchive"
workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate"
"github.com/argoproj/argo-workflows/v3/util/help"
Expand All @@ -38,6 +39,8 @@ var (

type argoKubeClient struct {
instanceIDService instanceid.Service
wfClient workflow.Interface
wfStore store.WorkflowStore
}

var _ Client = &argoKubeClient{}
Expand Down Expand Up @@ -84,13 +87,17 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
return ctx, &argoKubeClient{instanceIDService}, nil
wfStore, err := store.NewSQLiteStore(instanceIDService)
if err != nil {
return nil, nil, err
}
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
return ctx, &argoKubeClient{instanceIDService, wfClient, wfStore}, nil
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}}
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer, a.wfClient, a.wfStore)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
Expand Down