Merge branch '15003-disable-config-warnings'
[arvados.git] / services / keepstore / keepstore.go
index 03eef7e76b0b897ed2cb70b95f22989b76436123..fcbdddacb1d585e995c8f23a0528be2ce8c1723c 100644 (file)
@@ -8,16 +8,17 @@ import (
        "flag"
        "fmt"
        "net"
-       "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var version = "dev"
@@ -50,6 +51,7 @@ var (
        DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
        ExpiredError        = &KeepError{401, "Expired permission signature"}
        NotFoundError       = &KeepError{404, "Not Found"}
+       VolumeBusyError     = &KeepError{503, "Volume backend busy"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
@@ -120,7 +122,9 @@ func main() {
 
        log.Printf("keepstore %s started", version)
 
-       err = theConfig.Start()
+       metricsRegistry := prometheus.NewRegistry()
+
+       err = theConfig.Start(metricsRegistry)
        if err != nil {
                log.Fatal(err)
        }
@@ -150,6 +154,22 @@ func main() {
                }
        }
 
+       var cluster *arvados.Cluster
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if err != nil && os.IsNotExist(err) {
+               log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+               cluster = &arvados.Cluster{
+                       ClusterID: "xxxxx",
+               }
+       } else if err != nil {
+               log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+       } else {
+               cluster, err = cfg.GetCluster("")
+               if err != nil {
+                       log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+               }
+       }
+
        log.Println("keepstore starting, pid", os.Getpid())
        defer log.Println("keepstore exiting, pid", os.Getpid())
 
@@ -157,7 +177,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter()
+       router := MakeRESTRouter(cluster, metricsRegistry)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
@@ -165,19 +185,23 @@ func main() {
                log.Fatal(err)
        }
 
-       // Initialize Pull queue and worker
+       // Initialize keepclient for pull workers
        keepClient := &keepclient.KeepClient{
                Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
        }
 
-       // Initialize the pullq and worker
+       // Initialize the pullq and workers
        pullq = NewWorkQueue()
-       go RunPullWorker(pullq, keepClient)
+       for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+               go RunPullWorker(pullq, keepClient)
+       }
 
-       // Initialize the trashq and worker
+       // Initialize the trashq and workers
        trashq = NewWorkQueue()
-       go RunTrashWorker(trashq)
+       for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+               go RunTrashWorker(trashq)
+       }
 
        // Start emptyTrash goroutine
        doneEmptyingTrash := make(chan bool)
@@ -199,7 +223,8 @@ func main() {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("listening at", listener.Addr())
-       srv := &http.Server{Handler: router}
+       srv := &server{}
+       srv.Handler = router
        srv.Serve(listener)
 }