Skip to content

Commit

Permalink
Limit concurrency, add contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
subtle-byte committed Jul 8, 2023
1 parent 039fc31 commit f67c2df
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 121 deletions.
12 changes: 9 additions & 3 deletions Makefile
Expand Up @@ -7,12 +7,18 @@ stop-db:
# DB is optional, if not provided, the service will be run without cache
run:
DB_CONN="postgres://postgres:password@localhost:54329/?sslmode=disable" \
DEBUG_TOKEN="" \
go run cmd/server/main.go
DEBUG_TOKEN="dt" \
MAX_REPO_SIZE_MB=100 \
MAX_CONCURRENT_WORK=2 \
go run ./cmd/server/main.go

run-in-docker:
docker build -t ghloc .
docker run --rm -p 8080:8080 -e DEBUG_TOKEN="" ghloc
docker run --rm -p 8080:8080 \
-e DEBUG_TOKEN="dt" \
-e MAX_REPO_SIZE_MB=100 \
-e MAX_CONCURRENT_WORK=2 \
ghloc

test:
go build -v ./...
Expand Down
51 changes: 51 additions & 0 deletions cmd/server/db.go
@@ -0,0 +1,51 @@
package main

import (
"database/sql"
"errors"
"fmt"
"log"

"github.com/golang-migrate/migrate/v4"
)

type MigrationLogger struct {
Prefix string
}

func (m MigrationLogger) Printf(format string, v ...interface{}) {
log.Print(m.Prefix, fmt.Sprintf(format, v...))
}

func (m MigrationLogger) Verbose() bool {
return false
}

func connectAndMigrateDB(dbConn string) (_ *sql.DB, close func() error, err error) {
if dbConn == "" {
return nil, nil, fmt.Errorf("env var DB_CONN is not provided")
}

m, err := migrate.New("file://migrations", dbConn)
if err != nil {
return nil, nil, fmt.Errorf("create migrator: %w", err)
}
m.Log = MigrationLogger{Prefix: "migration: "}
err = m.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
return nil, nil, fmt.Errorf("migrate up: %w", err)
}

close = func() error { return nil }
db, err := sql.Open("postgres", dbConn)
if err == nil {
close = db.Close
err = db.Ping()
}

if err != nil {
close()
return nil, nil, fmt.Errorf("connect to db: %w", err)
}
return db, close, nil
}
36 changes: 36 additions & 0 deletions cmd/server/debug_middleware.go
@@ -0,0 +1,36 @@
package main

import (
"fmt"
"net/http"
"net/http/httptest"
"runtime/pprof"
)

func NewDebugMiddleware(debugToken string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
if debugToken == "" {
return http.HandlerFunc(http.NotFound)
}
fn := func(w http.ResponseWriter, r *http.Request) {
if r.FormValue("debug_token") == debugToken {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", `attachment; filename="profile"`)
if err := pprof.StartCPUProfile(w); err != nil {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Del("Content-Disposition")
w.Header().Set("X-Go-Pprof", "1")
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Could not enable CPU profiling: %s\n", err)
return
}
rr := httptest.ResponseRecorder{}
next.ServeHTTP(&rr, r)
pprof.StopCPUProfile()
} else {
http.NotFound(w, r)
}
}
return http.HandlerFunc(fn)
}
}
101 changes: 17 additions & 84 deletions cmd/server/main.go
@@ -1,113 +1,46 @@
package main

import (
"database/sql"
"errors"
"fmt"
"log"
"net/http"
"net/http/httptest"
"os"
"runtime/pprof"

// _ "net/http/pprof"

"github.com/caarlos0/env/v9"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
_ "github.com/lib/pq"
"github.com/subtle-byte/ghloc/internal/infrastructure/github_files_provider"
"github.com/subtle-byte/ghloc/internal/infrastructure/postgres_loc_cacher"
"github.com/subtle-byte/ghloc/internal/server/github_handler"
"github.com/subtle-byte/ghloc/internal/service/github_stat"
github_stat_service "github.com/subtle-byte/ghloc/internal/service/github_stat"
)

var debugToken *string

