17840: Deduplicate flag-parsing code.
[arvados.git] / services / keepstore / command.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "math/rand"
14         "net/http"
15         "os"
16         "sync"
17
18         "git.arvados.org/arvados.git/lib/cmd"
19         "git.arvados.org/arvados.git/lib/config"
20         "git.arvados.org/arvados.git/lib/service"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
23         "git.arvados.org/arvados.git/sdk/go/ctxlog"
24         "git.arvados.org/arvados.git/sdk/go/keepclient"
25         "github.com/prometheus/client_golang/prometheus"
26         "github.com/sirupsen/logrus"
27 )
28
29 var (
30         version = "dev"
31         Command = service.Command(arvados.ServiceNameKeepstore, newHandlerOrErrorHandler)
32 )
33
34 func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35         args, ok, code := convertKeepstoreFlagsToServiceFlags(prog, args, ctxlog.FromContext(context.Background()), stderr)
36         if !ok {
37                 return code
38         }
39         return Command.RunCommand(prog, args, stdin, stdout, stderr)
40 }
41
42 // Parse keepstore command line flags, and return equivalent
43 // service.Command flags. If the second return value ("ok") is false,
44 // the program should exit, and the third return value is a suitable
45 // exit code.
46 func convertKeepstoreFlagsToServiceFlags(prog string, args []string, lgr logrus.FieldLogger, stderr io.Writer) ([]string, bool, int) {
47         flags := flag.NewFlagSet("", flag.ContinueOnError)
48         flags.String("listen", "", "Services.Keepstore.InternalURLs")
49         flags.Int("max-buffers", 0, "API.MaxKeepBlobBuffers")
50         flags.Int("max-requests", 0, "API.MaxConcurrentRequests")
51         flags.Bool("never-delete", false, "Collections.BlobTrash")
52         flags.Bool("enforce-permissions", false, "Collections.BlobSigning")
53         flags.String("permission-key-file", "", "Collections.BlobSigningKey")
54         flags.String("blob-signing-key-file", "", "Collections.BlobSigningKey")
55         flags.String("data-manager-token-file", "", "SystemRootToken")
56         flags.Int("permission-ttl", 0, "Collections.BlobSigningTTL")
57         flags.Int("blob-signature-ttl", 0, "Collections.BlobSigningTTL")
58         flags.String("trash-lifetime", "", "Collections.BlobTrashLifetime")
59         flags.Bool("serialize", false, "Volumes.*.DriverParameters.Serialize")
60         flags.Bool("readonly", false, "Volumes.*.ReadOnly")
61         flags.String("pid", "", "-")
62         flags.String("trash-check-interval", "", "Collections.BlobTrashCheckInterval")
63
64         flags.String("azure-storage-container-volume", "", "Volumes.*.Driver")
65         flags.String("azure-storage-account-name", "", "Volumes.*.DriverParameters.StorageAccountName")
66         flags.String("azure-storage-account-key-file", "", "Volumes.*.DriverParameters.StorageAccountKey")
67         flags.String("azure-storage-replication", "", "Volumes.*.Replication")
68         flags.String("azure-max-get-bytes", "", "Volumes.*.DriverParameters.MaxDataReadSize")
69
70         flags.String("s3-bucket-volume", "", "Volumes.*.DriverParameters.Bucket")
71         flags.String("s3-region", "", "Volumes.*.DriverParameters.Region")
72         flags.String("s3-endpoint", "", "Volumes.*.DriverParameters.Endpoint")
73         flags.String("s3-access-key-file", "", "Volumes.*.DriverParameters.AccessKeyID")
74         flags.String("s3-secret-key-file", "", "Volumes.*.DriverParameters.SecretAccessKey")
75         flags.String("s3-race-window", "", "Volumes.*.DriverParameters.RaceWindow")
76         flags.String("s3-replication", "", "Volumes.*.Replication")
77         flags.String("s3-unsafe-delete", "", "Volumes.*.DriverParameters.UnsafeDelete")
78
79         flags.String("volume", "", "Volumes")
80
81         flags.Bool("version", false, "")
82         flags.String("config", "", "")
83         flags.String("legacy-keepstore-config", "", "")
84
85         if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
86                 return nil, false, code
87         }
88
89         args = nil
90         ok := true
91         flags.Visit(func(f *flag.Flag) {
92                 if f.Name == "config" || f.Name == "legacy-keepstore-config" || f.Name == "version" {
93                         args = append(args, "-"+f.Name, f.Value.String())
94                 } else if f.Usage == "-" {
95                         ok = false
96                         lgr.Errorf("command line flag -%s is no longer supported", f.Name)
97                 } else {
98                         ok = false
99                         lgr.Errorf("command line flag -%s is no longer supported -- use Clusters.*.%s in cluster config file instead", f.Name, f.Usage)
100                 }
101         })
102         if !ok {
103                 return nil, false, 2
104         }
105
106         flags = flag.NewFlagSet("", flag.ContinueOnError)
107         loader := config.NewLoader(nil, lgr)
108         loader.SetupFlags(flags)
109         return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true, 0
110 }
111
112 type handler struct {
113         http.Handler
114         Cluster *arvados.Cluster
115         Logger  logrus.FieldLogger
116
117         pullq      *WorkQueue
118         trashq     *WorkQueue
119         volmgr     *RRVolumeManager
120         keepClient *keepclient.KeepClient
121
122         err       error
123         setupOnce sync.Once
124 }
125
126 func (h *handler) CheckHealth() error {
127         return h.err
128 }
129
130 func (h *handler) Done() <-chan struct{} {
131         return nil
132 }
133
134 func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
135         var h handler
136         serviceURL, ok := service.URLFromContext(ctx)
137         if !ok {
138                 return service.ErrorHandler(ctx, cluster, errors.New("BUG: no URL from service.URLFromContext"))
139         }
140         err := h.setup(ctx, cluster, token, reg, serviceURL)
141         if err != nil {
142                 return service.ErrorHandler(ctx, cluster, err)
143         }
144         return &h
145 }
146
147 func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) error {
148         h.Cluster = cluster
149         h.Logger = ctxlog.FromContext(ctx)
150         if h.Cluster.API.MaxKeepBlobBuffers <= 0 {
151                 return fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
152         }
153         bufs = newBufferPool(h.Logger, h.Cluster.API.MaxKeepBlobBuffers, BlockSize)
154
155         if h.Cluster.API.MaxConcurrentRequests > 0 && h.Cluster.API.MaxConcurrentRequests < h.Cluster.API.MaxKeepBlobBuffers {
156                 h.Logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", h.Cluster.API.MaxKeepBlobBuffers, h.Cluster.API.MaxConcurrentRequests)
157         }
158
159         if h.Cluster.Collections.BlobSigningKey != "" {
160         } else if h.Cluster.Collections.BlobSigning {
161                 return errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
162         } else {
163                 h.Logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
164         }
165
166         if len(h.Cluster.Volumes) == 0 {
167                 return errors.New("no volumes configured")
168         }
169
170         h.Logger.Printf("keepstore %s starting, pid %d", version, os.Getpid())
171
172         // Start a round-robin VolumeManager with the configured volumes.
173         vm, err := makeRRVolumeManager(h.Logger, h.Cluster, serviceURL, newVolumeMetricsVecs(reg))
174         if err != nil {
175                 return err
176         }
177         if len(vm.readables) == 0 {
178                 return fmt.Errorf("no volumes configured for %s", serviceURL)
179         }
180         h.volmgr = vm
181
182         // Initialize the pullq and workers
183         h.pullq = NewWorkQueue()
184         for i := 0; i < 1 || i < h.Cluster.Collections.BlobReplicateConcurrency; i++ {
185                 go h.runPullWorker(h.pullq)
186         }
187
188         // Initialize the trashq and workers
189         h.trashq = NewWorkQueue()
190         for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
191                 go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
192         }
193
194         // Set up routes and metrics
195         h.Handler = MakeRESTRouter(ctx, cluster, reg, vm, h.pullq, h.trashq)
196
197         // Initialize keepclient for pull workers
198         c, err := arvados.NewClientFromConfig(cluster)
199         if err != nil {
200                 return err
201         }
202         ac, err := arvadosclient.New(c)
203         if err != nil {
204                 return err
205         }
206         h.keepClient = &keepclient.KeepClient{
207                 Arvados:       ac,
208                 Want_replicas: 1,
209         }
210         h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63())
211
212         if d := h.Cluster.Collections.BlobTrashCheckInterval.Duration(); d > 0 {
213                 go emptyTrash(h.volmgr.writables, d)
214         }
215
216         return nil
217 }