Merge branch '14482-rubysdk-empty-dir'
[arvados.git] / services / keepstore / keepstore.go
index b8a0ffb1cba46777ff1e2d1c745eb8102ea5fa61..6ae414bf931ce9164f7beefcc0d9be294da6e9c5 100644 (file)
@@ -8,17 +8,15 @@ import (
        "flag"
        "fmt"
        "net"
-       "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
 )
 
@@ -152,6 +150,22 @@ func main() {
                }
        }
 
+       var cluster *arvados.Cluster
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if err != nil && os.IsNotExist(err) {
+               log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+               cluster = &arvados.Cluster{
+                       ClusterID: "xxxxx",
+               }
+       } else if err != nil {
+               log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+       } else {
+               cluster, err = cfg.GetCluster("")
+               if err != nil {
+                       log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+               }
+       }
+
        log.Println("keepstore starting, pid", os.Getpid())
        defer log.Println("keepstore exiting, pid", os.Getpid())
 
@@ -159,10 +173,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter()
-       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
-       router.limiter = limiter
-       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
+       router := MakeRESTRouter(cluster)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
@@ -170,19 +181,23 @@ func main() {
                log.Fatal(err)
        }
 
-       // Initialize Pull queue and worker
+       // Initialize keepclient for pull workers
        keepClient := &keepclient.KeepClient{
                Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
        }
 
-       // Initialize the pullq and worker
+       // Initialize the pullq and workers
        pullq = NewWorkQueue()
-       go RunPullWorker(pullq, keepClient)
+       for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+               go RunPullWorker(pullq, keepClient)
+       }
 
-       // Initialize the trashq and worker
+       // Initialize the trashq and workers
        trashq = NewWorkQueue()
-       go RunTrashWorker(trashq)
+       for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+               go RunTrashWorker(trashq)
+       }
 
        // Start emptyTrash goroutine
        doneEmptyingTrash := make(chan bool)
@@ -204,7 +219,8 @@ func main() {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("listening at", listener.Addr())
-       srv := &http.Server{}
+       srv := &server{}
+       srv.Handler = router
        srv.Serve(listener)
 }