func DebugMiddleware(next http.Handler) http.Handler {
if debugToken == nil {
return http.HandlerFunc(http.NotFound)
}
fn := func(w http.ResponseWriter, r *http.Request) {
if r.FormValue("debug_token") == *debugToken {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", `attachment; filename="profile"`)
if err := pprof.StartCPUProfile(w); err != nil {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Del("Content-Disposition")
w.Header().Set("X-Go-Pprof", "1")
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Could not enable CPU profiling: %s\n", err)
return
}
rr := httptest.ResponseRecorder{}
next.ServeHTTP(&rr, r)
pprof.StopCPUProfile()
} else {
http.NotFound(w, r)
}
}
return http.HandlerFunc(fn)
}

type MigrationLogger struct {
Prefix string
}

func (m MigrationLogger) Printf(format string, v ...interface{}) {
log.Print(m.Prefix, fmt.Sprintf(format, v...))
}

func (m MigrationLogger) Verbose() bool {
return false
}

func connectDB() (_ *sql.DB, close func() error, err error) {
dbConn := os.Getenv("DB_CONN")
if dbConn == "" {
return nil, nil, fmt.Errorf("env var DB_CONN is not provided")
}

m, err := migrate.New("file://migrations", dbConn)
if err != nil {
return nil, nil, fmt.Errorf("create migrator: %w", err)
}
m.Log = MigrationLogger{Prefix: "migration: "}
err = m.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
return nil, nil, fmt.Errorf("migrate up: %w", err)
}

close = func() error { return nil }
db, err := sql.Open("postgres", dbConn)
if err == nil {
close = db.Close
err = db.Ping()
}

if err != nil {
close()
return nil, nil, fmt.Errorf("connect to db: %w", err)
}
return db, close, nil
type Config struct {
DebugToken string `env:"DEBUG_TOKEN"`
MaxRepoSizeMB int `env:"MAX_REPO_SIZE_MB,notEmpty"`
MaxConcurrentWork int `env:"MAX_CONCURRENT_WORK,notEmpty"`
DbConnStr string `env:"DB_CONN"`
}

var buildTime = "unknown" // will be replaced during building the docker image

