Skip to content

Commit

Permalink
Fix kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
huuhait committed May 30, 2022
1 parent 45b0870 commit 7c89f49
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 105 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:1.16-alpine AS builder
FROM golang:1.18.1-alpine AS builder

RUN apk add --no-cache curl
RUN apk add --no-cache curl git

ARG KAIGARA_VERSION=0.1.29
# Install Kaigara
Expand Down
42 changes: 22 additions & 20 deletions cmd/rango/rango.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"strings"
"time"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/zsmartex/pkg/wrap/kafka"

"github.com/zsmartex/pkg/services"
"github.com/zsmartex/rango/config"
"github.com/zsmartex/rango/pkg/auth"
"github.com/zsmartex/rango/pkg/metrics"
Expand Down Expand Up @@ -160,31 +161,32 @@ func main() {
return
}

config.InitializeConfig()

go func() {
// consumer, _ := config.Kafka.CreateConsumer([]string{*exName})

// for {
// messages, err := consumer.Consume(context.Background())
// if err != nil {
// config.Logger.Printf("Consumer error: %v (%v)\n", err, messages)
// }
kafka_brokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
consumer, err := services.NewKafkaConsumer(kafka_brokers, fmt.Sprintf("rango-%s", uuid.NewString()), []string{*exName})
if err != nil {
log.Error().Msgf("Failed to create consumer: %s", err.Error())
return
}

// for _, message := range messages {
// hub.ReceiveMsg(message)
log.Info().Msg("Starting rango...")

// message.Session.MarkMessage(message.SamMsg, "")
// }
// }
go func() {
for {
records, err := consumer.Poll()
if err != nil {
config.Logger.Fatalf("Failed to poll consumer %v", err)
}

config.Kafka.Subscribe([]string{*exName}, func(msg kafka.Message) error {
hub.ReceiveMsg(msg)
for _, r := range records {
hub.ReceiveMsg(r)

return nil
})
consumer.CommitRecords(*r)
}
}
}()

defer consumer.Client.Close()

go hub.ListenWebsocketEvents()

wsHandler := func(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
)

var Logger *logrus.Entry
var Kafka *services.KafkaClient

func InitializeConfig() {
Logger = services.NewLoggerService("RANGO")
Kafka = services.NewKafka(Logger)
}
84 changes: 82 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,93 @@
module github.com/zsmartex/rango

go 1.14
go 1.18

require (
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2
github.com/prometheus/client_golang v1.6.0
github.com/rs/zerolog v1.18.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/zsmartex/pkg v1.2.49
github.com/twmb/franz-go v1.4.2
github.com/zsmartex/pkg v1.3.59
)

require (
github.com/armon/go-metrics v0.3.9 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.7.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v0.16.2 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-plugin v1.4.3 // indirect
github.com/hashicorp/go-retryablehttp v0.6.6 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/vault/api v1.4.1 // indirect
github.com/hashicorp/vault/sdk v0.4.1 // indirect
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.10.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.9.1 // indirect
github.com/jackc/pgx/v4 v4.14.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.4 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.4.2 // indirect
github.com/mitchellh/reflectwalk v1.0.0 // indirect
github.com/oklog/run v1.0.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.9.1 // indirect
github.com/prometheus/procfs v0.0.11 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/twmb/franz-go/pkg/kadm v0.0.0-20220319065723-845bc50e6da0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.0.0 // indirect
github.com/twmb/go-rbtree v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.45.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gorm.io/driver/postgres v1.3.1 // indirect
gorm.io/gorm v1.23.3 // indirect
)

0 comments on commit 7c89f49

Please sign in to comment.