diff --git a/README.md b/README.md index 7c3db492..64c59943 100644 --- a/README.md +++ b/README.md @@ -12,4 +12,4 @@ Kine is an etcdshim that translates etcd API to sqlite, Postgres, Mysql, and dql - Can be ran standalone so any k8s (not just k3s) can use Kine - Implements a subset of etcdAPI (not usable at all for general purpose etcd) - Translates etcdTX calls into the desired API (Create, Update, Delete) -- Backend drivers for dqlite, sqlite, Postgres, MySQL +- Backend drivers for dqlite, sqlite, Postgres, MySQL and NATS JetStream diff --git a/go.mod b/go.mod index f7f6fda7..6e486e3d 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,14 @@ require ( github.com/Rican7/retry v0.1.0 github.com/canonical/go-dqlite v1.5.1 github.com/go-sql-driver/mysql v1.6.0 + github.com/klauspost/compress v1.14.4 github.com/lib/pq v1.10.2 github.com/mattn/go-sqlite3 v1.14.8 + github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee + github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 github.com/pkg/errors v0.9.1 github.com/rancher/wrangler v0.8.3 + github.com/shengdoushi/base58 v1.0.0 github.com/sirupsen/logrus v1.7.0 github.com/soheilhy/cmux v0.1.5 github.com/urfave/cli v1.21.0 diff --git a/go.sum b/go.sum index e31b0d3c..136f249d 100644 --- a/go.sum +++ b/go.sum @@ -255,8 +255,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -337,6 +338,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -369,6 +372,8 @@ github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -392,6 +397,19 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee h1:+l6i7zS8N1LOokm7dzShezI9STRGrzp0O49Pw8Jetdk= +github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee/go.mod h1:EKSYvbvWAoh0hIfuZ+ieWm8u0VOTRTeDfuQvNPKRqEg= +github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= +github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72 h1:Moe/K4fo/5FCNpE/TYrMt7sEPUuldBVJ0D4g/SWFkd0= +github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc= +github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 h1:czbQ9uYuV7dwLsh/0vpB+4rutgdLTYgoN5W5hf1S0eg= +github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -462,6 +480,8 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/shengdoushi/base58 v1.0.0 h1:tGe4o6TmdXFJWoI31VoSWvuaKxf0Px3gqa3sUWhAxBs= +github.com/shengdoushi/base58 v1.0.0/go.mod h1:m5uIILfzcKMw6238iWAhP4l3s5+uXyF3+bJKUNhAL9I= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -603,8 +623,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce h1:Roh6XWxHFKrPgC/EQhVubSAGQ6Ozk6IdxHSzt1mR0EI= +golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -663,8 +685,9 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -689,6 +712,7 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -714,9 +738,12 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -724,13 +751,15 @@ golang.org/x/text v0.3.1-0.20171227012246-e19ae1496984/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/hack/histogram.py b/hack/histogram.py index fc77a189..a39056e1 100755 --- a/hack/histogram.py +++ b/hack/histogram.py @@ -61,7 +61,7 @@ def main(type, *args, **kwargs): operations[labels['operation']]['count'] = value for operation, stats in operations.items(): - print(f"\n{stats['sum'] / stats['count']:.3f} average etcd request duration (seconds): {operation} {stats['type']}") + print(f"\n{stats['sum'] / stats['count']:.3f} average {kwargs['backend_name']} request duration (seconds): {operation} {stats['type']}") fig = tpl.figure() fig.barh(stats['counts'], stats['buckets'], max_width=50) fig.show() @@ -71,6 +71,7 @@ def main(type, *args, **kwargs): parser = argparse.ArgumentParser() parser.add_argument('--type', '-t', type=str, required=False, default='core.ConfigMap') parser.add_argument('--log-level', '-l', type=str, required=False, default='INFO') + parser.add_argument('--backend-name', '-b', type=str, required=False, default='etcd') args = parser.parse_args() try: diff --git a/pkg/drivers/dqlite/dqlite.go b/pkg/drivers/dqlite/dqlite.go index 35a99ab8..efd203b6 100644 --- a/pkg/drivers/dqlite/dqlite.go +++ b/pkg/drivers/dqlite/dqlite.go @@ -1,3 +1,4 @@ +//go:build dqlite // +build dqlite package dqlite diff --git a/pkg/drivers/dqlite/no_dqlite.go b/pkg/drivers/dqlite/no_dqlite.go index d8809658..b0f48a41 100644 --- a/pkg/drivers/dqlite/no_dqlite.go +++ b/pkg/drivers/dqlite/no_dqlite.go @@ -1,3 +1,4 @@ +//go:build !dqlite // +build !dqlite package dqlite diff --git a/pkg/drivers/jetstream/jetstream.go b/pkg/drivers/jetstream/jetstream.go new file mode 100644 index 00000000..96dd1789 --- /dev/null +++ b/pkg/drivers/jetstream/jetstream.go @@ -0,0 +1,1017 @@ +package jetstream + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "regexp" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/k3s-io/kine/pkg/drivers/jetstream/kv" + "github.com/k3s-io/kine/pkg/server" + "github.com/k3s-io/kine/pkg/tls" + "github.com/nats-io/jsm.go/natscontext" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +const ( + defaultBucket = "kine" + defaultRevHistory = 10 + defaultSlowMethod = 500 * time.Millisecond +) + +var ( + toplevelKeyMatch = regexp.MustCompile(`(/[^/]*/[^/]*)(/.*)?`) +) + +type Config struct { + natsURL string + options []nats.Option + revHistory uint8 + bucket string + slowMethod time.Duration +} + +type JetStream struct { + kvBucket nats.KeyValue + kvBucketMutex *sync.RWMutex + kvDirectoryMutex *sync.RWMutex + kvDirectoryMuxes map[string]*sync.RWMutex + jetStream nats.JetStreamContext + slowMethod time.Duration + server.Backend +} + +type JSValue struct { + KV *server.KeyValue + PrevRevision int64 + Create bool + Delete bool +} + +// New get the JetStream Backend, establish connection to NATS JetStream. At the moment nats.go does not have +// connection string support so kine will use: +// nats://(token|username:password)hostname:port?bucket=bucketName&contextFile=nats-context&slowMethod=&revHistory=`. +// +// If contextFile is provided then do not provide a hostname:port in the endpoint URL, instead use the context file to +// provide the NATS server url(s). +// +// bucket: specifies the bucket on the nats server for the k8s key/values for this cluster (optional) +// contextFile: specifies the nats context file to load e.g. /etc/nats/context.json +// revHistory: controls the rev history for JetStream defaults to 10 must be > 2 and <= 64 +// slowMethod: used to log methods slower than provided duration default 500ms +// +// Multiple urls can be passed in a comma separated format - only the first in the list will be evaluated for query +// parameters. While auth is valid in the url, the preferred way to pass auth is through a context file. If user/pass or +// token are provided in the url only the first one will be used for all urls. +/// +// If no bucket query parameter is provided it will default to kine +// +// https://docs.nats.io/using-nats/nats-tools/nats_cli#configuration-contexts +// +// example nats-context.json: +/* +{ + "description": "optional context description", + "url": "nats://127.0.0.1:4222", + "token": "", + "user": "", + "password": "", + "creds": "", + "nkey": "", + "cert": "", + "key": "", + "ca": "", + "nsc": "", + "jetstream_domain": "", + "jetstream_api_prefix": "", + "jetstream_event_prefix": "" +} +*/ +func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Backend, error) { + config, err := parseNatsConnection(connection, tlsInfo) + if err != nil { + return nil, err + } + + logrus.Infof("using bucket: %s", config.bucket) + logrus.Infof("connecting to %s", config.natsURL) + + nopts := append(config.options, nats.Name("k3s-server using bucket: "+config.bucket)) + + conn, err := nats.Connect(config.natsURL, nopts...) + if err != nil { + return nil, err + } + + js, err := conn.JetStream() + + if err != nil { + return nil, err + } + + bucket, err := js.KeyValue(config.bucket) + if err != nil && err == nats.ErrBucketNotFound { + bucket, err = js.CreateKeyValue( + &nats.KeyValueConfig{ + Bucket: config.bucket, + Description: "Holds kine key/values", + History: config.revHistory, + }) + } + + kvB := kv.NewEncodedKV(bucket, &kv.EtcdKeyCodec{}, &kv.S2ValueCodec{}) + + if err != nil { + return nil, err + } + + return &JetStream{ + kvBucket: kvB, + kvBucketMutex: &sync.RWMutex{}, + kvDirectoryMutex: &sync.RWMutex{}, + kvDirectoryMuxes: make(map[string]*sync.RWMutex), + jetStream: js, + slowMethod: config.slowMethod, + }, nil +} + +// parseNatsConnection returns nats connection url, bucketName and []nats.Option, error +func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { + + jsConfig := &Config{ + slowMethod: defaultSlowMethod, + revHistory: defaultRevHistory, + } + connections := strings.Split(dsn, ",") + jsConfig.bucket = defaultBucket + + jsConfig.options = make([]nats.Option, 0) + + u, err := url.Parse(connections[0]) + if err != nil { + return nil, err + } + + queryMap, err := url.ParseQuery(u.RawQuery) + if err != nil { + return nil, err + } + + if b, ok := queryMap["bucket"]; ok { + jsConfig.bucket = b[0] + } + + if r, ok := queryMap["slowMethod"]; ok { + if dur, err := time.ParseDuration(r[0]); err == nil { + jsConfig.slowMethod = dur + } else { + return nil, err + } + } + + if r, ok := queryMap["revHistory"]; ok { + if revs, err := strconv.ParseUint(r[0], 10, 8); err == nil { + if revs >= 2 && revs <= 64 { + jsConfig.revHistory = uint8(revs) + } else { + return nil, fmt.Errorf("invalid revHistory, must be => 2 and <= 64") + } + } + } + + contextFile, hasContext := queryMap["contextFile"] + if hasContext && u.Host != "" { + return jsConfig, fmt.Errorf("when using context endpoint no host should be provided") + } + + if tlsInfo.KeyFile != "" && tlsInfo.CertFile != "" { + jsConfig.options = append(jsConfig.options, nats.ClientCert(tlsInfo.CertFile, tlsInfo.KeyFile)) + } + + if tlsInfo.CAFile != "" { + jsConfig.options = append(jsConfig.options, nats.RootCAs(tlsInfo.CAFile)) + } + + if hasContext { + logrus.Infof("loading nats contextFile=%s", contextFile[0]) + + natsContext, err := natscontext.NewFromFile(contextFile[0]) + if err != nil { + return nil, err + } + + connections = strings.Split(natsContext.ServerURL(), ",") + + // command line options provided to kine will override the file + // https://github.com/nats-io/jsm.go/blob/v0.0.29/natscontext/context.go#L257 + // allows for user, creds, nke, token, certifcate, ca, inboxprefix from the context.json + natsClientOpts, err := natsContext.NATSOptions(jsConfig.options...) + if err != nil { + return nil, err + } + jsConfig.options = natsClientOpts + } + + connBuilder := strings.Builder{} + for idx, c := range connections { + if idx > 0 { + connBuilder.WriteString(",") + } + + u, err := url.Parse(c) + if err != nil { + return nil, err + } + + if u.Scheme != "nats" { + return nil, fmt.Errorf("invalid connection string=%s", c) + } + + connBuilder.WriteString("nats://") + + if u.User != nil && idx == 0 { + userInfo := strings.Split(u.User.String(), ":") + if len(userInfo) > 1 { + jsConfig.options = append(jsConfig.options, nats.UserInfo(userInfo[0], userInfo[1])) + } else { + jsConfig.options = append(jsConfig.options, nats.Token(userInfo[0])) + } + } + connBuilder.WriteString(u.Host) + } + jsConfig.natsURL = connBuilder.String() + + logrus.Infof("using config %v", jsConfig) + + return jsConfig, nil +} + +func (j *JetStream) Start(ctx context.Context) error { + // See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L97 + if _, err := j.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0); err != nil { + if err != server.ErrKeyExists { + logrus.Errorf("Failed to create health check key: %v", err) + } + } + return nil +} + +func (j *JetStream) isKeyExpired(_ context.Context, createTime time.Time, value *JSValue) bool { + + requestTime := time.Now() + expired := false + if value.KV.Lease > 0 { + if requestTime.After(createTime.Add(time.Second * time.Duration(value.KV.Lease))) { + expired = true + if err := j.kvBucket.Delete(value.KV.Key); err != nil { + logrus.Warnf("problem deleting expired key=%s, error=%v", value.KV.Key, err) + } + } + } + + return expired +} + +// Get returns the associated server.KeyValue +func (j *JetStream) Get(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) { + //logrus.Tracef("GET %s, rev=%d", key, revision) + start := time.Now() + defer func() { + duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + size := 0 + if kvRet != nil { + size = len(kvRet.Value) + } + fStr := "GET %s, rev=%d => revRet=%d, kv=%v, size=%d, err=%v, duration=%s" + if duration > j.slowMethod { + logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.String()) + } else { + logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.String()) + } + }() + + currentRev, err := j.currentRevision() + if err != nil { + return currentRev, nil, err + } + + if rev, kv, err := j.get(ctx, key, revision, false); err == nil { + if kv == nil { + return currentRev, nil, nil + } + return rev, kv.KV, nil + } else if err == nats.ErrKeyNotFound { + return currentRev, nil, nil + } else { + return rev, nil, err + } +} + +func (j *JetStream) get(ctx context.Context, key string, revision int64, includeDeletes bool) (int64, *JSValue, error) { + + compactRev, err := j.compactRevision() + if err != nil { + return 0, nil, err + } + + // Get latest revision + if revision <= 0 { + if entry, err := j.kvBucket.Get(key); err == nil { + + val, err := decode(entry) + if err != nil { + return 0, nil, err + } + + if val.Delete && !includeDeletes { + return 0, nil, nats.ErrKeyNotFound + } + + if j.isKeyExpired(ctx, entry.Created(), &val) { + return 0, nil, nats.ErrKeyNotFound + } + return val.KV.ModRevision, &val, nil + } else if err == nats.ErrKeyNotFound { + return 0, nil, err + } else { + return 0, nil, err + } + } else { + if revision < compactRev { + logrus.Warnf("requested revision that has been compacted") + } + if entry, err := j.kvBucket.GetRevision(key, uint64(revision)); err == nil { + val, err := decode(entry) + if err != nil { + return 0, nil, err + } + + if val.Delete && !includeDeletes { + return 0, nil, nats.ErrKeyNotFound + } + + if j.isKeyExpired(ctx, entry.Created(), &val) { + return 0, nil, nats.ErrKeyNotFound + } + return val.KV.ModRevision, &val, nil + } else if err == nats.ErrKeyNotFound { + return 0, nil, err + } else { + return 0, nil, err + } + } +} + +// Create +func (j *JetStream) Create(ctx context.Context, key string, value []byte, lease int64) (revRet int64, errRet error) { + start := time.Now() + defer func() { + duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + fStr := "CREATE %s, size=%d, lease=%d => rev=%d, err=%v, duration=%s" + if duration > j.slowMethod { + logrus.Warnf(fStr, key, len(value), lease, revRet, errRet, duration.String()) + } else { + logrus.Tracef(fStr, key, len(value), lease, revRet, errRet, duration.String()) + } + }() + + lockFolder := getTopLevelKey(key) + if lockFolder != "" { + j.kvDirectoryMutex.Lock() + if _, ok := j.kvDirectoryMuxes[lockFolder]; !ok { + j.kvDirectoryMuxes[lockFolder] = &sync.RWMutex{} + } + j.kvDirectoryMutex.Unlock() + j.kvDirectoryMuxes[lockFolder].Lock() + defer j.kvDirectoryMuxes[lockFolder].Unlock() + } + + // check if key exists already + rev, prevKV, err := j.get(ctx, key, 0, true) + if err != nil && err != nats.ErrKeyNotFound { + return 0, err + } + + createValue := JSValue{ + Delete: false, + Create: true, + PrevRevision: rev, + KV: &server.KeyValue{ + Key: key, + CreateRevision: 0, + ModRevision: 0, + Value: value, + Lease: lease, + }, + } + + if prevKV != nil { + if !prevKV.Delete { + return 0, server.ErrKeyExists + } + createValue.PrevRevision = prevKV.KV.ModRevision + } + + event, err := encode(createValue) + if err != nil { + return 0, err + } + + if prevKV != nil { + seq, err := j.kvBucket.Put(key, event) + if err != nil { + return 0, err + } + return int64(seq), nil + } + seq, err := j.kvBucket.Create(key, event) + if err != nil { + return 0, err + } + return int64(seq), nil +} + +func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) { + start := time.Now() + defer func() { + duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + fStr := "DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v, duration=%s" + if duration > j.slowMethod { + logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.String()) + } else { + logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.String()) + } + }() + lockFolder := getTopLevelKey(key) + if lockFolder != "" { + j.kvDirectoryMutex.Lock() + if _, ok := j.kvDirectoryMuxes[lockFolder]; !ok { + j.kvDirectoryMuxes[lockFolder] = &sync.RWMutex{} + } + j.kvDirectoryMutex.Unlock() + j.kvDirectoryMuxes[lockFolder].Lock() + defer j.kvDirectoryMuxes[lockFolder].Unlock() + } + + rev, value, err := j.get(ctx, key, 0, true) + if err != nil { + if err == nats.ErrKeyNotFound { + return rev, nil, true, nil + } + return rev, nil, false, err + } + + if value == nil { + return rev, nil, true, nil + } + + if value.Delete { + return rev, value.KV, true, nil + } + + if revision != 0 && value.KV.ModRevision != revision { + return rev, value.KV, false, nil + } + + deleteEvent := JSValue{ + Delete: true, + PrevRevision: rev, + KV: value.KV, + } + deleteEventBytes, err := encode(deleteEvent) + if err != nil { + return rev, nil, false, err + } + + deleteRev, err := j.kvBucket.Put(key, deleteEventBytes) + if err != nil { + return rev, value.KV, false, nil + } + + err = j.kvBucket.Delete(key) + if err != nil { + return rev, value.KV, false, nil + } + + return int64(deleteRev), value.KV, true, nil +} + +func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) { + start := time.Now() + defer func() { + duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + fStr := "LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v, duration=%s" + if duration > j.slowMethod { + logrus.Warnf(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.String()) + } else { + logrus.Tracef(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.String()) + } + }() + + // its assumed that when there is a start key that that key exists. + if strings.HasSuffix(prefix, "/") { + if prefix == startKey || strings.HasPrefix(prefix, startKey) { + startKey = "" + } + } + + rev, err := j.currentRevision() + if err != nil { + return 0, nil, err + } + + kvs := make([]*server.KeyValue, 0) + var count int64 = 0 + + // startkey provided so get max revision after the startKey matching the prefix + if startKey != "" { + histories := make(map[string][]nats.KeyValueEntry) + var minRev int64 = 0 + //var innerEntry nats.KeyValueEntry + if entries, err := j.kvBucket.History(startKey, nats.Context(ctx)); err == nil { + histories[startKey] = entries + for i := len(entries) - 1; i >= 0; i-- { + // find the matching startKey + if int64(entries[i].Revision()) <= revision { + minRev = int64(entries[i].Revision()) + logrus.Debugf("Found min revision=%d for key=%s", minRev, startKey) + break + } + } + } else { + return 0, nil, err + } + + keys, err := j.getKeys(ctx, prefix, true) + if err != nil { + return 0, nil, err + } + + for _, key := range keys { + if key != startKey { + if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil { + histories[key] = history + } else { + // should not happen + logrus.Warnf("no history for %s", key) + } + } + } + var nextRevID = minRev + var nextRevision nats.KeyValueEntry + for k, v := range histories { + logrus.Debugf("Checking %s history", k) + for i := len(v) - 1; i >= 0; i-- { + if int64(v[i].Revision()) > nextRevID && int64(v[i].Revision()) <= revision { + nextRevID = int64(v[i].Revision()) + nextRevision = v[i] + logrus.Debugf("found next rev=%d", nextRevID) + break + } else if int64(v[i].Revision()) <= nextRevID { + break + } + } + } + if nextRevision != nil { + entry, err := decode(nextRevision) + if err != nil { + return 0, nil, err + } + kvs = append(kvs, entry.KV) + } + + return rev, kvs, nil + } + + current := true + + if revision != 0 { + rev = revision + current = false + } + + if current { + + entries, err := j.getKeyValues(ctx, prefix, true) + if err != nil { + return 0, nil, err + } + for _, e := range entries { + if count < limit || limit == 0 { + kv, err := decode(e) + if !j.isKeyExpired(ctx, e.Created(), &kv) && err == nil { + kvs = append(kvs, kv.KV) + count++ + } + } else { + break + } + } + + } else { + keys, err := j.getKeys(ctx, prefix, true) + if err != nil { + return 0, nil, err + } + if revision == 0 && len(keys) == 0 { + return rev, nil, nil + } + + for _, key := range keys { + if count < limit || limit == 0 { + if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil { + for i := len(history) - 1; i >= 0; i-- { + if int64(history[i].Revision()) <= revision { + if entry, err := decode(history[i]); err == nil { + kvs = append(kvs, entry.KV) + count++ + } else { + logrus.Warnf("Could not decode %s rev=> %d", key, history[i].Revision()) + } + break + } + } + } else { + // should not happen + logrus.Warnf("no history for %s", key) + } + } + } + + } + return rev, kvs, nil +} + +func (j *JetStream) listAfter(ctx context.Context, prefix string, revision int64) (revRet int64, eventRet []*server.Event, errRet error) { + + entries, err := j.getKeyValues(ctx, prefix, false) + + if err != nil { + return 0, nil, err + } + + rev, err := j.currentRevision() + if err != nil { + return 0, nil, err + } + if revision != 0 { + rev = revision + } + events := make([]*server.Event, 0) + for _, e := range entries { + kv, err := decode(e) + if err == nil && int64(e.Revision()) > revision { + event := server.Event{ + Delete: kv.Delete, + Create: kv.Create, + KV: kv.KV, + PrevKV: &server.KeyValue{}, + } + if _, prevKV, err := j.Get(ctx, kv.KV.Key, kv.PrevRevision); err == nil && prevKV != nil { + event.PrevKV = prevKV + } + + events = append(events, &event) + } + } + return rev, events, nil +} + +// Count returns an exact count of the number of matching keys and the current revision of the database +func (j *JetStream) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) { + start := time.Now() + defer func() { + duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + fStr := "COUNT %s => rev=%d, count=%d, err=%v, duration=%s" + if duration > j.slowMethod { + logrus.Warnf(fStr, prefix, revRet, count, err, duration.String()) + } else { + logrus.Tracef(fStr, prefix, revRet, count, err, duration.String()) + } + }() + + entries, err := j.getKeys(ctx, prefix, false) + if err != nil { + return 0, 0, err + } + // current revision + currentRev, err := j.currentRevision() + if err != nil { + return 0, 0, err + } + return currentRev, int64(len(entries)), nil +} + +func (j *JetStream) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) { + start := time.Now() + defer func() { + duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + kvRev := int64(0) + if kvRet != nil { + kvRev = kvRet.ModRevision + } + fStr := "UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v, duration=%s" + if duration > j.slowMethod { + logrus.Warnf(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.String()) + } else { + logrus.Tracef(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.String()) + } + }() + + lockFolder := getTopLevelKey(key) + if lockFolder != "" { + j.kvDirectoryMutex.Lock() + if _, ok := j.kvDirectoryMuxes[lockFolder]; !ok { + j.kvDirectoryMuxes[lockFolder] = &sync.RWMutex{} + } + j.kvDirectoryMutex.Unlock() + j.kvDirectoryMuxes[lockFolder].Lock() + defer j.kvDirectoryMuxes[lockFolder].Unlock() + } + + rev, prevKV, err := j.get(ctx, key, 0, false) + + if err != nil { + if err == nats.ErrKeyNotFound { + return rev, nil, false, nil + } + return rev, nil, false, err + } + + if prevKV == nil { + return 0, nil, false, nil + } + + if prevKV.KV.ModRevision != revision { + return rev, prevKV.KV, false, nil + } + + updateValue := JSValue{ + Delete: false, + Create: false, + PrevRevision: prevKV.KV.ModRevision, + KV: &server.KeyValue{ + Key: key, + CreateRevision: prevKV.KV.CreateRevision, + Value: value, + Lease: lease, + }, + } + if prevKV.KV.CreateRevision == 0 { + updateValue.KV.CreateRevision = rev + } + + valueBytes, err := encode(updateValue) + if err != nil { + return 0, nil, false, err + } + + seq, err := j.kvBucket.Put(key, valueBytes) + if err != nil { + return 0, nil, false, err + } + + updateValue.KV.ModRevision = int64(seq) + + return int64(seq), updateValue.KV, true, err + +} + +func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { + + watcher, err := j.kvBucket.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) + + if revision > 0 { + revision-- + } + _, events, err := j.listAfter(ctx, prefix, revision) + + if err != nil { + logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision) + } + + result := make(chan []*server.Event, 100) + + go func() { + + if len(events) > 0 { + result <- events + revision = events[len(events)-1].KV.ModRevision + } + + for { + select { + case i := <-watcher.Updates(): + if i != nil { + if int64(i.Revision()) > revision { + events := make([]*server.Event, 1) + var err error + value := JSValue{ + KV: &server.KeyValue{}, + PrevRevision: 0, + Create: false, + Delete: false, + } + prevValue := JSValue{ + KV: &server.KeyValue{}, + PrevRevision: 0, + Create: false, + Delete: false, + } + lastEntry := &i + + value, err = decode(*lastEntry) + if err != nil { + logrus.Warnf("watch event: could not decode %s seq %d", i.Key(), i.Revision()) + } + if _, prevEntry, prevErr := j.get(ctx, i.Key(), value.PrevRevision, false); prevErr == nil { + if prevEntry != nil { + prevValue = *prevEntry + } + } + if err == nil { + event := &server.Event{ + Create: value.Create, + Delete: value.Delete, + KV: value.KV, + PrevKV: prevValue.KV, + } + events[0] = event + result <- events + } else { + logrus.Warnf("error decoding %s event %v", i.Key(), err) + continue + } + } + } + case <-ctx.Done(): + logrus.Infof("watcher: %s context cancelled", prefix) + if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription { + logrus.Warnf("error stopping %s watcher: %v", prefix, err) + } + return + } + } + }() + return result +} + +// getPreviousEntry returns the nats.KeyValueEntry previous to the one provided, if the previous entry is a nats.KeyValuePut +// operation. If it is not a KeyValuePut then it will return nil. +func (j *JetStream) getPreviousEntry(ctx context.Context, entry nats.KeyValueEntry) (result *nats.KeyValueEntry, e error) { + defer func() { + if result != nil { + logrus.Debugf("getPreviousEntry %s:%d found=true %d", entry.Key(), entry.Revision(), (*result).Revision()) + } else { + logrus.Debugf("getPreviousEntry %s:%d found=false", entry.Key(), entry.Revision()) + } + }() + found := false + entries, err := j.kvBucket.History(entry.Key(), nats.Context(ctx)) + if err == nil { + for idx := len(entries) - 1; idx >= 0; idx-- { + if found { + if entries[idx].Operation() == nats.KeyValuePut { + return &entries[idx], nil + } + return nil, nil + } + if entries[idx].Revision() == entry.Revision() { + found = true + } + } + } + + return nil, nil +} + +// DbSize get the kineBucket size from JetStream. +func (j *JetStream) DbSize(ctx context.Context) (int64, error) { + keySize, err := j.bucketSize(ctx, j.kvBucket.Bucket()) + if err != nil { + return -1, err + } + return keySize, nil +} + +func (j *JetStream) bucketSize(ctx context.Context, bucket string) (int64, error) { + os, err := j.jetStream.ObjectStore(bucket) + if err != nil { + return -1, err + } + s, err := os.Status() + if err != nil { + return -1, err + } + return int64(s.Size()), nil +} + +func encode(v JSValue) ([]byte, error) { + buf, err := json.Marshal(v) + return buf, err +} + +func decode(e nats.KeyValueEntry) (JSValue, error) { + v := JSValue{} + if e.Value() != nil { + err := json.Unmarshal(e.Value(), &v) + if err != nil { + logrus.Debugf("key: %s", e.Key()) + logrus.Debugf("sequence number: %d", e.Revision()) + logrus.Debugf("bytes returned: %v", len(e.Value())) + return v, err + } + v.KV.ModRevision = int64(e.Revision()) + } + return v, nil +} + +func (j *JetStream) currentRevision() (int64, error) { + status, err := j.kvBucket.Status() + if err != nil { + return 0, err + } + return int64(status.(*nats.KeyValueBucketStatus).StreamInfo().State.LastSeq), nil +} + +func (j *JetStream) compactRevision() (int64, error) { + status, err := j.kvBucket.Status() + if err != nil { + return 0, err + } + return int64(status.(*nats.KeyValueBucketStatus).StreamInfo().State.FirstSeq), nil +} + +// getKeyValues returns a []nats.KeyValueEntry matching prefix +func (j *JetStream) getKeyValues(ctx context.Context, prefix string, sortResults bool) ([]nats.KeyValueEntry, error) { + watcher, err := j.kvBucket.Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) + if err != nil { + return nil, err + } + defer func() { + err := watcher.Stop() + if err != nil { + logrus.Warnf("failed to stop %s getKeyValues watcher", prefix) + } + }() + + var entries []nats.KeyValueEntry + for entry := range watcher.Updates() { + if entry == nil { + break + } + entries = append(entries, entry) + } + + if sortResults { + sort.Slice(entries, func(i, j int) bool { + return entries[i].Key() < entries[j].Key() + }) + } + + return entries, nil +} + +// getKeys returns a list of keys matching a prefix +func (j *JetStream) getKeys(ctx context.Context, prefix string, sortResults bool) ([]string, error) { + watcher, err := j.kvBucket.Watch(prefix, nats.MetaOnly(), nats.IgnoreDeletes(), nats.Context(ctx)) + if err != nil { + return nil, err + } + defer func() { + err := watcher.Stop() + if err != nil { + logrus.Warnf("failed to stop %s getKeys watcher", prefix) + } + }() + + var keys []string + // grab all matching keys immediately + for entry := range watcher.Updates() { + if entry == nil { + break + } + keys = append(keys, entry.Key()) + } + + if sortResults { + sort.Strings(keys) + } + + return keys, nil +} + +func getTopLevelKey(key string) string { + if toplevelKeyMatch.MatchString(key) { + matches := toplevelKeyMatch.FindStringSubmatch(key) + return matches[1] + } + return "" +} diff --git a/pkg/drivers/jetstream/kv/etcd_encoder.go b/pkg/drivers/jetstream/kv/etcd_encoder.go new file mode 100644 index 00000000..b0484968 --- /dev/null +++ b/pkg/drivers/jetstream/kv/etcd_encoder.go @@ -0,0 +1,104 @@ +package kv + +import ( + "fmt" + "io" + "io/ioutil" + "strings" + + "github.com/klauspost/compress/s2" + "github.com/nats-io/nats.go" + "github.com/shengdoushi/base58" +) + +// EtcdKeyCodec turns keys like /this/is/a.test.key into Base58 encoded values split on `/` +// This is because NATS Jetstream Keys are split on . rather than / +type EtcdKeyCodec struct{} + +type S2ValueCodec struct{} + +type PlainCodec struct{} + +var ( + keyAlphabet = base58.BitcoinAlphabet +) + +func (e *EtcdKeyCodec) EncodeRange(keys string) (string, error) { + ek, err := e.Encode(keys) + if err != nil { + return "", err + } + if strings.HasSuffix(ek, ".") { + return fmt.Sprintf("%s>", ek), nil + } + return ek, nil +} + +func (*EtcdKeyCodec) Encode(key string) (retKey string, e error) { + //defer func() { + // logrus.Debugf("encoded %s => %s", key, retKey) + //}() + parts := []string{} + for _, part := range strings.Split(strings.TrimPrefix(key, "/"), "/") { + if part == ">" || part == "*" { + parts = append(parts, part) + continue + } + parts = append(parts, base58.Encode([]byte(part), keyAlphabet)) + } + + if len(parts) == 0 { + return "", nats.ErrInvalidKey + } + + return strings.Join(parts, "."), nil +} + +func (*EtcdKeyCodec) Decode(key string) (retKey string, e error) { + //defer func() { + // logrus.Debugf("decoded %s => %s", key, retKey) + //}() + parts := []string{} + for _, s := range strings.Split(key, ".") { + decodedPart, err := base58.Decode(s, keyAlphabet) + if err != nil { + return "", err + } + parts = append(parts, string(decodedPart[:])) + } + if len(parts) == 0 { + return "", nats.ErrInvalidKey + } + return fmt.Sprintf("/%s", strings.Join(parts, "/")), nil +} + +func (*S2ValueCodec) Encode(src []byte, dst io.Writer) error { + enc := s2.NewWriter(dst) + err := enc.EncodeBuffer(src) + if err != nil { + enc.Close() + return err + } + return enc.Close() +} + +func (*S2ValueCodec) Decode(src io.Reader, dst io.Writer) error { + dec := s2.NewReader(src) + _, err := io.Copy(dst, dec) + return err +} + +func (*PlainCodec) Encode(src []byte, dst io.Writer) error { + _, err := dst.Write(src) + return err +} + +func (*PlainCodec) Decode(src io.Reader, dst io.Writer) error { + b, err := ioutil.ReadAll(src) + if err != nil { + return err + } + _, err = dst.Write(b) + + return err +} diff --git a/pkg/drivers/jetstream/kv/kv.go b/pkg/drivers/jetstream/kv/kv.go new file mode 100644 index 00000000..4031f1a1 --- /dev/null +++ b/pkg/drivers/jetstream/kv/kv.go @@ -0,0 +1,293 @@ +package kv + +import ( + "bytes" + "context" + "io" + "time" + + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +func NewEncodedKV(bucket nats.KeyValue, k KeyCodec, v ValueCodec) nats.KeyValue { + return &EncodedKV{bucket: bucket, keyCodec: k, valueCodec: v} +} + +type WatcherWithCtx interface { + WatchWithCtx(ctx context.Context, keys string, opts ...nats.WatchOpt) nats.KeyWatcher +} + +type KeyCodec interface { + Encode(key string) (string, error) + Decode(key string) (string, error) + EncodeRange(keys string) (string, error) +} + +type ValueCodec interface { + Encode(src []byte, dst io.Writer) error + Decode(src io.Reader, dst io.Writer) error +} + +type EncodedKV struct { + WatcherWithCtx + bucket nats.KeyValue + keyCodec KeyCodec + valueCodec ValueCodec +} + +type watcher struct { + watcher nats.KeyWatcher + keyCodec KeyCodec + valueCodec ValueCodec + updates chan nats.KeyValueEntry + ctx context.Context + cancel context.CancelFunc +} + +func (w *watcher) Context() context.Context { + if w == nil { + return nil + } + return w.ctx +} + +type entry struct { + keyCodec KeyCodec + valueCodec ValueCodec + entry nats.KeyValueEntry +} + +func (e *entry) Key() string { + dk, err := e.keyCodec.Decode(e.entry.Key()) + // should not happen + if err != nil { + // should not happen + logrus.Warnf("could not decode key %s: %v", e.entry.Key(), err) + return "" + } + + return dk +} + +func (e *entry) Bucket() string { return e.entry.Bucket() } +func (e *entry) Value() []byte { + buf := new(bytes.Buffer) + if err := e.valueCodec.Decode(bytes.NewBuffer(e.entry.Value()), buf); err != nil { + // should not happen + logrus.Warnf("could not decode value for %s: %v", e.Key(), err) + } + return buf.Bytes() +} +func (e *entry) Revision() uint64 { return e.entry.Revision() } +func (e *entry) Created() time.Time { return e.entry.Created() } +func (e *entry) Delta() uint64 { return e.entry.Delta() } +func (e *entry) Operation() nats.KeyValueOp { return e.entry.Operation() } + +func (w *watcher) Updates() <-chan nats.KeyValueEntry { return w.updates } +func (w *watcher) Stop() error { + if w.cancel != nil { + w.cancel() + } + + return w.watcher.Stop() +} + +func (e *EncodedKV) newWatcher(w nats.KeyWatcher) nats.KeyWatcher { + watch := &watcher{ + watcher: w, + keyCodec: e.keyCodec, + valueCodec: e.valueCodec, + updates: make(chan nats.KeyValueEntry, 32)} + + if w.Context() == nil { + watch.ctx, watch.cancel = context.WithCancel(context.Background()) + } else { + watch.ctx, watch.cancel = context.WithCancel(w.Context()) + } + + go func() { + for { + select { + case ent := <-w.Updates(): + if ent == nil { + watch.updates <- nil + continue + } + + watch.updates <- &entry{ + keyCodec: e.keyCodec, + valueCodec: e.valueCodec, + entry: ent, + } + case <-watch.ctx.Done(): + return + } + } + }() + + return watch +} + +func (e *EncodedKV) Get(key string) (nats.KeyValueEntry, error) { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return nil, err + } + + ent, err := e.bucket.Get(ek) + if err != nil { + return nil, err + } + + return &entry{ + keyCodec: e.keyCodec, + valueCodec: e.valueCodec, + entry: ent, + }, nil +} + +func (e *EncodedKV) GetRevision(key string, revision uint64) (nats.KeyValueEntry, error) { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return nil, err + } + + ent, err := e.bucket.GetRevision(ek, revision) + if err != nil { + return nil, err + } + + return &entry{ + keyCodec: e.keyCodec, + valueCodec: e.valueCodec, + entry: ent, + }, nil +} + +func (e *EncodedKV) Put(key string, value []byte) (revision uint64, err error) { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return 0, err + } + + buf := new(bytes.Buffer) + + err = e.valueCodec.Encode(value, buf) + if err != nil { + return 0, err + } + + return e.bucket.Put(ek, buf.Bytes()) +} + +func (e *EncodedKV) Create(key string, value []byte) (revision uint64, err error) { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return 0, err + } + + buf := new(bytes.Buffer) + + err = e.valueCodec.Encode(value, buf) + if err != nil { + return 0, err + } + + return e.bucket.Create(ek, buf.Bytes()) +} + +func (e *EncodedKV) Update(key string, value []byte, last uint64) (revision uint64, err error) { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return 0, err + } + + buf := new(bytes.Buffer) + + err = e.valueCodec.Encode(value, buf) + if err != nil { + return 0, err + } + + return e.bucket.Update(ek, buf.Bytes(), last) +} + +func (e *EncodedKV) Delete(key string) error { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return err + } + + return e.bucket.Delete(ek) +} + +func (e *EncodedKV) Purge(key string) error { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return err + } + + return e.bucket.Purge(ek) +} + +func (e *EncodedKV) Watch(keys string, opts ...nats.WatchOpt) (nats.KeyWatcher, error) { + ek, err := e.keyCodec.EncodeRange(keys) + if err != nil { + return nil, err + } + + nw, err := e.bucket.Watch(ek, opts...) + if err != nil { + return nil, err + } + + return e.newWatcher(nw), err +} + +func (e *EncodedKV) History(key string, opts ...nats.WatchOpt) ([]nats.KeyValueEntry, error) { + ek, err := e.keyCodec.Encode(key) + if err != nil { + return nil, err + } + + var res []nats.KeyValueEntry + hist, err := e.bucket.History(ek, opts...) + if err != nil { + return nil, err + } + + for _, ent := range hist { + res = append(res, &entry{e.keyCodec, e.valueCodec, ent}) + } + + return res, nil +} + +func (e *EncodedKV) PutString(key string, value string) (revision uint64, err error) { + return e.Put(key, []byte(value)) +} +func (e *EncodedKV) WatchAll(opts ...nats.WatchOpt) (nats.KeyWatcher, error) { + return e.bucket.WatchAll(opts...) +} +func (e *EncodedKV) Keys(opts ...nats.WatchOpt) ([]string, error) { + keys, err := e.bucket.Keys(opts...) + if err != nil { + return nil, err + } + var res []string + for _, key := range keys { + dk, err := e.keyCodec.Decode(key) + if err != nil { + // should not happen + logrus.Warnf("error decoding %s: %v", key, err) + } + res = append(res, dk) + } + + return res, nil +} + +func (e *EncodedKV) Bucket() string { return e.bucket.Bucket() } +func (e *EncodedKV) PurgeDeletes(opts ...nats.PurgeOpt) error { return e.bucket.PurgeDeletes(opts...) } +func (e *EncodedKV) Status() (nats.KeyValueStatus, error) { return e.bucket.Status() } diff --git a/pkg/drivers/sqlite/sqlite.go b/pkg/drivers/sqlite/sqlite.go index 6061e44f..4191f4be 100644 --- a/pkg/drivers/sqlite/sqlite.go +++ b/pkg/drivers/sqlite/sqlite.go @@ -1,3 +1,4 @@ +//go:build cgo // +build cgo package sqlite diff --git a/pkg/drivers/sqlite/sqlite_nocgo.go b/pkg/drivers/sqlite/sqlite_nocgo.go index 951319fa..31062cdd 100644 --- a/pkg/drivers/sqlite/sqlite_nocgo.go +++ b/pkg/drivers/sqlite/sqlite_nocgo.go @@ -1,3 +1,4 @@ +//go:build !cgo // +build !cgo package sqlite diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index f9af22da..bee1cf86 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -9,6 +9,7 @@ import ( "github.com/k3s-io/kine/pkg/drivers/dqlite" "github.com/k3s-io/kine/pkg/drivers/generic" + "github.com/k3s-io/kine/pkg/drivers/jetstream" "github.com/k3s-io/kine/pkg/drivers/mysql" "github.com/k3s-io/kine/pkg/drivers/pgsql" "github.com/k3s-io/kine/pkg/drivers/sqlite" @@ -24,12 +25,13 @@ import ( ) const ( - KineSocket = "unix://kine.sock" - SQLiteBackend = "sqlite" - DQLiteBackend = "dqlite" - ETCDBackend = "etcd3" - MySQLBackend = "mysql" - PostgresBackend = "postgres" + KineSocket = "unix://kine.sock" + SQLiteBackend = "sqlite" + DQLiteBackend = "dqlite" + ETCDBackend = "etcd3" + JetStreamBackend = "jetstream" + MySQLBackend = "mysql" + PostgresBackend = "postgres" ) type Config struct { @@ -233,6 +235,8 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) case MySQLBackend: backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) + case JetStreamBackend: + backend, err = jetstream.New(ctx, dsn, cfg.BackendTLSConfig) default: return false, nil, fmt.Errorf("storage backend is not defined") } @@ -246,6 +250,8 @@ func ParseStorageEndpoint(storageEndpoint string) (string, string) { switch network { case "": return SQLiteBackend, "" + case "nats": + return JetStreamBackend, storageEndpoint case "http": fallthrough case "https": diff --git a/scripts/build b/scripts/build index 0e7afdb0..b77134d8 100755 --- a/scripts/build +++ b/scripts/build @@ -17,4 +17,5 @@ CGO_CFLAGS="-DSQLITE_ENABLE_DBSTAT_VTAB=1 -DSQLITE_USE_ALLOCA=1" go build -ldfla if [ "$CROSS" = "true" ] && [ "$ARCH" = "amd64" ]; then GOOS=darwin go build -ldflags "$LINKFLAGS" -o bin/kine-darwin GOOS=windows go build -ldflags "$LINKFLAGS" -o bin/kine-windows + GOOS=linux go build -ldflags "$LINKFLAGS" -o bin/kine-amd64 fi diff --git a/scripts/test b/scripts/test index dd1deacc..f1c0a4de 100755 --- a/scripts/test +++ b/scripts/test @@ -26,4 +26,7 @@ echo "Did test-run-postgres $?" . ./scripts/test-run-cockroachdb echo "Did test-run-cockroachdb $?" +. ./scripts/test-run-jetstream +echo "Did test-jetstream $?" + exit 0 diff --git a/scripts/test-helpers b/scripts/test-helpers index 66f278ff..a23087cb 100755 --- a/scripts/test-helpers +++ b/scripts/test-helpers @@ -13,8 +13,8 @@ dump-logs() { local name=$(cat $node/metadata/name 2>/dev/null) [ "$name" ] || continue mkdir -p $node/logs - local hostname=$(docker exec $name hostname) - docker logs $name >$node/logs/system.log 2>&1 + local hostname=$(docker container exec $name hostname) + docker container logs $name >$node/logs/system.log 2>&1 for log in $node/logs/*.log; do echo echo "#- Tail: $log" @@ -44,7 +44,7 @@ test-cleanup() { [ -f "$name" ] || continue local container=$(cat $name) echo "Removing container $container" - docker rm -f -v $container + docker container rm -f -v $container done echo if has-function test-post-hook; then @@ -159,7 +159,12 @@ fetch-kubeconfig() {( local num=${1:-1} local name=$(cat $TEST_DIR/servers/$num/metadata/name) local url=$(cat $TEST_DIR/servers/$num/metadata/url) - docker cp $name:/etc/rancher/k3s/k3s.yaml - 2>/dev/null | tar -xO 2>/dev/null | sed -e "s|https://127.0.0.1:6443|$url|g" >$TEST_DIR/servers/$num/kubeconfig.yaml + # check for macos and do not replace k8s api url + if [[ "$OSTYPE" =~ ^darwin ]]; then + docker cp $name:/etc/rancher/k3s/k3s.yaml - 2>/dev/null | tar -xO 2>/dev/null >$TEST_DIR/servers/$num/kubeconfig.yaml + else + docker cp $name:/etc/rancher/k3s/k3s.yaml - 2>/dev/null | tar -xO 2>/dev/null | sed -e "s|https://127.0.0.1:6443|$url|g" >$TEST_DIR/servers/$num/kubeconfig.yaml + fi )} export -f fetch-kubeconfig @@ -195,18 +200,18 @@ provision-server() { run-function server-pre-hook $count - docker run \ + docker container run \ -d --name $name \ --privileged \ - -p 6443 \ + -p 6443:6443 \ -e K3S_DEBUG=true \ -e K3S_DATASTORE_ENDPOINT=$K3S_DATASTORE_ENDPOINT \ ${K3S_IMAGE:-docker.io/rancher/k3s:v1.20.4-k3s1} server \ --disable=coredns,servicelb,traefik,local-storage,metrics-server \ --disable-agent --disable-scheduler --disable-cloud-controller --disable-kube-proxy --disable-network-policy - local ip=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' $name | tee $TEST_DIR/servers/$count/metadata/ip) - local port=$(docker inspect --format '{{range $k, $v := .NetworkSettings.Ports}}{{printf "%s\n" $k}}{{end}}' $name | head -n 1 | cut -d/ -f1 | tee $TEST_DIR/servers/$count/metadata/port) + local ip=$(docker container inspect --format '{{ .NetworkSettings.IPAddress }}' $name | tee $TEST_DIR/servers/$count/metadata/ip) + local port=$(docker container inspect --format '{{range $k, $v := .NetworkSettings.Ports}}{{printf "%s\n" $k}}{{end}}' $name | head -n 1 | cut -d/ -f1 | tee $TEST_DIR/servers/$count/metadata/port) local url=$(echo "https://$ip:$port" | tee $TEST_DIR/servers/$count/metadata/url) echo "Started $name @ $url" @@ -224,16 +229,16 @@ provision-kine() { local count=$(inc-count kine) local testID=$(basename $TEST_DIR) - local name=$(echo "kine-$count-$testID" | tee $TEST_DIR/kine/$count/metadata/name) + local name=$(echo "kine-$count-$testID" | tee $TEST_DIR/kine/$count/metadata/name) run-function kine-pre-hook $count - docker run \ + docker container run \ -d --name $name \ $KINE_IMAGE --endpoint $KINE_ENDPOINT - local ip=$(docker inspect --format '{{.NetworkSettings.IPAddress}}' $name | tee $TEST_DIR/kine/$count/metadata/ip) - local port=$(docker inspect --format '{{range $k, $v := .NetworkSettings.Ports}}{{printf "%s\n" $k}}{{end}}' $name | head -n 1 | cut -d/ -f1 | tee $TEST_DIR/kine/$count/metadata/port) + local ip=$(docker container inspect --format '{{.NetworkSettings.IPAddress}}' $name | tee $TEST_DIR/kine/$count/metadata/ip) + local port=$(docker container inspect --format '{{range $k, $v := .NetworkSettings.Ports}}{{printf "%s\n" $k}}{{end}}' $name | head -n 1 | cut -d/ -f1 | tee $TEST_DIR/kine/$count/metadata/port) local url=$(echo "http://$ip:$port" | tee $TEST_DIR/kine/$count/metadata/url) run-function kine-post-hook $count @@ -253,7 +258,7 @@ provision-database() { echo > $TEST_DIR/databases/$count/metadata/env echo $DB_IMAGE > $TEST_DIR/databases/$count/metadata/image local testID=$(basename $TEST_DIR) - local name=$(echo "database-$count-$testID" | tee $TEST_DIR/databases/$count/metadata/name) + local name=$(echo "database-$count-$testID" | tee $TEST_DIR/databases/$count/metadata/name) local pass=$(echo "$RANDOM$RANDOM$RANDOM" | tee $TEST_DIR/databases/$count/metadata/password) while [[ "$#" -gt "0" ]]; do echo $1 >> $TEST_DIR/databases/$count/metadata/env @@ -262,15 +267,15 @@ provision-database() { run-function database-pre-hook $count - docker run \ + docker container run \ -d --name $name \ --cap-add=sys_nice \ -e $DB_PASSWORD_ENV=$pass \ --env-file $TEST_DIR/databases/$count/metadata/env \ ${DB_IMAGE} ${DB_ARGS} - local ip=$(docker inspect --format '{{.NetworkSettings.IPAddress}}' $name | tee $TEST_DIR/databases/$count/metadata/ip) - local port=$(docker inspect --format '{{range $k, $v := .NetworkSettings.Ports}}{{printf "%s\n" $k}}{{end}}' $name | head -n 1 | cut -d/ -f1 | tee $TEST_DIR/databases/$count/metadata/port) + local ip=$(docker container inspect --format '{{.NetworkSettings.IPAddress}}' $name | tee $TEST_DIR/databases/$count/metadata/ip) + local port=$(docker container inspect --format '{{range $k, $v := .NetworkSettings.Ports}}{{printf "%s\n" $k}}{{end}}' $name | head -n 1 | cut -d/ -f1 | tee $TEST_DIR/databases/$count/metadata/port) echo "Started $name @ $ip:$port" diff --git a/scripts/test-load b/scripts/test-load index 6ad2e0a7..3c35af8d 100755 --- a/scripts/test-load +++ b/scripts/test-load @@ -2,12 +2,16 @@ set -e cd $(dirname $0)/.. +if [[ "$LABEL" = "" ]]; then + LABEL=etcd +fi + test-load() { for i in {1..4}; do python3 hack/loadmap.py & done wait - python3 hack/histogram.py | awk '{print "[PERF]\t" $0}' + python3 hack/histogram.py --backend-name "$LABEL" | awk '{print "[PERF]\t" $0}' } echo "Running configmap load generation script" diff --git a/scripts/test-run-jetstream b/scripts/test-run-jetstream new file mode 100755 index 00000000..83117657 --- /dev/null +++ b/scripts/test-run-jetstream @@ -0,0 +1,22 @@ +#!/bin/bash + +start-test() { + local ip=$(cat $TEST_DIR/databases/*/metadata/ip) + local port=$(cat $TEST_DIR/databases/*/metadata/port) + local pass=$(cat $TEST_DIR/databases/*/metadata/password) + local image=$(cat $TEST_DIR/databases/*/metadata/image) + DB_CONNECTION_TEST="docker container run --rm --name connection-test --entrypoint /usr/local/bin/nats natsio/nats-box:0.8.1 server check connection --server=nats://$ip:$port" \ + timeout --foreground 1m bash -c "wait-for-db-connection" + KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port" provision-kine + local kine_url=$(cat $TEST_DIR/kine/*/metadata/url) + K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster +} +export -f start-test + +VERSION_LIST="\ + nats 2.7.4" + +while read ENGINE VERSION; do + LABEL=$ENGINE-$VERSION DB_PASSWORD_ENV=NATS_JS_PASSWORD DB_ARGS="-js" DB_IMAGE=docker.io/library/$ENGINE:$VERSION run-test +done <<< $VERSION_LIST +