func main() {
log.Printf("Starting up the app (build time: %v)\n", buildTime)

if token, ok := os.LookupEnv("DEBUG_TOKEN"); ok {
debugToken = &token
log.Println("Debug token is set")
cfg := &Config{}
if err := env.Parse(cfg); err != nil {
log.Fatalf("Parsing config: %v", err)
}
log.Printf("Debug token is set: %v", cfg.DebugToken != "")

github := github_files_provider.Github{}
db, closeDB, err := connectDB()
pg := github_stat.LOCCacher(nil)
github := github_files_provider.New(cfg.MaxRepoSizeMB)
db, closeDB, err := connectAndMigrateDB(cfg.DbConnStr)
pg := github_stat_service.LOCCacher(nil)
if err == nil {
defer closeDB()
pg = postgres_loc_cacher.NewPostgres(db)
Expand All @@ -116,7 +49,7 @@ func main() {
log.Printf("Error connecting to DB: %v", err)
log.Println("Warning: continue without DB")
}
service := github_stat.Service{pg, &github}
service := github_stat_service.New(pg, github, cfg.MaxConcurrentWork)

router := chi.NewRouter()
router.Use(middleware.RealIP)
Expand All @@ -131,15 +64,15 @@ func main() {
fmt.Fprintf(w, "<html><body><a href='https://github.com/subtle-byte/ghloc'>Docs</a></body><html>")
})

getStatHandler := &github_handler.GetStatHandler{&service, debugToken}
getStatHandler := &github_handler.GetStatHandler{service, cfg.DebugToken}
getStatHandler.RegisterOn(router)

redirectHandler := &github_handler.RedirectHandler{}
redirectHandler.RegisterOn(router)

// router.Mount("/debug", http.DefaultServeMux)
// router.With(DebugMiddleware).Mount("/debug", http.DefaultServeMux)
router.With(DebugMiddleware).Route("/debug", func(r chi.Router) {
router.With(NewDebugMiddleware(cfg.DebugToken)).Route("/debug", func(r chi.Router) {
getStatHandler.RegisterOn(r)
})
fmt.Println("Listening on http://localhost:8080")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -3,6 +3,7 @@ module github.com/subtle-byte/ghloc
go 1.19

require (
github.com/caarlos0/env/v9 v9.0.0
github.com/go-chi/chi/v5 v5.0.8
github.com/go-chi/cors v1.2.1
github.com/golang-migrate/migrate/v4 v4.16.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
@@ -1,5 +1,7 @@
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/caarlos0/env/v9 v9.0.0 h1:SI6JNsOA+y5gj9njpgybykATIylrRMklbs5ch6wO6pc=
github.com/caarlos0/env/v9 v9.0.0/go.mod h1:ye5mlCVMYh6tZ+vCgrs/B95sj88cg5Tlnc0XIzgZ020=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dhui/dktest v0.3.16 h1:i6gq2YQEtcrjKbeJpBkWjE8MmLZPYllcjOFbTZuPDnw=
Expand Down
31 changes: 20 additions & 11 deletions internal/infrastructure/github_files_provider/github.go
Expand Up @@ -3,6 +3,7 @@ package github_files_provider
import (
"archive/zip"
"bytes"
"context"
"fmt"
"io"
"log"
Expand All @@ -16,18 +17,23 @@ import (
)

type Github struct {
maxZipSizeBytes int
}

const maxZipSize = 100 * 1024 * 1024 // 100 MiB
func New(maxZipSizeMB int) *Github {
return &Github{
maxZipSizeBytes: maxZipSizeMB * 1024 * 1024,
}
}

func BuildGithubUrl(user, repo, branch string) string {
func buildGithubUrl(user, repo, branch string) string {
return fmt.Sprintf("https://github.com/%v/%v/archive/refs/heads/%v.zip", user, repo, branch)
}

func ReadIntoMemory(r io.Reader) (*bytes.Reader, error) {
func (g *Github) readIntoMemory(r io.Reader) (*bytes.Reader, error) {
buf := &bytes.Buffer{}

lr := &LimitedReader{Reader: r, Remaining: maxZipSize}
lr := &LimitedReader{Reader: r, Remaining: g.maxZipSizeBytes}
_, err := io.Copy(buf, lr)
if err != nil {
return nil, err
Expand All @@ -36,15 +42,18 @@ func ReadIntoMemory(r io.Reader) (*bytes.Reader, error) {
return bytes.NewReader(buf.Bytes()), nil
}

func (r Github) GetContent(user, repo, branch string, tempStorage github_stat.TempStorage) (_ []github_stat.FileForPath, close func() error, _ error) {
url := BuildGithubUrl(user, repo, branch)
func (g *Github) GetContent(ctx context.Context, user, repo, branch string, tempStorage github_stat.TempStorage) (_ []github_stat.FileForPath, close func() error, _ error) {
url := buildGithubUrl(user, repo, branch)

start := time.Now()

resp, err := http.Get(url)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
log.Println(url, err)
return nil, nil, err
return nil, nil, fmt.Errorf("create request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, nil, fmt.Errorf("do request: %w", err)
}
defer resp.Body.Close()

Expand All @@ -60,15 +69,15 @@ func (r Github) GetContent(user, repo, branch string, tempStorage github_stat.Te
readerAt := io.ReaderAt(nil)
readerLen := 0
if tempStorage == github_stat.TempStorageFile {
tempFile, err := NewTempFile(resp.Body)
tempFile, err := NewTempFile(resp.Body, g.maxZipSizeBytes)
if err != nil {
return nil, nil, err
}
closer = tempFile.Close
readerAt = tempFile
readerLen = tempFile.Len()
} else {
r, err := ReadIntoMemory(resp.Body)
r, err := g.readIntoMemory(resp.Body)
if err != nil {
return nil, nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions internal/infrastructure/github_files_provider/temp_file.go
Expand Up @@ -2,7 +2,6 @@ package github_files_provider

import (
"io"
"io/ioutil"
"log"
"os"
)
Expand All @@ -12,23 +11,23 @@ type TempFile struct {
len int
}

func NewTempFile(r io.Reader) (_ *TempFile, err error) {
func NewTempFile(r io.Reader, maxSizeBytes int) (_ *TempFile, err error) {
tf := &TempFile{}

tf.File, err = ioutil.TempFile("", "")
tf.File, err = os.CreateTemp("", "")
if err != nil {
return nil, err
}
log.Print("temp file: ", tf.File.Name())

lr := &LimitedReader{Reader: r, Remaining: maxZipSize}
lr := &LimitedReader{Reader: r, Remaining: maxSizeBytes}
_, err = io.Copy(tf.File, lr)
if err != nil {
tf.Close()
return nil, err
}

tf.len = maxZipSize - lr.Remaining
tf.len = maxSizeBytes - lr.Remaining
return tf, nil
}

Expand Down

0 comments on commit f67c2df

Please sign in to comment.