//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"context"
"os"
"sync"
- "git.curoverse.com/arvados.git/lib/config"
- "git.curoverse.com/arvados.git/lib/service"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/lib/service"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
var (
- version = "dev"
+ // Command is the keepstore service command: service.Command builds
+ // it from arvados.ServiceNameKeepstore and newHandlerOrErrorHandler.
Command = service.Command(arvados.ServiceNameKeepstore, newHandlerOrErrorHandler)
)
-func main() {
- os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
-}
-
+// runCommand translates keepstore command line flags with
+// convertKeepstoreFlagsToServiceFlags and passes the converted args
+// to Command.RunCommand, returning that call's exit code. If the
+// conversion reports !ok, runCommand returns the exit code supplied
+// by the conversion without running the command.
func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- args, ok := convertKeepstoreFlagsToServiceFlags(args, ctxlog.FromContext(context.Background()))
+ args, ok, code := convertKeepstoreFlagsToServiceFlags(prog, args, ctxlog.FromContext(context.Background()), stderr)
if !ok {
- return 2
+ return code
}
return Command.RunCommand(prog, args, stdin, stdout, stderr)
}
// Parse keepstore command line flags, and return equivalent
-// service.Command flags. The second return value ("ok") is true if
-// all provided flags were successfully converted.
-func convertKeepstoreFlagsToServiceFlags(args []string, lgr logrus.FieldLogger) ([]string, bool) {
+// service.Command flags. If the second return value ("ok") is false,
+// the program should exit, and the third return value is a suitable
+// exit code.
+func convertKeepstoreFlagsToServiceFlags(prog string, args []string, lgr logrus.FieldLogger, stderr io.Writer) ([]string, bool, int) {
flags := flag.NewFlagSet("", flag.ContinueOnError)
flags.String("listen", "", "Services.Keepstore.InternalURLs")
flags.Int("max-buffers", 0, "API.MaxKeepBlobBuffers")
flags.String("s3-bucket-volume", "", "Volumes.*.DriverParameters.Bucket")
flags.String("s3-region", "", "Volumes.*.DriverParameters.Region")
flags.String("s3-endpoint", "", "Volumes.*.DriverParameters.Endpoint")
- flags.String("s3-access-key-file", "", "Volumes.*.DriverParameters.AccessKey")
- flags.String("s3-secret-key-file", "", "Volumes.*.DriverParameters.SecretKey")
+ flags.String("s3-access-key-file", "", "Volumes.*.DriverParameters.AccessKeyID")
+ flags.String("s3-secret-key-file", "", "Volumes.*.DriverParameters.SecretAccessKey")
flags.String("s3-race-window", "", "Volumes.*.DriverParameters.RaceWindow")
flags.String("s3-replication", "", "Volumes.*.Replication")
flags.String("s3-unsafe-delete", "", "Volumes.*.DriverParameters.UnsafeDelete")
flags.String("config", "", "")
flags.String("legacy-keepstore-config", "", "")
- err := flags.Parse(args)
- if err == flag.ErrHelp {
- return []string{"-help"}, true
- } else if err != nil {
- return nil, false
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return nil, false, code
}
args = nil
}
})
if !ok {
- return nil, false
+ return nil, false, 2
}
- flags = flag.NewFlagSet("", flag.ExitOnError)
+ flags = flag.NewFlagSet("", flag.ContinueOnError)
loader := config.NewLoader(nil, lgr)
loader.SetupFlags(flags)
- return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true
+ return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true, 0
}
type handler struct {
return h.err
}
+// Done returns a nil channel. Receiving from a nil channel blocks
+// forever, so a caller waiting on Done() is never notified.
+// NOTE(review): presumably required by the service.Handler interface
+// returned by newHandlerOrErrorHandler -- confirm.
+func (h *handler) Done() <-chan struct{} {
+ return nil
+}
+
func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
var h handler
serviceURL, ok := service.URLFromContext(ctx)
}
bufs = newBufferPool(h.Logger, h.Cluster.API.MaxKeepBlobBuffers, BlockSize)
- if h.Cluster.API.MaxConcurrentRequests < 1 {
- h.Cluster.API.MaxConcurrentRequests = h.Cluster.API.MaxKeepBlobBuffers * 2
- h.Logger.Warnf("API.MaxConcurrentRequests <1 or not specified; defaulting to MaxKeepBlobBuffers * 2 == %d", h.Cluster.API.MaxConcurrentRequests)
+ if h.Cluster.API.MaxConcurrentRequests > 0 && h.Cluster.API.MaxConcurrentRequests < h.Cluster.API.MaxKeepBlobBuffers {
+ h.Logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", h.Cluster.API.MaxKeepBlobBuffers, h.Cluster.API.MaxConcurrentRequests)
}
if h.Cluster.Collections.BlobSigningKey != "" {
return errors.New("no volumes configured")
}
- h.Logger.Printf("keepstore %s starting, pid %d", version, os.Getpid())
+ h.Logger.Printf("keepstore %s starting, pid %d", cmd.Version.String(), os.Getpid())
// Start a round-robin VolumeManager with the configured volumes.
vm, err := makeRRVolumeManager(h.Logger, h.Cluster, serviceURL, newVolumeMetricsVecs(reg))
// Initialize the trashq and workers
h.trashq = NewWorkQueue()
for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
- go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
+ go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
}
// Set up routes and metrics