realtime consume data from kafka to clickhouse #196

Open
starking8b opened this issue Mar 21, 2024 · 0 comments

starking8b commented Mar 21, 2024

I have many IPFIX records being inserted into Kafka, and I have created a consumer in Go with this code:

package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"flag"
	"log"
	"sync"
	"time"

	// registers the "clickhouse" database/sql driver and provides clickhouse.Exception
	"github.com/ClickHouse/clickhouse-go"
	"github.com/segmentio/kafka-go"
)

type options struct {
	Broker  string
	Topic   string
	Debug   bool
	Workers int
}

type dataField struct {
	I int
	V interface{}
}

type Header struct {
	Version    int
	Length     int
	ExportTime int64
	SequenceNo int
	DomainID   int
}

type ipfix struct {
	AgentID  string
	Header   Header
	DataSets [][]dataField
}

type dIPFIXSample struct {
	device                          string
	sourceIPv4Address               string
	sourceTransportPort             uint64
	postNATSourceIPv4Address        string
	postNATSourceTransportPort      uint64
	destinationIPv4Address          string
	postNATDestinationIPv4Address   string
	postNATDestinationTransportPort uint64
	dstport                         uint64
	timestamp                       string
	postNATSourceIPv6Address        string
	postNATDestinationIPv6Address   string
	sourceIPv6Address               string
	destinationIPv6Address          string
	proto                           uint8
	login                           string
	sessionid                       uint64
}

var opts options

func init() {
	flag.StringVar(&opts.Broker, "broker", "172.18.0.4:9092", "broker ipaddress:port")
	flag.StringVar(&opts.Topic, "topic", "vflow.ipfix", "kafka topic")
	flag.BoolVar(&opts.Debug, "debug", true, "enabled/disabled debug")
	flag.IntVar(&opts.Workers, "workers", 16, "workers number / partition number")

	flag.Parse()
}


func main() {
	var (
		wg sync.WaitGroup
		ch = make(chan ipfix, 10000)
	)

	// pool of ClickHouse writers draining the channel
	for i := 0; i < 5; i++ {
		go ingestClickHouse(ch)
	}

	wg.Add(opts.Workers)

	for i := 0; i < opts.Workers; i++ {
		go func(ti int) {
			defer wg.Done()

			// create a new kafka reader with the broker and topic
			r := kafka.NewReader(kafka.ReaderConfig{
				Brokers: []string{opts.Broker},
				Topic:   opts.Topic,
				GroupID: "mygroup",
				// start consuming from the earliest message
				StartOffset: kafka.FirstOffset,
			})
			defer r.Close()

			pCount := 0
			count := 0
			tik := time.Tick(10 * time.Second)

			for {
				select {
				case <-tik:
					if opts.Debug {
						log.Printf("worker#%d, rate=%d msgs/s\n", ti, (count-pCount)/10)
					}
					pCount = count
				default:
					// read the next message from kafka
					m, err := r.ReadMessage(context.Background())
					if err != nil {
						if err == kafka.ErrGenerationEnded {
							log.Println("generation ended")
							return
						}
						log.Println(err)
						continue
					}

					// unmarshal the message into an ipfix struct
					objmap := ipfix{}
					if err := json.Unmarshal(m.Value, &objmap); err != nil {
						log.Println(err)
						continue
					}

					// send the ipfix struct to the ingestClickHouse goroutines
					ch <- objmap

					// mark the message as processed
					if err := r.CommitMessages(context.Background(), m); err != nil {
						log.Println(err)
						continue
					}

					count++
				}
			}
		}(i)
	}

	wg.Wait()
}


func ingestClickHouse(ch chan ipfix) {
	var sample ipfix

	connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&username=default&password=wawa123")
	if err != nil {
		log.Fatal(err)
	}
	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			log.Println(err)
		}
		return
	}
	defer connect.Close()

	for {
		tx, err := connect.Begin()
		if err != nil {
			log.Fatal(err)
		}
		// 16 columns, 16 placeholders
		stmt, err := tx.Prepare("INSERT INTO natdb.natlogs (timestamp, router_ip, sourceIPv4Address, sourceTransportPort, postNATSourceIPv4Address, postNATSourceTransportPort, destinationIPv4Address, dstport, postNATDestinationIPv4Address, postNATDestinationTransportPort, postNATSourceIPv6Address, postNATDestinationIPv6Address, sourceIPv6Address, destinationIPv6Address, proto, login) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
		if err != nil {
			log.Fatal(err)
		}

		// buffer up to 10000 samples per transaction
		for i := 0; i < 10000; i++ {
			sample = <-ch
			timestamp := time.Unix(sample.Header.ExportTime, 0).Format("2006-01-02 15:04:05")

			for _, data := range sample.DataSets {
				s := dIPFIXSample{}
				for _, dd := range data {
					// dd.I is the IPFIX information element ID
					switch dd.I {
					case 4:
						s.proto = uint8(dd.V.(float64))
					case 7:
						s.sourceTransportPort = uint64(dd.V.(float64))
					case 8:
						s.sourceIPv4Address = dd.V.(string)
					case 11:
						s.dstport = uint64(dd.V.(float64))
					case 12:
						s.destinationIPv4Address = dd.V.(string)
					case 27:
						s.sourceIPv6Address = dd.V.(string)
					case 28:
						s.destinationIPv6Address = dd.V.(string)
					case 225:
						s.postNATSourceIPv4Address = dd.V.(string)
					case 226:
						s.postNATDestinationIPv4Address = dd.V.(string)
					case 227:
						s.postNATSourceTransportPort = uint64(dd.V.(float64))
					case 228:
						s.postNATDestinationTransportPort = uint64(dd.V.(float64))
					case 281:
						s.postNATSourceIPv6Address = dd.V.(string)
					case 282:
						s.postNATDestinationIPv6Address = dd.V.(string)
					case 2003:
						s.login = dd.V.(string)
					}
				}

				if _, err := stmt.Exec(
					timestamp,
					sample.AgentID,
					s.sourceIPv4Address,
					s.sourceTransportPort,
					s.postNATSourceIPv4Address,
					s.postNATSourceTransportPort,
					s.destinationIPv4Address,
					s.dstport,
					s.postNATDestinationIPv4Address,
					s.postNATDestinationTransportPort,
					s.postNATSourceIPv6Address,
					s.postNATDestinationIPv6Address,
					s.sourceIPv6Address,
					s.destinationIPv6Address,
					s.proto,
					s.login,
				); err != nil {
					log.Fatal(err)
				}
			}
		}

		// commit the batch before starting the next transaction on this connection
		if err := tx.Commit(); err != nil {
			log.Fatal(err)
		}
	}
}

The code works and I am able to insert data into ClickHouse, but because of the high traffic and the huge amount of data being written to Kafka, there is a delay between Kafka and ClickHouse that grows as traffic increases; right now the lag is more than 20 hours. Can you recommend any way to make this faster? This is my ClickHouse table:

CREATE TABLE natdb.natlogs
(
    `timestamp` DateTime,
    `router_ip` String,
    `sourceIPv4Address` String,
    `sourceTransportPort` UInt64,
    `postNATSourceIPv4Address` String,
    `postNATSourceTransportPort` UInt64,
    `destinationIPv4Address` String,
    `dstport` UInt64,
    `postNATDestinationIPv4Address` String,
    `postNATDestinationTransportPort` UInt64,
    `proto` UInt8,
    `login` String,
    `sessionid` String,
    `sourceIPv6Address` String,
    `destinationIPv6Address` String,
    `postNATSourceIPv6Address` String,
    `postNATDestinationIPv6Address` String,
    INDEX idx_natlogs_router_source_time_postnat (router_ip, sourceIPv4Address, timestamp, postNATSourceIPv4Address) TYPE minmax GRANULARITY 1
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY router_ip
SETTINGS index_granularity = 8192 
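For comparison, ClickHouse can also consume Kafka directly through its built-in Kafka table engine plus a materialized view, which takes a custom consumer out of the hot path entirely. A minimal sketch against the schema above (the broker and topic come from the flags in the code; the vflow messages are nested ipfix JSON, so in practice they would need flattening first, e.g. via the JSONAsString format and a parsing view, before this works as written):

CREATE TABLE natdb.natlogs_queue
(
    `timestamp` DateTime,
    `router_ip` String,
    `sourceIPv4Address` String
    -- ... remaining natlogs columns ...
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '172.18.0.4:9092',
         kafka_topic_list = 'vflow.ipfix',
         kafka_group_name = 'clickhouse-natlogs',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 4;

-- moves rows from the Kafka queue into natlogs in the background as they arrive
CREATE MATERIALIZED VIEW natdb.natlogs_mv TO natdb.natlogs AS
SELECT * FROM natdb.natlogs_queue;

Separately, ORDER BY router_ip alone gives the primary index very little selectivity; ORDER BY (router_ip, timestamp) is the usual layout when queries filter on a router plus a time range.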

I want a faster way to insert data into ClickHouse.
Thanks in advance.
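A pattern that commonly cuts this kind of lag is to flush on either a row-count threshold or a timeout (rather than always waiting for a full batch) and to use the native-protocol batch API instead of one Exec per row. A minimal sketch using clickhouse-go v2; the row type, the two-column subset, and the thresholds are illustrative assumptions, not code from this issue:

package main

import (
	"context"
	"log"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// row stands in for one decoded natlogs record; only two columns are shown.
type row struct {
	Timestamp time.Time
	RouterIP  string
}

// ingest buffers rows and sends a batch when either batchSize rows are
// queued or flushEvery elapses, whichever comes first, so inserts stay
// large under load but rows never wait for a full batch under low traffic.
func ingest(ctx context.Context, conn driver.Conn, ch <-chan row) error {
	const (
		batchSize  = 50000
		flushEvery = 5 * time.Second
	)

	newBatch := func() (driver.Batch, error) {
		// insert an explicit column subset; the remaining columns take defaults
		return conn.PrepareBatch(ctx, "INSERT INTO natdb.natlogs (timestamp, router_ip)")
	}

	batch, err := newBatch()
	if err != nil {
		return err
	}
	n := 0

	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()

	flush := func() error {
		if n == 0 {
			return nil
		}
		if err := batch.Send(); err != nil {
			return err
		}
		n = 0
		batch, err = newBatch()
		return err
	}

	for {
		select {
		case r := <-ch:
			if err := batch.Append(r.Timestamp, r.RouterIP); err != nil {
				return err
			}
			if n++; n >= batchSize {
				if err := flush(); err != nil {
					return err
				}
			}
		case <-ticker.C:
			if err := flush(); err != nil {
				return err
			}
		case <-ctx.Done():
			return flush()
		}
	}
}

func main() {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{"127.0.0.1:9000"},
		Auth: clickhouse.Auth{Database: "natdb", Username: "default", Password: "wawa123"},
	})
	if err != nil {
		log.Fatal(err)
	}
	ch := make(chan row, 10000)
	// the Kafka readers from the code above would feed ch here
	log.Fatal(ingest(context.Background(), conn, ch))
}

With this shape, the writer goroutines can share one channel, and ideally the Kafka offsets would be committed only after a successful Send rather than per message (not shown in the sketch).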
