Merge branch 'master' into 13937-keepstore-prometheus
[arvados.git] / services / keepstore / keepstore.go
index 921176dbbe93f481f497af476f176c2116ef3bce..fcbdddacb1d585e995c8f23a0528be2ce8c1723c 100644 (file)
@@ -8,20 +8,21 @@ import (
        "flag"
        "fmt"
        "net"
-       "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
+var version = "dev"
+
 // A Keep "block" is 64MB.
 const BlockSize = 64 * 1024 * 1024
 
@@ -50,6 +51,7 @@ var (
        DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
        ExpiredError        = &KeepError{401, "Expired permission signature"}
        NotFoundError       = &KeepError{404, "Not Found"}
+       VolumeBusyError     = &KeepError{503, "Volume backend busy"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
@@ -89,6 +91,7 @@ func main() {
        deprecated.beforeFlagParse(theConfig)
 
        dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
 
        defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
        var configPath string
@@ -100,6 +103,12 @@ func main() {
        flag.Usage = usage
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keepstore %s\n", version)
+               return
+       }
+
        deprecated.afterFlagParse(theConfig)
 
        err := config.LoadFile(theConfig, configPath)
@@ -111,7 +120,11 @@ func main() {
                log.Fatal(config.DumpAndExit(theConfig))
        }
 
-       err = theConfig.Start()
+       log.Printf("keepstore %s started", version)
+
+       metricsRegistry := prometheus.NewRegistry()
+
+       err = theConfig.Start(metricsRegistry)
        if err != nil {
                log.Fatal(err)
        }
@@ -141,17 +154,30 @@ func main() {
                }
        }
 
+       var cluster *arvados.Cluster
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if err != nil && os.IsNotExist(err) {
+               log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+               cluster = &arvados.Cluster{
+                       ClusterID: "xxxxx",
+               }
+       } else if err != nil {
+               log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+       } else {
+               cluster, err = cfg.GetCluster("")
+               if err != nil {
+                       log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+               }
+       }
+
        log.Println("keepstore starting, pid", os.Getpid())
        defer log.Println("keepstore exiting, pid", os.Getpid())
 
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, MaxRequests limiter, method handlers
-       router := MakeRESTRouter()
-       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
-       router.limiter = limiter
-       http.Handle("/", &LoggingRESTRouter{router: limiter})
+       // Middleware/handler stack
+       router := MakeRESTRouter(cluster, metricsRegistry)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
@@ -159,19 +185,23 @@ func main() {
                log.Fatal(err)
        }
 
-       // Initialize Pull queue and worker
+       // Initialize keepclient for pull workers
        keepClient := &keepclient.KeepClient{
                Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
        }
 
-       // Initialize the pullq and worker
+       // Initialize the pullq and workers
        pullq = NewWorkQueue()
-       go RunPullWorker(pullq, keepClient)
+       for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+               go RunPullWorker(pullq, keepClient)
+       }
 
-       // Initialize the trashq and worker
+       // Initialize the trashq and workers
        trashq = NewWorkQueue()
-       go RunTrashWorker(trashq)
+       for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+               go RunTrashWorker(trashq)
+       }
 
        // Start emptyTrash goroutine
        doneEmptyingTrash := make(chan bool)
@@ -193,7 +223,8 @@ func main() {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("listening at", listener.Addr())
-       srv := &http.Server{}
+       srv := &server{}
+       srv.Handler = router
        srv.Serve(listener)
 }