Skip to content

Commit

Permalink
Site-down logic not being triggered correctly
Browse files Browse the repository at this point in the history
- Also general webserver / proxy logic refactoring

Fixes ruckstack/ruckstack #6
  • Loading branch information
nvoxland committed Mar 2, 2020
1 parent 6bc3d9e commit ef7608b
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 64 deletions.
6 changes: 3 additions & 3 deletions internal/installer/upgrade.go
Expand Up @@ -65,7 +65,7 @@ func Upgrade(upgradeFile string, targetDir string) {
err = serverProcess.Signal(syscall.Signal(0))
if err == nil {
log.Printf("Found running server on PID %d", serverPid)
userMessage("Shutting down server...")
userMessage("Shutting down server...\n")

err := serverProcess.Kill()
util.Check(err)
Expand Down Expand Up @@ -101,9 +101,9 @@ func Upgrade(upgradeFile string, targetDir string) {
userMessage("\n\nUpgrade complete\n")

if serverShutdown {
userMessage("Server was shut down as part of upgrade process and must be restarted")
userMessage("Server was shut down as part of upgrade process and must be restarted\n")
} else {
userMessage("Server was NOT auto-started as part of the upgrade process")
userMessage("Server was NOT auto-started as part of the upgrade process\n")
}
}

Expand Down
Expand Up @@ -4,14 +4,14 @@ metadata:
name: traefik
namespace: kube-system
spec:
chart: traefik
version: 1.77.1
chart: https://%{KUBERNETES_API}%/static/charts/traefik-1.81.0.tgz

set:
serviceType: "ClusterIP"
rbac.enabled: "false"
rbac.enabled: "true"
ssl.enabled: "false"
metrics.prometheus.enabled: "true"
metrics.prometheus.enabled: "false"
dashboard.enabled: "false"
kubernetes.ingressEndpoint.useDefaultPublishedService: "true"
valuesContent: |-
externalTrafficPolicy: ""
4 changes: 3 additions & 1 deletion internal/system-control/k3s/ctr.go
Expand Up @@ -3,15 +3,17 @@ package k3s
import (
"github.com/ruckstack/ruckstack/internal/system-control/util"
"io"
"log"
"os/exec"
)

func ExecCtr(stdout io.Writer, stderr io.Writer, args ...string) {

command := exec.Command(util.InstallDir()+"/lib/k3s", append([]string{"ctr"}, args...)...)
command.Dir = util.InstallDir()
command.Stdout = stdout
command.Stderr = stderr
if err := command.Run(); err != nil {
panic(err)
log.Printf("Cannot import images %s: %s", args, err)
}
}
2 changes: 1 addition & 1 deletion internal/system-control/k3s/k3s.go
Expand Up @@ -27,10 +27,10 @@ func Start() {
"--node-external-ip", util.GetLocalConfig().BindAddress,
"--default-local-storage-path", util.InstallDir()+"/data/local-storage",
"--data-dir", util.InstallDir()+"/data",
"--no-deploy", "traefik",
"--kubelet-arg", "root-dir="+util.InstallDir()+"/data/kubelet",
"--write-kubeconfig", kubecConfigFile,
"--write-kubeconfig-mode", "640")
k3sStartCommand.Env = append(k3sStartCommand.Env, "")
k3sStartCommand.Stdout = k3sLogs
k3sStartCommand.Stderr = k3sLogs
err = k3sStartCommand.Start()
Expand Down
8 changes: 4 additions & 4 deletions internal/system-control/server/monitor/monitor-services.go
Expand Up @@ -18,6 +18,8 @@ func watchServices() {
stopper := make(chan struct{})
defer close(stopper)

foundProblem(TRAEFIK_NOT_LISTENING, "System starting")

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
newService := newObj.(*core.Service)
Expand Down Expand Up @@ -48,12 +50,10 @@ func checkService(service *core.Service) {
fullName := fullName(service.ObjectMeta)

if fullName == "kube-system.traefik" {
ServerStatus.TraefikIp = service.Spec.ClusterIP

if ServerStatus.TraefikIp == "" {
if service.Spec.ClusterIP == "" {
foundProblem(TRAEFIK_NOT_LISTENING, "")
} else {
resolveProblem(TRAEFIK_NOT_LISTENING, fmt.Sprintf("Traefik is listening on %s", ServerStatus.TraefikIp))
resolveProblem(TRAEFIK_NOT_LISTENING, fmt.Sprintf("Traefik is listening on %s", service.Spec.ClusterIP))
}
}
}
1 change: 0 additions & 1 deletion internal/system-control/server/monitor/monitor.go
Expand Up @@ -16,7 +16,6 @@ var (
)

var ServerStatus = struct {
TraefikIp string
SystemReady bool
}{}

Expand Down
4 changes: 4 additions & 0 deletions internal/system-control/server/server.go
Expand Up @@ -21,7 +21,11 @@ func Start() {

fmt.Printf("Starting %s version %s\n", util.GetPackageConfig().Name, util.GetPackageConfig().Version)

err := os.MkdirAll(filepath.Join(util.InstallDir(), "logs"), 0755)
util.Check(err)
serverLog, err := os.OpenFile(filepath.Join(util.InstallDir(), "logs", "server.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
util.Check(err)

defer serverLog.Close()
log.SetOutput(serverLog)

Expand Down
22 changes: 0 additions & 22 deletions internal/system-control/server/webserver/site-down.go

This file was deleted.

115 changes: 87 additions & 28 deletions internal/system-control/server/webserver/webserver.go
Expand Up @@ -7,10 +7,15 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"github.com/ruckstack/ruckstack/internal/system-control/kubeclient"
"github.com/ruckstack/ruckstack/internal/system-control/server/monitor"
"github.com/ruckstack/ruckstack/internal/system-control/util"
"io"
core "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"log"
"math/big"
"net/http"
Expand All @@ -21,6 +26,8 @@ import (
"time"
)

var reverseProxy *httputil.ReverseProxy

func StartWebserver() {
generateKeys()

Expand All @@ -41,39 +48,96 @@ func StartWebserver() {
util.Check(err)
}()

go watchTraefikService()

log.Println("Starting webserver...complete")
}

var traefikIp string

func watchTraefikService() {
kubeClient := kubeclient.KubeClient()

factory := informers.NewSharedInformerFactory(kubeClient, 0)
informer := factory.Core().V1().Services().Informer()
stopper := make(chan struct{})
defer close(stopper)

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
newService := newObj.(*core.Service)

checkTraefikService(newService)
},

AddFunc: func(obj interface{}) {
checkTraefikService(obj.(*core.Service))

},

DeleteFunc: func(obj interface{}) {
delService := obj.(*core.Service)

if delService.ObjectMeta.Namespace == "kube-system" && delService.ObjectMeta.Name == "traefik" {
traefikIp = ""
reverseProxy = nil
log.Printf("Traefik service removed")
}
},
})
informer.Run(stopper)

//traefikService, err := KubeClient().CoreV1().Services("kube-system").Get("traefik", metav1.GetOptions{})
//Check(err)
//
//for _, port := range traefikService.Spec.Ports {
// switch port.Name {
// case "http":
// httpNodePort = port.NodePort
// case "https":
// httpsNodePort = port.NodePort
// }
//}
//
//log.Printf("Http port %d", httpNodePort)
//log.Printf("Https port %d", httpsNodePort)
}

func handleRequest(res http.ResponseWriter, req *http.Request) {
log.Printf("handle request %s %s", req.Method, req.URL.String())
func checkTraefikService(newService *core.Service) {
if newService.ObjectMeta.Namespace == "kube-system" && newService.ObjectMeta.Name == "traefik" {
if traefikIp != newService.Spec.ClusterIP {
traefikIp = newService.Spec.ClusterIP

log.Printf("Traefik IP is now %s. Configuring proxy...", traefikIp)

internalUrl, err := url.Parse(fmt.Sprintf("http://%s", traefikIp))
util.Check(err)

reverseProxy = httputil.NewSingleHostReverseProxy(internalUrl)
reverseProxy.ErrorHandler = func(response http.ResponseWriter, request *http.Request, err error) {
if err.Error() == "Gateway Error" {
showSiteDownPage(response)
}
}

reverseProxy.ModifyResponse = func(response *http.Response) error {
if response.StatusCode == 502 || response.StatusCode == 503 || response.StatusCode == 504 {
return errors.New("Gateway Error")
}
return nil
}

}
}
}

func handleRequest(res http.ResponseWriter, req *http.Request) {
if strings.HasPrefix(req.URL.Path, "/ops/") {
serveOpsPage(res, req)
} else if monitor.ServerStatus.SystemReady {
} else if reverseProxy != nil && monitor.ServerStatus.SystemReady {
proxyToKubernetes(res, req)
} else {
showSiteDownPage(res, req)
showSiteDownPage(res)
}

}

func serveOpsPage(res http.ResponseWriter, req *http.Request) {
siteDownFile, err := os.Open(util.InstallDir() + "/data/web" + req.URL.Path)
if strings.HasPrefix(req.URL.Path, "/ops/http") {

} else {
serveLocalFile(res, req.URL.Path)
}
}

func serveLocalFile(res http.ResponseWriter, url string) {
siteDownFile, err := os.Open(util.InstallDir() + "/data/web" + url)
if err == nil {
defer siteDownFile.Close()

Expand All @@ -84,16 +148,11 @@ func serveOpsPage(res http.ResponseWriter, req *http.Request) {
}
}

func proxyToKubernetes(res http.ResponseWriter, req *http.Request) {
//res.WriteHeader(200)
//res.Write([]byte("Tesitng server"))

internalUrl, err := url.Parse(fmt.Sprintf("http://%s%s", monitor.ServerStatus.TraefikIp, req.URL.String()))
util.Check(err)
log.Println("Proxying to " + internalUrl.String())

reverseProxy := httputil.NewSingleHostReverseProxy(internalUrl)
func showSiteDownPage(res http.ResponseWriter) {
serveLocalFile(res, "/site-down.html")
}

func proxyToKubernetes(res http.ResponseWriter, req *http.Request) {
reverseProxy.ServeHTTP(res, req)
}

Expand Down

0 comments on commit ef7608b

Please sign in to comment.