13647: Remove superfluous conditionals.
[arvados.git] / lib / config / deprecated_keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package config
6
7 import (
8         "bufio"
9         "bytes"
10         "crypto/rand"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "math/big"
15         "net"
16         "os"
17         "strconv"
18         "strings"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/sirupsen/logrus"
22 )
23
24 const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
25
26 type oldKeepstoreConfig struct {
27         Debug  *bool
28         Listen *string
29
30         LogFormat *string
31
32         PIDFile *string
33
34         MaxBuffers  *int
35         MaxRequests *int
36
37         BlobSignatureTTL    *arvados.Duration
38         BlobSigningKeyFile  *string
39         RequireSignatures   *bool
40         SystemAuthTokenFile *string
41         EnableDelete        *bool
42         TrashLifetime       *arvados.Duration
43         TrashCheckInterval  *arvados.Duration
44         PullWorkers         *int
45         TrashWorkers        *int
46         EmptyTrashWorkers   *int
47         TLSCertificateFile  *string
48         TLSKeyFile          *string
49
50         Volumes *oldKeepstoreVolumeList
51
52         ManagementToken *string
53
54         DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
55 }
56
57 type oldKeepstoreVolumeList []oldKeepstoreVolume
58
59 type oldKeepstoreVolume struct {
60         arvados.Volume
61         Type string `json:",omitempty"`
62
63         // Azure driver configs
64         StorageAccountName    string           `json:",omitempty"`
65         StorageAccountKeyFile string           `json:",omitempty"`
66         StorageBaseURL        string           `json:",omitempty"`
67         ContainerName         string           `json:",omitempty"`
68         AzureReplication      int              `json:",omitempty"`
69         RequestTimeout        arvados.Duration `json:",omitempty"`
70         ListBlobsRetryDelay   arvados.Duration `json:",omitempty"`
71         ListBlobsMaxAttempts  int              `json:",omitempty"`
72
73         // S3 driver configs
74         AccessKeyFile      string           `json:",omitempty"`
75         SecretKeyFile      string           `json:",omitempty"`
76         Endpoint           string           `json:",omitempty"`
77         Region             string           `json:",omitempty"`
78         Bucket             string           `json:",omitempty"`
79         LocationConstraint bool             `json:",omitempty"`
80         IndexPageSize      int              `json:",omitempty"`
81         S3Replication      int              `json:",omitempty"`
82         ConnectTimeout     arvados.Duration `json:",omitempty"`
83         ReadTimeout        arvados.Duration `json:",omitempty"`
84         RaceWindow         arvados.Duration `json:",omitempty"`
85         UnsafeDelete       bool             `json:",omitempty"`
86
87         // Directory driver configs
88         Root                 string
89         DirectoryReplication int
90         Serialize            bool
91
92         // Common configs
93         ReadOnly       bool     `json:",omitempty"`
94         StorageClasses []string `json:",omitempty"`
95 }
96
97 // update config using values from an old-style keepstore config file.
98 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
99         if ldr.KeepstorePath == "" {
100                 return nil
101         }
102         hostname, err := os.Hostname()
103         if err != nil {
104                 return fmt.Errorf("getting hostname: %s", err)
105         }
106
107         var oc oldKeepstoreConfig
108         err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
109         if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
110                 return nil
111         } else if err != nil {
112                 return err
113         }
114
115         cluster, err := cfg.GetCluster("")
116         if err != nil {
117                 return err
118         }
119
120         myURL := arvados.URL{Scheme: "http"}
121         if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
122                 myURL.Scheme = "https"
123         }
124
125         if v := oc.Debug; v == nil {
126         } else if *v && cluster.SystemLogs.LogLevel != "debug" {
127                 cluster.SystemLogs.LogLevel = "debug"
128         } else if !*v && cluster.SystemLogs.LogLevel != "info" {
129                 cluster.SystemLogs.LogLevel = "info"
130         }
131
132         if v := oc.TLSCertificateFile; v != nil {
133                 cluster.TLS.Certificate = "file://" + *v
134         }
135         if v := oc.TLSKeyFile; v != nil {
136                 cluster.TLS.Key = "file://" + *v
137         }
138         if v := oc.Listen; v != nil {
139                 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
140                         // already listed
141                         myURL.Host = *v
142                 } else if len(*v) > 1 && (*v)[0] == ':' {
143                         myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
144                         cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
145                 } else {
146                         return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
147                 }
148         } else {
149                 for url := range cluster.Services.Keepstore.InternalURLs {
150                         if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
151                                 myURL = url
152                                 break
153                         }
154                 }
155                 if myURL.Host == "" {
156                         return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
157                 }
158         }
159
160         if v := oc.LogFormat; v != nil {
161                 cluster.SystemLogs.Format = *v
162         }
163         if v := oc.MaxBuffers; v != nil {
164                 cluster.API.MaxKeepBlobBuffers = *v
165         }
166         if v := oc.MaxRequests; v != nil {
167                 cluster.API.MaxConcurrentRequests = *v
168         }
169         if v := oc.BlobSignatureTTL; v != nil {
170                 cluster.Collections.BlobSigningTTL = *v
171         }
172         if v := oc.BlobSigningKeyFile; v != nil {
173                 buf, err := ioutil.ReadFile(*v)
174                 if err != nil {
175                         return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
176                 }
177                 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
178                         cluster.Collections.BlobSigningKey = key
179                 }
180         }
181         if v := oc.RequireSignatures; v != nil {
182                 cluster.Collections.BlobSigning = *v
183         }
184         if v := oc.SystemAuthTokenFile; v != nil {
185                 f, err := os.Open(*v)
186                 if err != nil {
187                         return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
188                 }
189                 defer f.Close()
190                 buf, err := ioutil.ReadAll(f)
191                 if err != nil {
192                         return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
193                 }
194                 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
195                         cluster.SystemRootToken = key
196                 }
197         }
198         if v := oc.EnableDelete; v != nil {
199                 cluster.Collections.BlobTrash = *v
200         }
201         if v := oc.TrashLifetime; v != nil {
202                 cluster.Collections.BlobTrashLifetime = *v
203         }
204         if v := oc.TrashCheckInterval; v != nil {
205                 cluster.Collections.BlobTrashCheckInterval = *v
206         }
207         if v := oc.TrashWorkers; v != nil {
208                 cluster.Collections.BlobTrashConcurrency = *v
209         }
210         if v := oc.EmptyTrashWorkers; v != nil {
211                 cluster.Collections.BlobDeleteConcurrency = *v
212         }
213         if v := oc.PullWorkers; v != nil {
214                 cluster.Collections.BlobReplicateConcurrency = *v
215         }
216         if oc.Volumes == nil || len(*oc.Volumes) == 0 {
217                 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
218                 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
219                 if err != nil {
220                         return fmt.Errorf("error discovering local directory volumes: %s", err)
221                 }
222         } else {
223                 err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL)
224                 if err != nil {
225                         return err
226                 }
227         }
228
229         cfg.Clusters[cluster.ClusterID] = *cluster
230         return nil
231 }
232
233 // Merge Volumes section of old keepstore config into cluster config.
234 func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error {
235         for i, oldvol := range *oc.Volumes {
236                 var accessViaHosts map[arvados.URL]arvados.VolumeAccess
237                 oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
238                 if found {
239                         accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
240                         writers := false
241                         for _, va := range accessViaHosts {
242                                 if !va.ReadOnly {
243                                         writers = true
244                                 }
245                         }
246                         if writers || len(accessViaHosts) == 0 {
247                                 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
248                                 if len(accessViaHosts) > 0 {
249                                         cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
250                                 }
251                                 continue
252                         }
253                 }
254                 var newvol arvados.Volume
255                 if found {
256                         ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
257                         newvol = cluster.Volumes[oldUUID]
258                         // Remove the old entry. It will be added back
259                         // below, possibly with a new UUID.
260                         delete(cluster.Volumes, oldUUID)
261                 } else {
262                         v, err := ldr.translateOldKeepstoreVolume(oldvol)
263                         if err != nil {
264                                 return err
265                         }
266                         newvol = v
267                 }
268                 if accessViaHosts == nil {
269                         accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
270                 }
271                 accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
272                 newvol.AccessViaHosts = accessViaHosts
273
274                 volUUID := oldUUID
275                 if oldvol.ReadOnly {
276                 } else if oc.Listen == nil {
277                         ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
278                 } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
279                         ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
280                 } else if len(uuid) != 27 {
281                         ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
282                 } else {
283                         rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
284                         if _, ok := cluster.Volumes[rendezvousUUID]; ok {
285                                 ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
286                         } else {
287                                 volUUID = rendezvousUUID
288                         }
289                         si := cluster.Services.Keepstore.InternalURLs[myURL]
290                         si.Rendezvous = uuid[12:]
291                         cluster.Services.Keepstore.InternalURLs[myURL] = si
292                 }
293                 if volUUID == "" {
294                         volUUID = newUUID(cluster.ClusterID, "nyw5e")
295                         ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
296                 }
297                 cluster.Volumes[volUUID] = newvol
298         }
299         return nil
300 }
301
302 func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) {
303         var newvol arvados.Volume
304         var params interface{}
305         switch oldvol.Type {
306         case "S3":
307                 accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
308                 if err != nil && oldvol.AccessKeyFile != "" {
309                         return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err)
310                 }
311                 secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
312                 if err != nil && oldvol.SecretKeyFile != "" {
313                         return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err)
314                 }
315                 newvol = arvados.Volume{
316                         Driver:         "S3",
317                         ReadOnly:       oldvol.ReadOnly,
318                         Replication:    oldvol.S3Replication,
319                         StorageClasses: array2boolmap(oldvol.StorageClasses),
320                 }
321                 params = arvados.S3VolumeDriverParameters{
322                         AccessKey:          string(bytes.TrimSpace(accesskeydata)),
323                         SecretKey:          string(bytes.TrimSpace(secretkeydata)),
324                         Endpoint:           oldvol.Endpoint,
325                         Region:             oldvol.Region,
326                         Bucket:             oldvol.Bucket,
327                         LocationConstraint: oldvol.LocationConstraint,
328                         IndexPageSize:      oldvol.IndexPageSize,
329                         ConnectTimeout:     oldvol.ConnectTimeout,
330                         ReadTimeout:        oldvol.ReadTimeout,
331                         RaceWindow:         oldvol.RaceWindow,
332                         UnsafeDelete:       oldvol.UnsafeDelete,
333                 }
334         case "Azure":
335                 keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
336                 if err != nil && oldvol.StorageAccountKeyFile != "" {
337                         return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
338                 }
339                 newvol = arvados.Volume{
340                         Driver:         "Azure",
341                         ReadOnly:       oldvol.ReadOnly,
342                         Replication:    oldvol.AzureReplication,
343                         StorageClasses: array2boolmap(oldvol.StorageClasses),
344                 }
345                 params = arvados.AzureVolumeDriverParameters{
346                         StorageAccountName:   oldvol.StorageAccountName,
347                         StorageAccountKey:    string(bytes.TrimSpace(keydata)),
348                         StorageBaseURL:       oldvol.StorageBaseURL,
349                         ContainerName:        oldvol.ContainerName,
350                         RequestTimeout:       oldvol.RequestTimeout,
351                         ListBlobsRetryDelay:  oldvol.ListBlobsRetryDelay,
352                         ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
353                 }
354         case "Directory":
355                 newvol = arvados.Volume{
356                         Driver:         "Directory",
357                         ReadOnly:       oldvol.ReadOnly,
358                         Replication:    oldvol.DirectoryReplication,
359                         StorageClasses: array2boolmap(oldvol.StorageClasses),
360                 }
361                 params = arvados.DirectoryVolumeDriverParameters{
362                         Root:      oldvol.Root,
363                         Serialize: oldvol.Serialize,
364                 }
365         default:
366                 return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type)
367         }
368         dp, err := json.Marshal(params)
369         if err != nil {
370                 return newvol, err
371         }
372         newvol.DriverParameters = json.RawMessage(dp)
373         if newvol.Replication < 1 {
374                 newvol.Replication = 1
375         }
376         return newvol, nil
377 }
378
379 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
380         for uuid, newvol := range newvols {
381                 if oldvol.Type != newvol.Driver {
382                         continue
383                 }
384                 switch oldvol.Type {
385                 case "S3":
386                         var params arvados.S3VolumeDriverParameters
387                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
388                                 oldvol.Endpoint == params.Endpoint &&
389                                 oldvol.Region == params.Region &&
390                                 oldvol.Bucket == params.Bucket &&
391                                 oldvol.LocationConstraint == params.LocationConstraint {
392                                 return uuid, true
393                         }
394                 case "Azure":
395                         var params arvados.AzureVolumeDriverParameters
396                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
397                                 oldvol.StorageAccountName == params.StorageAccountName &&
398                                 oldvol.StorageBaseURL == params.StorageBaseURL &&
399                                 oldvol.ContainerName == params.ContainerName {
400                                 return uuid, true
401                         }
402                 case "Directory":
403                         var params arvados.DirectoryVolumeDriverParameters
404                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
405                                 oldvol.Root == params.Root {
406                                 if _, ok := newvol.AccessViaHosts[myURL]; ok {
407                                         return uuid, true
408                                 }
409                         }
410                 }
411         }
412         return "", false
413 }
414
415 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
416         if mountsFile == "" {
417                 mountsFile = "/proc/mounts"
418         }
419         f, err := os.Open(mountsFile)
420         if err != nil {
421                 return fmt.Errorf("error opening %s: %s", mountsFile, err)
422         }
423         defer f.Close()
424         scanner := bufio.NewScanner(f)
425         for scanner.Scan() {
426                 args := strings.Fields(scanner.Text())
427                 dev, mount := args[0], args[1]
428                 if mount == "/" {
429                         continue
430                 }
431                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
432                         continue
433                 }
434                 keepdir := mount + "/keep"
435                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
436                         continue
437                 }
438
439                 ro := false
440                 for _, fsopt := range strings.Split(args[3], ",") {
441                         if fsopt == "ro" {
442                                 ro = true
443                         }
444                 }
445
446                 uuid := newUUID(cluster.ClusterID, "nyw5e")
447                 ldr.Logger.WithFields(logrus.Fields{
448                         "UUID":                       uuid,
449                         "Driver":                     "Directory",
450                         "DriverParameters.Root":      keepdir,
451                         "DriverParameters.Serialize": false,
452                         "ReadOnly":                   ro,
453                         "Replication":                1,
454                 }).Warn("adding local directory volume")
455
456                 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
457                         Root:      keepdir,
458                         Serialize: false,
459                 })
460                 if err != nil {
461                         panic(err)
462                 }
463                 cluster.Volumes[uuid] = arvados.Volume{
464                         Driver:           "Directory",
465                         DriverParameters: p,
466                         ReadOnly:         ro,
467                         Replication:      1,
468                         AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
469                                 myURL: {ReadOnly: ro},
470                         },
471                 }
472         }
473         if err := scanner.Err(); err != nil {
474                 return fmt.Errorf("reading %s: %s", mountsFile, err)
475         }
476         return nil
477 }
478
479 func array2boolmap(keys []string) map[string]bool {
480         m := map[string]bool{}
481         for _, k := range keys {
482                 m[k] = true
483         }
484         return m
485 }
486
487 func newUUID(clusterID, infix string) string {
488         randint, err := rand.Int(rand.Reader, big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0)))
489         if err != nil {
490                 panic(err)
491         }
492         randstr := randint.Text(36)
493         for len(randstr) < 15 {
494                 randstr = "0" + randstr
495         }
496         return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr)
497 }
498
499 // Return the UUID and URL for the controller's keep_services listing
500 // corresponding to this host/process.
501 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
502         client, err := arvados.NewClientFromConfig(cluster)
503         if err != nil {
504                 return
505         }
506         client.AuthToken = cluster.SystemRootToken
507         var svcList arvados.KeepServiceList
508         err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
509         if err != nil {
510                 return
511         }
512         hostname, err := os.Hostname()
513         if err != nil {
514                 err = fmt.Errorf("error getting hostname: %s", err)
515                 return
516         }
517         var tried []string
518         for _, ks := range svcList.Items {
519                 if ks.ServiceType == "proxy" {
520                         continue
521                 } else if keepServiceIsMe(ks, hostname, listen) {
522                         url := arvados.URL{
523                                 Scheme: "http",
524                                 Host:   net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
525                         }
526                         if ks.ServiceSSLFlag {
527                                 url.Scheme = "https"
528                         }
529                         return ks.UUID, url, nil
530                 } else {
531                         tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
532                 }
533         }
534         err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
535         return
536 }
537
538 var localhostOrAllInterfaces = map[string]bool{
539         "localhost": true,
540         "127.0.0.1": true,
541         "::1":       true,
542         "::":        true,
543         "":          true,
544 }
545
546 // Return true if the given KeepService entry matches the given
547 // hostname and (keepstore config file) listen address.
548 //
549 // If the KeepService host is some variant of "localhost", we assume
550 // this is a testing or single-node environment, ignore the given
551 // hostname, and return true if the port numbers match.
552 //
553 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
554 // match a KeepService host "foo.bar", but also "foo.bar.example",
555 // "foo.bar.example.org", etc.
556 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
557         // Extract the port name/number from listen, and resolve it to
558         // a port number to compare with ks.ServicePort.
559         _, listenport, err := net.SplitHostPort(listen)
560         if err != nil && strings.HasPrefix(listen, ":") {
561                 listenport = listen[1:]
562         }
563         if lp, err := net.LookupPort("tcp", listenport); err != nil {
564                 return false
565         } else if !(lp == ks.ServicePort ||
566                 (lp == 0 && ks.ServicePort == 80)) {
567                 return false
568         }
569
570         kshost := strings.ToLower(ks.ServiceHost)
571         return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
572 }