2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / command.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "io"
12
13         "git.arvados.org/arvados.git/lib/cmd"
14         "git.arvados.org/arvados.git/lib/config"
15         "git.arvados.org/arvados.git/lib/service"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/ctxlog"
18         "github.com/prometheus/client_golang/prometheus"
19         "github.com/sirupsen/logrus"
20 )
21
22 var (
23         Command = service.Command(arvados.ServiceNameKeepstore, newHandlerOrErrorHandler)
24 )
25
26 func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
27         args, ok, code := convertKeepstoreFlagsToServiceFlags(prog, args, ctxlog.FromContext(context.Background()), stderr)
28         if !ok {
29                 return code
30         }
31         return Command.RunCommand(prog, args, stdin, stdout, stderr)
32 }
33
34 // Parse keepstore command line flags, and return equivalent
35 // service.Command flags. If the second return value ("ok") is false,
36 // the program should exit, and the third return value is a suitable
37 // exit code.
38 func convertKeepstoreFlagsToServiceFlags(prog string, args []string, lgr logrus.FieldLogger, stderr io.Writer) ([]string, bool, int) {
39         flags := flag.NewFlagSet("", flag.ContinueOnError)
40         flags.String("listen", "", "Services.Keepstore.InternalURLs")
41         flags.Int("max-buffers", 0, "API.MaxKeepBlobBuffers")
42         flags.Int("max-requests", 0, "API.MaxConcurrentRequests")
43         flags.Bool("never-delete", false, "Collections.BlobTrash")
44         flags.Bool("enforce-permissions", false, "Collections.BlobSigning")
45         flags.String("permission-key-file", "", "Collections.BlobSigningKey")
46         flags.String("blob-signing-key-file", "", "Collections.BlobSigningKey")
47         flags.String("data-manager-token-file", "", "SystemRootToken")
48         flags.Int("permission-ttl", 0, "Collections.BlobSigningTTL")
49         flags.Int("blob-signature-ttl", 0, "Collections.BlobSigningTTL")
50         flags.String("trash-lifetime", "", "Collections.BlobTrashLifetime")
51         flags.Bool("serialize", false, "Volumes.*.DriverParameters.Serialize")
52         flags.Bool("readonly", false, "Volumes.*.ReadOnly")
53         flags.String("pid", "", "-")
54         flags.String("trash-check-interval", "", "Collections.BlobTrashCheckInterval")
55
56         flags.String("azure-storage-container-volume", "", "Volumes.*.Driver")
57         flags.String("azure-storage-account-name", "", "Volumes.*.DriverParameters.StorageAccountName")
58         flags.String("azure-storage-account-key-file", "", "Volumes.*.DriverParameters.StorageAccountKey")
59         flags.String("azure-storage-replication", "", "Volumes.*.Replication")
60         flags.String("azure-max-get-bytes", "", "Volumes.*.DriverParameters.MaxDataReadSize")
61
62         flags.String("s3-bucket-volume", "", "Volumes.*.DriverParameters.Bucket")
63         flags.String("s3-region", "", "Volumes.*.DriverParameters.Region")
64         flags.String("s3-endpoint", "", "Volumes.*.DriverParameters.Endpoint")
65         flags.String("s3-access-key-file", "", "Volumes.*.DriverParameters.AccessKeyID")
66         flags.String("s3-secret-key-file", "", "Volumes.*.DriverParameters.SecretAccessKey")
67         flags.String("s3-race-window", "", "Volumes.*.DriverParameters.RaceWindow")
68         flags.String("s3-replication", "", "Volumes.*.Replication")
69         flags.String("s3-unsafe-delete", "", "Volumes.*.DriverParameters.UnsafeDelete")
70
71         flags.String("volume", "", "Volumes")
72
73         flags.Bool("version", false, "")
74         flags.String("config", "", "")
75         flags.String("legacy-keepstore-config", "", "")
76
77         if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
78                 return nil, false, code
79         }
80
81         args = nil
82         ok := true
83         flags.Visit(func(f *flag.Flag) {
84                 if f.Name == "config" || f.Name == "legacy-keepstore-config" || f.Name == "version" {
85                         args = append(args, "-"+f.Name, f.Value.String())
86                 } else if f.Usage == "-" {
87                         ok = false
88                         lgr.Errorf("command line flag -%s is no longer supported", f.Name)
89                 } else {
90                         ok = false
91                         lgr.Errorf("command line flag -%s is no longer supported -- use Clusters.*.%s in cluster config file instead", f.Name, f.Usage)
92                 }
93         })
94         if !ok {
95                 return nil, false, 2
96         }
97
98         flags = flag.NewFlagSet("", flag.ContinueOnError)
99         loader := config.NewLoader(nil, lgr)
100         loader.SetupFlags(flags)
101         return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true, 0
102 }
103
104 func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
105         serviceURL, ok := service.URLFromContext(ctx)
106         if !ok {
107                 return service.ErrorHandler(ctx, cluster, errors.New("BUG: no URL from service.URLFromContext"))
108         }
109         ks, err := newKeepstore(ctx, cluster, token, reg, serviceURL)
110         if err != nil {
111                 return service.ErrorHandler(ctx, cluster, err)
112         }
113         puller := newPuller(ctx, ks, reg)
114         trasher := newTrasher(ctx, ks, reg)
115         _ = newTrashEmptier(ctx, ks, reg)
116         return newRouter(ks, puller, trasher)
117 }