13647: Use cluster config instead of custom keepstore config.
[arvados.git] / services / keepstore / command.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "math/rand"
14         "net/http"
15         "os"
16         "sync"
17
18         "git.curoverse.com/arvados.git/lib/config"
19         "git.curoverse.com/arvados.git/lib/service"
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
22         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
23         "git.curoverse.com/arvados.git/sdk/go/keepclient"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26 )
27
28 var (
29         version = "dev"
30         Command = service.Command(arvados.ServiceNameKeepstore, newHandlerOrErrorHandler)
31 )
32
33 func main() {
34         os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
35 }
36
37 func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
38         args, ok := convertKeepstoreFlagsToServiceFlags(args, ctxlog.FromContext(context.Background()))
39         if !ok {
40                 return 2
41         }
42         return Command.RunCommand(prog, args, stdin, stdout, stderr)
43 }
44
45 // Parse keepstore command line flags, and return equivalent
46 // service.Command flags. The second return value ("ok") is true if
47 // all provided flags were successfully converted.
48 func convertKeepstoreFlagsToServiceFlags(args []string, lgr logrus.FieldLogger) ([]string, bool) {
49         flags := flag.NewFlagSet("", flag.ContinueOnError)
50         flags.String("listen", "", "Services.Keepstore.InternalURLs")
51         flags.Int("max-buffers", 0, "API.MaxKeepBlockBuffers")
52         flags.Int("max-requests", 0, "API.MaxConcurrentRequests")
53         flags.Bool("never-delete", false, "Collections.BlobTrash")
54         flags.Bool("enforce-permissions", false, "Collections.BlobSigning")
55         flags.String("permission-key-file", "", "Collections.BlobSigningKey")
56         flags.String("blob-signing-key-file", "", "Collections.BlobSigningKey")
57         flags.String("data-manager-token-file", "", "SystemRootToken")
58         flags.Int("permission-ttl", 0, "Collections.BlobSigningTTL")
59         flags.Int("blob-signature-ttl", 0, "Collections.BlobSigningTTL")
60         flags.String("trash-lifetime", "", "Collections.BlobTrashLifetime")
61         flags.Bool("serialize", false, "Volumes.*.DriverParameters.Serialize")
62         flags.Bool("readonly", false, "Volumes.*.ReadOnly")
63         flags.String("pid", "", "-")
64         flags.String("trash-check-interval", "", "Collections.BlobTrashCheckInterval")
65
66         flags.String("azure-storage-container-volume", "", "Volumes.*.Driver")
67         flags.String("azure-storage-account-name", "", "Volumes.*.DriverParameters.StorageAccountName")
68         flags.String("azure-storage-account-key-file", "", "Volumes.*.DriverParameters.StorageAccountKey")
69         flags.String("azure-storage-replication", "", "Volumes.*.Replication")
70         flags.String("azure-max-get-bytes", "", "Volumes.*.DriverParameters.MaxDataReadSize")
71
72         flags.String("s3-bucket-volume", "", "Volumes.*.DriverParameters.Bucket")
73         flags.String("s3-region", "", "Volumes.*.DriverParameters.Region")
74         flags.String("s3-endpoint", "", "Volumes.*.DriverParameters.Endpoint")
75         flags.String("s3-access-key-file", "", "Volumes.*.DriverParameters.AccessKey")
76         flags.String("s3-secret-key-file", "", "Volumes.*.DriverParameters.SecretKey")
77         flags.String("s3-race-window", "", "Volumes.*.DriverParameters.RaceWindow")
78         flags.String("s3-replication", "", "Volumes.*.Replication")
79         flags.String("s3-unsafe-delete", "", "Volumes.*.DriverParameters.UnsafeDelete")
80
81         flags.String("volume", "", "Volumes")
82
83         flags.Bool("version", false, "")
84         flags.String("config", "", "")
85         flags.String("legacy-keepstore-config", "", "")
86
87         err := flags.Parse(args)
88         if err == flag.ErrHelp {
89                 return []string{"-help"}, true
90         } else if err != nil {
91                 return nil, false
92         }
93
94         args = nil
95         ok := true
96         flags.Visit(func(f *flag.Flag) {
97                 if f.Name == "config" || f.Name == "legacy-keepstore-config" || f.Name == "version" {
98                         args = append(args, "-"+f.Name, f.Value.String())
99                 } else if f.Usage == "-" {
100                         ok = false
101                         lgr.Errorf("command line flag -%s is no longer supported", f.Name)
102                 } else {
103                         ok = false
104                         lgr.Errorf("command line flag -%s is no longer supported -- use Clusters.*.%s in cluster config file instead", f.Name, f.Usage)
105                 }
106         })
107         if !ok {
108                 return nil, false
109         }
110
111         flags = flag.NewFlagSet("", flag.ExitOnError)
112         loader := config.NewLoader(nil, lgr)
113         loader.SetupFlags(flags)
114         return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true
115 }
116
117 type handler struct {
118         http.Handler
119         Cluster *arvados.Cluster
120         Logger  logrus.FieldLogger
121
122         pullq      *WorkQueue
123         trashq     *WorkQueue
124         volmgr     *RRVolumeManager
125         keepClient *keepclient.KeepClient
126
127         err       error
128         setupOnce sync.Once
129 }
130
131 func (h *handler) CheckHealth() error {
132         return h.err
133 }
134
135 func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
136         var h handler
137         serviceURL, ok := service.URLFromContext(ctx)
138         if !ok {
139                 return service.ErrorHandler(ctx, cluster, errors.New("BUG: no URL from service.URLFromContext"))
140         }
141         err := h.setup(ctx, cluster, token, reg, serviceURL)
142         if err != nil {
143                 return service.ErrorHandler(ctx, cluster, err)
144         }
145         return &h
146 }
147
148 func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) error {
149         h.Cluster = cluster
150         h.Logger = ctxlog.FromContext(ctx)
151         if h.Cluster.API.MaxKeepBlockBuffers <= 0 {
152                 return fmt.Errorf("MaxBuffers must be greater than zero")
153         }
154         bufs = newBufferPool(h.Logger, h.Cluster.API.MaxKeepBlockBuffers, BlockSize)
155
156         if h.Cluster.API.MaxConcurrentRequests < 1 {
157                 h.Cluster.API.MaxConcurrentRequests = h.Cluster.API.MaxKeepBlockBuffers * 2
158                 h.Logger.Warnf("MaxRequests <1 or not specified; defaulting to MaxKeepBlockBuffers * 2 == %d", h.Cluster.API.MaxConcurrentRequests)
159         }
160
161         if h.Cluster.Collections.BlobSigningKey != "" {
162         } else if h.Cluster.Collections.BlobSigning {
163                 return errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
164         } else {
165                 h.Logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
166         }
167
168         if len(h.Cluster.Volumes) == 0 {
169                 return errors.New("no volumes configured")
170         }
171
172         h.Logger.Printf("keepstore %s starting, pid %d", version, os.Getpid())
173
174         // Start a round-robin VolumeManager with the configured volumes.
175         vm, err := makeRRVolumeManager(h.Logger, h.Cluster, serviceURL, newVolumeMetricsVecs(reg))
176         if err != nil {
177                 return err
178         }
179         if len(vm.readables) == 0 {
180                 return fmt.Errorf("no volumes configured for %s", serviceURL)
181         }
182         h.volmgr = vm
183
184         // Initialize the pullq and workers
185         h.pullq = NewWorkQueue()
186         for i := 0; i < 1 || i < h.Cluster.Collections.BlobReplicateConcurrency; i++ {
187                 go h.runPullWorker(h.pullq)
188         }
189
190         // Initialize the trashq and workers
191         h.trashq = NewWorkQueue()
192         for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
193                 go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
194         }
195
196         // Set up routes and metrics
197         h.Handler = MakeRESTRouter(ctx, cluster, reg, vm, h.pullq, h.trashq)
198
199         // Initialize keepclient for pull workers
200         c, err := arvados.NewClientFromConfig(cluster)
201         if err != nil {
202                 return err
203         }
204         ac, err := arvadosclient.New(c)
205         if err != nil {
206                 return err
207         }
208         h.keepClient = &keepclient.KeepClient{
209                 Arvados:       ac,
210                 Want_replicas: 1,
211         }
212         h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63())
213
214         if d := h.Cluster.Collections.BlobTrashCheckInterval.Duration(); d > 0 {
215                 go emptyTrash(h.volmgr.writables, d)
216         }
217
218         return nil
219 }