Merge branch '14259-pysdk-remote-block-copy'
[arvados.git] / services / keepstore / keepstore.go
index 23af6906bbcf89ec0832e8416a3198096edbef1c..6ae414bf931ce9164f7beefcc0d9be294da6e9c5 100644 (file)
@@ -8,21 +8,20 @@ import (
        "flag"
        "fmt"
        "net"
-       "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       arvadosVersion "git.curoverse.com/arvados.git/sdk/go/version"
-       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
 )
 
+var version = "dev"
+
 // A Keep "block" is 64MB.
 const BlockSize = 64 * 1024 * 1024
 
@@ -104,8 +103,8 @@ func main() {
 
        // Print version information if requested
        if *getVersion {
-               fmt.Printf("Version: %s\n", arvadosVersion.GetVersion())
-               os.Exit(0)
+               fmt.Printf("keepstore %s\n", version)
+               return
        }
 
        deprecated.afterFlagParse(theConfig)
@@ -119,7 +118,7 @@ func main() {
                log.Fatal(config.DumpAndExit(theConfig))
        }
 
-       log.Printf("keepstore %q started", arvadosVersion.GetVersion())
+       log.Printf("keepstore %s started", version)
 
        err = theConfig.Start()
        if err != nil {
@@ -151,17 +150,30 @@ func main() {
                }
        }
 
+       var cluster *arvados.Cluster
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if err != nil && os.IsNotExist(err) {
+               log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+               cluster = &arvados.Cluster{
+                       ClusterID: "xxxxx",
+               }
+       } else if err != nil {
+               log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+       } else {
+               cluster, err = cfg.GetCluster("")
+               if err != nil {
+                       log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+               }
+       }
+
        log.Println("keepstore starting, pid", os.Getpid())
        defer log.Println("keepstore exiting, pid", os.Getpid())
 
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, MaxRequests limiter, method handlers
-       router := MakeRESTRouter()
-       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
-       router.limiter = limiter
-       http.Handle("/", &LoggingRESTRouter{router: limiter})
+       // Middleware/handler stack
+       router := MakeRESTRouter(cluster)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
@@ -169,19 +181,23 @@ func main() {
                log.Fatal(err)
        }
 
-       // Initialize Pull queue and worker
+       // Initialize keepclient for pull workers
        keepClient := &keepclient.KeepClient{
                Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
        }
 
-       // Initialize the pullq and worker
+       // Initialize the pullq and workers
        pullq = NewWorkQueue()
-       go RunPullWorker(pullq, keepClient)
+       for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+               go RunPullWorker(pullq, keepClient)
+       }
 
-       // Initialize the trashq and worker
+       // Initialize the trashq and workers
        trashq = NewWorkQueue()
-       go RunTrashWorker(trashq)
+       for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+               go RunTrashWorker(trashq)
+       }
 
        // Start emptyTrash goroutine
        doneEmptyingTrash := make(chan bool)
@@ -203,7 +219,8 @@ func main() {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("listening at", listener.Addr())
-       srv := &http.Server{}
+       srv := &server{}
+       srv.Handler = router
        srv.Serve(listener)
 }