2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / command.go
index 48c8256a3ca1e22e524b60fe89ebc7de26be54e7..d01b30c907fcee9da5215bc442e0d8462954ef3e 100644 (file)
@@ -8,20 +8,13 @@ import (
        "context"
        "errors"
        "flag"
-       "fmt"
        "io"
-       "math/rand"
-       "net/http"
-       "os"
-       "sync"
 
        "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -108,112 +101,17 @@ func convertKeepstoreFlagsToServiceFlags(prog string, args []string, lgr logrus.
        return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true, 0
 }
 
-type handler struct {
-       http.Handler
-       Cluster *arvados.Cluster
-       Logger  logrus.FieldLogger
-
-       pullq      *WorkQueue
-       trashq     *WorkQueue
-       volmgr     *RRVolumeManager
-       keepClient *keepclient.KeepClient
-
-       err       error
-       setupOnce sync.Once
-}
-
-func (h *handler) CheckHealth() error {
-       return h.err
-}
-
-func (h *handler) Done() <-chan struct{} {
-       return nil
-}
-
 func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
-       var h handler
        serviceURL, ok := service.URLFromContext(ctx)
        if !ok {
                return service.ErrorHandler(ctx, cluster, errors.New("BUG: no URL from service.URLFromContext"))
        }
-       err := h.setup(ctx, cluster, token, reg, serviceURL)
+       ks, err := newKeepstore(ctx, cluster, token, reg, serviceURL)
        if err != nil {
                return service.ErrorHandler(ctx, cluster, err)
        }
-       return &h
-}
-
-func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) error {
-       h.Cluster = cluster
-       h.Logger = ctxlog.FromContext(ctx)
-       if h.Cluster.API.MaxKeepBlobBuffers <= 0 {
-               return fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
-       }
-       bufs = newBufferPool(h.Logger, h.Cluster.API.MaxKeepBlobBuffers, BlockSize)
-
-       if h.Cluster.API.MaxConcurrentRequests > 0 && h.Cluster.API.MaxConcurrentRequests < h.Cluster.API.MaxKeepBlobBuffers {
-               h.Logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", h.Cluster.API.MaxKeepBlobBuffers, h.Cluster.API.MaxConcurrentRequests)
-       }
-
-       if h.Cluster.Collections.BlobSigningKey != "" {
-       } else if h.Cluster.Collections.BlobSigning {
-               return errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
-       } else {
-               h.Logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
-       }
-
-       if len(h.Cluster.Volumes) == 0 {
-               return errors.New("no volumes configured")
-       }
-
-       h.Logger.Printf("keepstore %s starting, pid %d", cmd.Version.String(), os.Getpid())
-
-       // Start a round-robin VolumeManager with the configured volumes.
-       vm, err := makeRRVolumeManager(h.Logger, h.Cluster, serviceURL, newVolumeMetricsVecs(reg))
-       if err != nil {
-               return err
-       }
-       if len(vm.readables) == 0 {
-               return fmt.Errorf("no volumes configured for %s", serviceURL)
-       }
-       h.volmgr = vm
-
-       // Initialize the pullq and workers
-       h.pullq = NewWorkQueue()
-       for i := 0; i < 1 || i < h.Cluster.Collections.BlobReplicateConcurrency; i++ {
-               go h.runPullWorker(h.pullq)
-       }
-
-       // Initialize the trashq and workers
-       h.trashq = NewWorkQueue()
-       for i := 0; i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
-               go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
-       }
-
-       // Set up routes and metrics
-       h.Handler = MakeRESTRouter(ctx, cluster, reg, vm, h.pullq, h.trashq)
-
-       // Initialize keepclient for pull workers
-       c, err := arvados.NewClientFromConfig(cluster)
-       if err != nil {
-               return err
-       }
-       ac, err := arvadosclient.New(c)
-       if err != nil {
-               return err
-       }
-       h.keepClient = &keepclient.KeepClient{
-               Arvados:       ac,
-               Want_replicas: 1,
-               DiskCacheSize: keepclient.DiskCacheDisabled,
-       }
-       h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63())
-
-       if d := h.Cluster.Collections.BlobTrashCheckInterval.Duration(); d > 0 &&
-               h.Cluster.Collections.BlobTrash &&
-               h.Cluster.Collections.BlobDeleteConcurrency > 0 {
-               go emptyTrash(h.volmgr.mounts, d)
-       }
-
-       return nil
+       puller := newPuller(ctx, ks, reg)
+       trasher := newTrasher(ctx, ks, reg)
+       _ = newTrashEmptier(ctx, ks, reg)
+       return newRouter(ks, puller, trasher)
 }