X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/54377a7bacc182ace0bb8b55a812e0a9fee5ced8..HEAD:/services/keepstore/command.go diff --git a/services/keepstore/command.go b/services/keepstore/command.go index 48c8256a3c..9f14c13384 100644 --- a/services/keepstore/command.go +++ b/services/keepstore/command.go @@ -7,213 +7,27 @@ package keepstore import ( "context" "errors" - "flag" - "fmt" - "io" - "math/rand" - "net/http" - "os" - "sync" - "git.arvados.org/arvados.git/lib/cmd" - "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/arvadosclient" - "git.arvados.org/arvados.git/sdk/go/ctxlog" - "git.arvados.org/arvados.git/sdk/go/keepclient" "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" ) var ( Command = service.Command(arvados.ServiceNameKeepstore, newHandlerOrErrorHandler) ) -func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { - args, ok, code := convertKeepstoreFlagsToServiceFlags(prog, args, ctxlog.FromContext(context.Background()), stderr) - if !ok { - return code - } - return Command.RunCommand(prog, args, stdin, stdout, stderr) -} - -// Parse keepstore command line flags, and return equivalent -// service.Command flags. If the second return value ("ok") is false, -// the program should exit, and the third return value is a suitable -// exit code. -func convertKeepstoreFlagsToServiceFlags(prog string, args []string, lgr logrus.FieldLogger, stderr io.Writer) ([]string, bool, int) { - flags := flag.NewFlagSet("", flag.ContinueOnError) - flags.String("listen", "", "Services.Keepstore.InternalURLs") - flags.Int("max-buffers", 0, "API.MaxKeepBlobBuffers") - flags.Int("max-requests", 0, "API.MaxConcurrentRequests") - flags.Bool("never-delete", false, "Collections.BlobTrash") - flags.Bool("enforce-permissions", false, "Collections.BlobSigning") - flags.String("permission-key-file", "", "Collections.BlobSigningKey") - flags.String("blob-signing-key-file", "", "Collections.BlobSigningKey") - flags.String("data-manager-token-file", "", "SystemRootToken") - flags.Int("permission-ttl", 0, "Collections.BlobSigningTTL") - flags.Int("blob-signature-ttl", 0, "Collections.BlobSigningTTL") - flags.String("trash-lifetime", "", "Collections.BlobTrashLifetime") - flags.Bool("serialize", false, "Volumes.*.DriverParameters.Serialize") - flags.Bool("readonly", false, "Volumes.*.ReadOnly") - flags.String("pid", "", "-") - flags.String("trash-check-interval", "", "Collections.BlobTrashCheckInterval") - - flags.String("azure-storage-container-volume", "", "Volumes.*.Driver") - flags.String("azure-storage-account-name", "", "Volumes.*.DriverParameters.StorageAccountName") - flags.String("azure-storage-account-key-file", "", "Volumes.*.DriverParameters.StorageAccountKey") - flags.String("azure-storage-replication", "", "Volumes.*.Replication") - flags.String("azure-max-get-bytes", "", "Volumes.*.DriverParameters.MaxDataReadSize") - - flags.String("s3-bucket-volume", "", "Volumes.*.DriverParameters.Bucket") - flags.String("s3-region", "", "Volumes.*.DriverParameters.Region") - flags.String("s3-endpoint", "", "Volumes.*.DriverParameters.Endpoint") - flags.String("s3-access-key-file", "", "Volumes.*.DriverParameters.AccessKeyID") - flags.String("s3-secret-key-file", "", "Volumes.*.DriverParameters.SecretAccessKey") - flags.String("s3-race-window", "", "Volumes.*.DriverParameters.RaceWindow") - flags.String("s3-replication", "", "Volumes.*.Replication") - flags.String("s3-unsafe-delete", "", "Volumes.*.DriverParameters.UnsafeDelete") - - flags.String("volume", "", "Volumes") - - flags.Bool("version", false, "") - flags.String("config", "", "") - flags.String("legacy-keepstore-config", "", "") - - if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok { - return nil, false, code - } - - args = nil - ok := true - flags.Visit(func(f *flag.Flag) { - if f.Name == "config" || f.Name == "legacy-keepstore-config" || f.Name == "version" { - args = append(args, "-"+f.Name, f.Value.String()) - } else if f.Usage == "-" { - ok = false - lgr.Errorf("command line flag -%s is no longer supported", f.Name) - } else { - ok = false - lgr.Errorf("command line flag -%s is no longer supported -- use Clusters.*.%s in cluster config file instead", f.Name, f.Usage) - } - }) - if !ok { - return nil, false, 2 - } - - flags = flag.NewFlagSet("", flag.ContinueOnError) - loader := config.NewLoader(nil, lgr) - loader.SetupFlags(flags) - return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true, 0 -} - -type handler struct { - http.Handler - Cluster *arvados.Cluster - Logger logrus.FieldLogger - - pullq *WorkQueue - trashq *WorkQueue - volmgr *RRVolumeManager - keepClient *keepclient.KeepClient - - err error - setupOnce sync.Once -} - -func (h *handler) CheckHealth() error { - return h.err -} - -func (h *handler) Done() <-chan struct{} { - return nil -} - func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler { - var h handler serviceURL, ok := service.URLFromContext(ctx) if !ok { return service.ErrorHandler(ctx, cluster, errors.New("BUG: no URL from service.URLFromContext")) } - err := h.setup(ctx, cluster, token, reg, serviceURL) + ks, err := newKeepstore(ctx, cluster, token, reg, serviceURL) if err != nil { return service.ErrorHandler(ctx, cluster, err) } - return &h -} - -func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) error { - h.Cluster = cluster - h.Logger = ctxlog.FromContext(ctx) - if h.Cluster.API.MaxKeepBlobBuffers <= 0 { - return fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero") - } - bufs = newBufferPool(h.Logger, h.Cluster.API.MaxKeepBlobBuffers, BlockSize) - - if h.Cluster.API.MaxConcurrentRequests > 0 && h.Cluster.API.MaxConcurrentRequests < h.Cluster.API.MaxKeepBlobBuffers { - h.Logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", h.Cluster.API.MaxKeepBlobBuffers, h.Cluster.API.MaxConcurrentRequests) - } - - if h.Cluster.Collections.BlobSigningKey != "" { - } else if h.Cluster.Collections.BlobSigning { - return errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey") - } else { - h.Logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.") - } - - if len(h.Cluster.Volumes) == 0 { - return errors.New("no volumes configured") - } - - h.Logger.Printf("keepstore %s starting, pid %d", cmd.Version.String(), os.Getpid()) - - // Start a round-robin VolumeManager with the configured volumes. - vm, err := makeRRVolumeManager(h.Logger, h.Cluster, serviceURL, newVolumeMetricsVecs(reg)) - if err != nil { - return err - } - if len(vm.readables) == 0 { - return fmt.Errorf("no volumes configured for %s", serviceURL) - } - h.volmgr = vm - - // Initialize the pullq and workers - h.pullq = NewWorkQueue() - for i := 0; i < 1 || i < h.Cluster.Collections.BlobReplicateConcurrency; i++ { - go h.runPullWorker(h.pullq) - } - - // Initialize the trashq and workers - h.trashq = NewWorkQueue() - for i := 0; i < h.Cluster.Collections.BlobTrashConcurrency; i++ { - go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq) - } - - // Set up routes and metrics - h.Handler = MakeRESTRouter(ctx, cluster, reg, vm, h.pullq, h.trashq) - - // Initialize keepclient for pull workers - c, err := arvados.NewClientFromConfig(cluster) - if err != nil { - return err - } - ac, err := arvadosclient.New(c) - if err != nil { - return err - } - h.keepClient = &keepclient.KeepClient{ - Arvados: ac, - Want_replicas: 1, - DiskCacheSize: keepclient.DiskCacheDisabled, - } - h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63()) - - if d := h.Cluster.Collections.BlobTrashCheckInterval.Duration(); d > 0 && - h.Cluster.Collections.BlobTrash && - h.Cluster.Collections.BlobDeleteConcurrency > 0 { - go emptyTrash(h.volmgr.mounts, d) - } - - return nil + puller := newPuller(ctx, ks, reg) + trasher := newTrasher(ctx, ks, reg) + _ = newTrashEmptier(ctx, ks, reg) + return newRouter(ks, puller, trasher) }