13647: Merge branch 'master' into 13647-keepstore-config
[arvados.git] / lib / config / deprecated_keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package config
6
7 import (
8         "bufio"
9         "bytes"
10         "crypto/rand"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "math/big"
15         "net"
16         "os"
17         "strconv"
18         "strings"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/sirupsen/logrus"
22 )
23
24 const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
25
26 type oldKeepstoreConfig struct {
27         Debug  *bool
28         Listen *string
29
30         LogFormat *string
31
32         PIDFile *string
33
34         MaxBuffers  *int
35         MaxRequests *int
36
37         BlobSignatureTTL    *arvados.Duration
38         BlobSigningKeyFile  *string
39         RequireSignatures   *bool
40         SystemAuthTokenFile *string
41         EnableDelete        *bool
42         TrashLifetime       *arvados.Duration
43         TrashCheckInterval  *arvados.Duration
44         PullWorkers         *int
45         TrashWorkers        *int
46         EmptyTrashWorkers   *int
47         TLSCertificateFile  *string
48         TLSKeyFile          *string
49
50         Volumes *oldKeepstoreVolumeList
51
52         ManagementToken *string
53
54         DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
55 }
56
57 type oldKeepstoreVolumeList []oldKeepstoreVolume
58
59 type oldKeepstoreVolume struct {
60         arvados.Volume
61         Type string `json:",omitempty"`
62
63         // Azure driver configs
64         StorageAccountName    string           `json:",omitempty"`
65         StorageAccountKeyFile string           `json:",omitempty"`
66         StorageBaseURL        string           `json:",omitempty"`
67         ContainerName         string           `json:",omitempty"`
68         AzureReplication      int              `json:",omitempty"`
69         RequestTimeout        arvados.Duration `json:",omitempty"`
70         ListBlobsRetryDelay   arvados.Duration `json:",omitempty"`
71         ListBlobsMaxAttempts  int              `json:",omitempty"`
72
73         // S3 driver configs
74         AccessKeyFile      string           `json:",omitempty"`
75         SecretKeyFile      string           `json:",omitempty"`
76         Endpoint           string           `json:",omitempty"`
77         Region             string           `json:",omitempty"`
78         Bucket             string           `json:",omitempty"`
79         LocationConstraint bool             `json:",omitempty"`
80         IndexPageSize      int              `json:",omitempty"`
81         S3Replication      int              `json:",omitempty"`
82         ConnectTimeout     arvados.Duration `json:",omitempty"`
83         ReadTimeout        arvados.Duration `json:",omitempty"`
84         RaceWindow         arvados.Duration `json:",omitempty"`
85         UnsafeDelete       bool             `json:",omitempty"`
86
87         // Directory driver configs
88         Root                 string
89         DirectoryReplication int
90         Serialize            bool
91
92         // Common configs
93         ReadOnly       bool     `json:",omitempty"`
94         StorageClasses []string `json:",omitempty"`
95 }
96
97 // update config using values from an old-style keepstore config file.
98 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
99         if ldr.KeepstorePath == "" {
100                 return nil
101         }
102         hostname, err := os.Hostname()
103         if err != nil {
104                 return fmt.Errorf("getting hostname: %s", err)
105         }
106
107         var oc oldKeepstoreConfig
108         err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
109         if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
110                 return nil
111         } else if err != nil {
112                 return err
113         }
114
115         cluster, err := cfg.GetCluster("")
116         if err != nil {
117                 return err
118         }
119
120         myURL := arvados.URL{Scheme: "http"}
121         if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
122                 myURL.Scheme = "https"
123         }
124
125         if v := oc.Debug; v == nil {
126         } else if *v && cluster.SystemLogs.LogLevel != "debug" {
127                 cluster.SystemLogs.LogLevel = "debug"
128         } else if !*v && cluster.SystemLogs.LogLevel != "info" {
129                 cluster.SystemLogs.LogLevel = "info"
130         }
131
132         if v := oc.TLSCertificateFile; v != nil && "file://"+*v != cluster.TLS.Certificate {
133                 cluster.TLS.Certificate = "file://" + *v
134         }
135         if v := oc.TLSKeyFile; v != nil && "file://"+*v != cluster.TLS.Key {
136                 cluster.TLS.Key = "file://" + *v
137         }
138         if v := oc.Listen; v != nil {
139                 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
140                         // already listed
141                         myURL.Host = *v
142                 } else if len(*v) > 1 && (*v)[0] == ':' {
143                         myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
144                         cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
145                 } else {
146                         return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
147                 }
148         } else {
149                 for url := range cluster.Services.Keepstore.InternalURLs {
150                         if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
151                                 myURL = url
152                                 break
153                         }
154                 }
155                 if myURL.Host == "" {
156                         return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
157                 }
158         }
159
160         if v := oc.LogFormat; v != nil && *v != cluster.SystemLogs.Format {
161                 cluster.SystemLogs.Format = *v
162         }
163         if v := oc.MaxBuffers; v != nil && *v != cluster.API.MaxKeepBlockBuffers {
164                 cluster.API.MaxKeepBlockBuffers = *v
165         }
166         if v := oc.MaxRequests; v != nil && *v != cluster.API.MaxConcurrentRequests {
167                 cluster.API.MaxConcurrentRequests = *v
168         }
169         if v := oc.BlobSignatureTTL; v != nil && *v != cluster.Collections.BlobSigningTTL {
170                 cluster.Collections.BlobSigningTTL = *v
171         }
172         if v := oc.BlobSigningKeyFile; v != nil {
173                 buf, err := ioutil.ReadFile(*v)
174                 if err != nil {
175                         return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
176                 }
177                 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
178                         cluster.Collections.BlobSigningKey = key
179                 }
180         }
181         if v := oc.RequireSignatures; v != nil && *v != cluster.Collections.BlobSigning {
182                 cluster.Collections.BlobSigning = *v
183         }
184         if v := oc.SystemAuthTokenFile; v != nil {
185                 f, err := os.Open(*v)
186                 if err != nil {
187                         return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
188                 }
189                 defer f.Close()
190                 buf, err := ioutil.ReadAll(f)
191                 if err != nil {
192                         return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
193                 }
194                 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
195                         cluster.SystemRootToken = key
196                 }
197         }
198         if v := oc.EnableDelete; v != nil && *v != cluster.Collections.BlobTrash {
199                 cluster.Collections.BlobTrash = *v
200         }
201         if v := oc.TrashLifetime; v != nil && *v != cluster.Collections.BlobTrashLifetime {
202                 cluster.Collections.BlobTrashLifetime = *v
203         }
204         if v := oc.TrashCheckInterval; v != nil && *v != cluster.Collections.BlobTrashCheckInterval {
205                 cluster.Collections.BlobTrashCheckInterval = *v
206         }
207         if v := oc.TrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
208                 cluster.Collections.BlobTrashConcurrency = *v
209         }
210         if v := oc.EmptyTrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
211                 cluster.Collections.BlobDeleteConcurrency = *v
212         }
213         if v := oc.PullWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
214                 cluster.Collections.BlobReplicateConcurrency = *v
215         }
216         if v := oc.Volumes; v == nil {
217                 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
218                 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
219                 if err != nil {
220                         return fmt.Errorf("error discovering local directory volumes: %s", err)
221                 }
222         } else {
223                 for i, oldvol := range *v {
224                         var accessViaHosts map[arvados.URL]arvados.VolumeAccess
225                         oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
226                         if found {
227                                 accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
228                                 writers := false
229                                 for _, va := range accessViaHosts {
230                                         if !va.ReadOnly {
231                                                 writers = true
232                                         }
233                                 }
234                                 if writers || len(accessViaHosts) == 0 {
235                                         ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
236                                         if len(accessViaHosts) > 0 {
237                                                 cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
238                                         }
239                                         continue
240                                 }
241                         }
242                         var newvol arvados.Volume
243                         if found {
244                                 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
245                                 newvol = cluster.Volumes[oldUUID]
246                                 // Remove the old entry. It will be
247                                 // added back below, possibly with a
248                                 // new UUID.
249                                 delete(cluster.Volumes, oldUUID)
250                         } else {
251                                 var params interface{}
252                                 switch oldvol.Type {
253                                 case "S3":
254                                         accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
255                                         if err != nil && oldvol.AccessKeyFile != "" {
256                                                 return fmt.Errorf("error reading AccessKeyFile: %s", err)
257                                         }
258                                         secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
259                                         if err != nil && oldvol.SecretKeyFile != "" {
260                                                 return fmt.Errorf("error reading SecretKeyFile: %s", err)
261                                         }
262                                         newvol = arvados.Volume{
263                                                 Driver:         "S3",
264                                                 ReadOnly:       oldvol.ReadOnly,
265                                                 Replication:    oldvol.S3Replication,
266                                                 StorageClasses: array2boolmap(oldvol.StorageClasses),
267                                         }
268                                         params = arvados.S3VolumeDriverParameters{
269                                                 AccessKey:          string(bytes.TrimSpace(accesskeydata)),
270                                                 SecretKey:          string(bytes.TrimSpace(secretkeydata)),
271                                                 Endpoint:           oldvol.Endpoint,
272                                                 Region:             oldvol.Region,
273                                                 Bucket:             oldvol.Bucket,
274                                                 LocationConstraint: oldvol.LocationConstraint,
275                                                 IndexPageSize:      oldvol.IndexPageSize,
276                                                 ConnectTimeout:     oldvol.ConnectTimeout,
277                                                 ReadTimeout:        oldvol.ReadTimeout,
278                                                 RaceWindow:         oldvol.RaceWindow,
279                                                 UnsafeDelete:       oldvol.UnsafeDelete,
280                                         }
281                                 case "Azure":
282                                         keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
283                                         if err != nil && oldvol.StorageAccountKeyFile != "" {
284                                                 return fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
285                                         }
286                                         newvol = arvados.Volume{
287                                                 Driver:         "Azure",
288                                                 ReadOnly:       oldvol.ReadOnly,
289                                                 Replication:    oldvol.AzureReplication,
290                                                 StorageClasses: array2boolmap(oldvol.StorageClasses),
291                                         }
292                                         params = arvados.AzureVolumeDriverParameters{
293                                                 StorageAccountName:   oldvol.StorageAccountName,
294                                                 StorageAccountKey:    string(bytes.TrimSpace(keydata)),
295                                                 StorageBaseURL:       oldvol.StorageBaseURL,
296                                                 ContainerName:        oldvol.ContainerName,
297                                                 RequestTimeout:       oldvol.RequestTimeout,
298                                                 ListBlobsRetryDelay:  oldvol.ListBlobsRetryDelay,
299                                                 ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
300                                         }
301                                 case "Directory":
302                                         newvol = arvados.Volume{
303                                                 Driver:         "Directory",
304                                                 ReadOnly:       oldvol.ReadOnly,
305                                                 Replication:    oldvol.DirectoryReplication,
306                                                 StorageClasses: array2boolmap(oldvol.StorageClasses),
307                                         }
308                                         params = arvados.DirectoryVolumeDriverParameters{
309                                                 Root:      oldvol.Root,
310                                                 Serialize: oldvol.Serialize,
311                                         }
312                                 default:
313                                         return fmt.Errorf("unsupported volume type %q", oldvol.Type)
314                                 }
315                                 dp, err := json.Marshal(params)
316                                 if err != nil {
317                                         return err
318                                 }
319                                 newvol.DriverParameters = json.RawMessage(dp)
320                                 if newvol.Replication < 1 {
321                                         newvol.Replication = 1
322                                 }
323                         }
324                         if accessViaHosts == nil {
325                                 accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
326                         }
327                         accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
328                         newvol.AccessViaHosts = accessViaHosts
329
330                         volUUID := oldUUID
331                         if oldvol.ReadOnly {
332                         } else if oc.Listen == nil {
333                                 ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
334                         } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
335                                 ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
336                         } else if len(uuid) != 27 {
337                                 ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
338                         } else {
339                                 rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
340                                 if _, ok := cluster.Volumes[rendezvousUUID]; ok {
341                                         ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
342                                 } else {
343                                         volUUID = rendezvousUUID
344                                 }
345                                 si := cluster.Services.Keepstore.InternalURLs[myURL]
346                                 si.Rendezvous = uuid[12:]
347                                 cluster.Services.Keepstore.InternalURLs[myURL] = si
348                         }
349                         if volUUID == "" {
350                                 volUUID = newUUID(cluster.ClusterID, "nyw5e")
351                                 ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
352                         }
353                         cluster.Volumes[volUUID] = newvol
354                 }
355         }
356
357         cfg.Clusters[cluster.ClusterID] = *cluster
358         return nil
359 }
360
361 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
362         for uuid, newvol := range newvols {
363                 if oldvol.Type != newvol.Driver {
364                         continue
365                 }
366                 switch oldvol.Type {
367                 case "S3":
368                         var params arvados.S3VolumeDriverParameters
369                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
370                                 oldvol.Endpoint == params.Endpoint &&
371                                 oldvol.Region == params.Region &&
372                                 oldvol.Bucket == params.Bucket &&
373                                 oldvol.LocationConstraint == params.LocationConstraint {
374                                 return uuid, true
375                         }
376                 case "Azure":
377                         var params arvados.AzureVolumeDriverParameters
378                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
379                                 oldvol.StorageAccountName == params.StorageAccountName &&
380                                 oldvol.StorageBaseURL == params.StorageBaseURL &&
381                                 oldvol.ContainerName == params.ContainerName {
382                                 return uuid, true
383                         }
384                 case "Directory":
385                         var params arvados.DirectoryVolumeDriverParameters
386                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
387                                 oldvol.Root == params.Root {
388                                 if _, ok := newvol.AccessViaHosts[myURL]; ok {
389                                         return uuid, true
390                                 }
391                         }
392                 }
393         }
394         return "", false
395 }
396
397 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
398         if mountsFile == "" {
399                 mountsFile = "/proc/mounts"
400         }
401         f, err := os.Open(mountsFile)
402         if err != nil {
403                 return fmt.Errorf("error opening %s: %s", mountsFile, err)
404         }
405         defer f.Close()
406         scanner := bufio.NewScanner(f)
407         for scanner.Scan() {
408                 args := strings.Fields(scanner.Text())
409                 dev, mount := args[0], args[1]
410                 if mount == "/" {
411                         continue
412                 }
413                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
414                         continue
415                 }
416                 keepdir := mount + "/keep"
417                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
418                         continue
419                 }
420
421                 ro := false
422                 for _, fsopt := range strings.Split(args[3], ",") {
423                         if fsopt == "ro" {
424                                 ro = true
425                         }
426                 }
427
428                 uuid := newUUID(cluster.ClusterID, "nyw5e")
429                 ldr.Logger.WithFields(logrus.Fields{
430                         "UUID":                       uuid,
431                         "Driver":                     "Directory",
432                         "DriverParameters.Root":      keepdir,
433                         "DriverParameters.Serialize": false,
434                         "ReadOnly":                   ro,
435                         "Replication":                1,
436                 }).Warn("adding local directory volume")
437
438                 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
439                         Root:      keepdir,
440                         Serialize: false,
441                 })
442                 if err != nil {
443                         panic(err)
444                 }
445                 cluster.Volumes[uuid] = arvados.Volume{
446                         Driver:           "Directory",
447                         DriverParameters: p,
448                         ReadOnly:         ro,
449                         Replication:      1,
450                         AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
451                                 myURL: {ReadOnly: ro},
452                         },
453                 }
454         }
455         if err := scanner.Err(); err != nil {
456                 return fmt.Errorf("reading %s: %s", mountsFile, err)
457         }
458         return nil
459 }
460
461 func array2boolmap(keys []string) map[string]bool {
462         m := map[string]bool{}
463         for _, k := range keys {
464                 m[k] = true
465         }
466         return m
467 }
468
469 func newUUID(clusterID, infix string) string {
470         randint, err := rand.Int(rand.Reader, big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0)))
471         if err != nil {
472                 panic(err)
473         }
474         randstr := randint.Text(36)
475         for len(randstr) < 15 {
476                 randstr = "0" + randstr
477         }
478         return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr)
479 }
480
481 // Return the UUID and URL for the controller's keep_services listing
482 // corresponding to this host/process.
483 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
484         client, err := arvados.NewClientFromConfig(cluster)
485         if err != nil {
486                 return
487         }
488         client.AuthToken = cluster.SystemRootToken
489         var svcList arvados.KeepServiceList
490         err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
491         if err != nil {
492                 return
493         }
494         hostname, err := os.Hostname()
495         if err != nil {
496                 err = fmt.Errorf("error getting hostname: %s", err)
497                 return
498         }
499         var tried []string
500         for _, ks := range svcList.Items {
501                 if ks.ServiceType == "proxy" {
502                         continue
503                 } else if keepServiceIsMe(ks, hostname, listen) {
504                         url := arvados.URL{
505                                 Scheme: "http",
506                                 Host:   net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
507                         }
508                         if ks.ServiceSSLFlag {
509                                 url.Scheme = "https"
510                         }
511                         return ks.UUID, url, nil
512                 } else {
513                         tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
514                 }
515         }
516         err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
517         return
518 }
519
520 var localhostOrAllInterfaces = map[string]bool{
521         "localhost": true,
522         "127.0.0.1": true,
523         "::1":       true,
524         "::":        true,
525         "":          true,
526 }
527
528 // Return true if the given KeepService entry matches the given
529 // hostname and (keepstore config file) listen address.
530 //
531 // If the KeepService host is some variant of "localhost", we assume
532 // this is a testing or single-node environment, ignore the given
533 // hostname, and return true if the port numbers match.
534 //
535 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
536 // match a KeepService host "foo.bar", but also "foo.bar.example",
537 // "foo.bar.example.org", etc.
538 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
539         // Extract the port name/number from listen, and resolve it to
540         // a port number to compare with ks.ServicePort.
541         _, listenport, err := net.SplitHostPort(listen)
542         if err != nil && strings.HasPrefix(listen, ":") {
543                 listenport = listen[1:]
544         }
545         if lp, err := net.LookupPort("tcp", listenport); err != nil {
546                 return false
547         } else if !(lp == ks.ServicePort ||
548                 (lp == 0 && ks.ServicePort == 80)) {
549                 return false
550         }
551
552         kshost := strings.ToLower(ks.ServiceHost)
553         return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
554 }