X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ba1cbe5824ddc769d8b1fdb1f51a4e02187e778c..fb6909efa81aa96af7e4bf5b29438de1e21b3068:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 03eef7e76b..fcbdddacb1 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -8,16 +8,17 @@ import ( "flag" "fmt" "net" - "net/http" "os" "os/signal" "syscall" "time" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/config" "git.curoverse.com/arvados.git/sdk/go/keepclient" "github.com/coreos/go-systemd/daemon" + "github.com/prometheus/client_golang/prometheus" ) var version = "dev" @@ -50,6 +51,7 @@ var ( DiskHashError = &KeepError{500, "Hash mismatch in stored data"} ExpiredError = &KeepError{401, "Expired permission signature"} NotFoundError = &KeepError{404, "Not Found"} + VolumeBusyError = &KeepError{503, "Volume backend busy"} GenericError = &KeepError{500, "Fail"} FullError = &KeepError{503, "Full"} SizeRequiredError = &KeepError{411, "Missing Content-Length"} @@ -120,7 +122,9 @@ func main() { log.Printf("keepstore %s started", version) - err = theConfig.Start() + metricsRegistry := prometheus.NewRegistry() + + err = theConfig.Start(metricsRegistry) if err != nil { log.Fatal(err) } @@ -150,6 +154,22 @@ func main() { } } + var cluster *arvados.Cluster + cfg, err := arvados.GetConfig(arvados.DefaultConfigFile) + if err != nil && os.IsNotExist(err) { + log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err) + cluster = &arvados.Cluster{ + ClusterID: "xxxxx", + } + } else if err != nil { + log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err) + } else { + cluster, err = cfg.GetCluster("") + if err != nil { + log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err) + } + } + log.Println("keepstore starting, pid", os.Getpid()) defer log.Println("keepstore exiting, pid", os.Getpid()) @@ -157,7 +177,7 @@ func main() { KeepVM = MakeRRVolumeManager(theConfig.Volumes) // Middleware/handler stack - router := MakeRESTRouter() + router := MakeRESTRouter(cluster, metricsRegistry) // Set up a TCP listener. listener, err := net.Listen("tcp", theConfig.Listen) @@ -165,19 +185,23 @@ func main() { log.Fatal(err) } - // Initialize Pull queue and worker + // Initialize keepclient for pull workers keepClient := &keepclient.KeepClient{ Arvados: &arvadosclient.ArvadosClient{}, Want_replicas: 1, } - // Initialize the pullq and worker + // Initialize the pullq and workers pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) + for i := 0; i < 1 || i < theConfig.PullWorkers; i++ { + go RunPullWorker(pullq, keepClient) + } - // Initialize the trashq and worker + // Initialize the trashq and workers trashq = NewWorkQueue() - go RunTrashWorker(trashq) + for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ { + go RunTrashWorker(trashq) + } // Start emptyTrash goroutine doneEmptyingTrash := make(chan bool) @@ -199,7 +223,8 @@ func main() { log.Printf("Error notifying init daemon: %v", err) } log.Println("listening at", listener.Addr()) - srv := &http.Server{Handler: router} + srv := &server{} + srv.Handler = router srv.Serve(listener) }