Merge branch 'master' into 14885-ciso-and-conda-packaging-pr
[arvados.git] / services / keepstore / keepstore.go
index b8a0ffb1cba46777ff1e2d1c745eb8102ea5fa61..a6c8cd99545c24fdc2a56f6c2ff1866682a6ed6d 100644 (file)
@@ -8,17 +8,15 @@ import (
        "flag"
        "fmt"
        "net"
-       "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
 )
 
@@ -52,6 +50,7 @@ var (
        DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
        ExpiredError        = &KeepError{401, "Expired permission signature"}
        NotFoundError       = &KeepError{404, "Not Found"}
+       VolumeBusyError     = &KeepError{503, "Volume backend busy"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
@@ -152,6 +151,22 @@ func main() {
                }
        }
 
+       var cluster *arvados.Cluster
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if err != nil && os.IsNotExist(err) {
+               log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+               cluster = &arvados.Cluster{
+                       ClusterID: "xxxxx",
+               }
+       } else if err != nil {
+               log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+       } else {
+               cluster, err = cfg.GetCluster("")
+               if err != nil {
+                       log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+               }
+       }
+
        log.Println("keepstore starting, pid", os.Getpid())
        defer log.Println("keepstore exiting, pid", os.Getpid())
 
@@ -159,10 +174,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter()
-       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
-       router.limiter = limiter
-       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
+       router := MakeRESTRouter(cluster)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
@@ -170,19 +182,23 @@ func main() {
                log.Fatal(err)
        }
 
-       // Initialize Pull queue and worker
+       // Initialize keepclient for pull workers
        keepClient := &keepclient.KeepClient{
                Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
        }
 
-       // Initialize the pullq and worker
+       // Initialize the pullq and workers
        pullq = NewWorkQueue()
-       go RunPullWorker(pullq, keepClient)
+       for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+               go RunPullWorker(pullq, keepClient)
+       }
 
-       // Initialize the trashq and worker
+       // Initialize the trashq and workers
        trashq = NewWorkQueue()
-       go RunTrashWorker(trashq)
+       for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+               go RunTrashWorker(trashq)
+       }
 
        // Start emptyTrash goroutine
        doneEmptyingTrash := make(chan bool)
@@ -204,7 +220,8 @@ func main() {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("listening at", listener.Addr())
-       srv := &http.Server{}
+       srv := &server{}
+       srv.Handler = router
        srv.Serve(listener)
 }