13647: Use cluster config instead of custom keepstore config.
[arvados.git] / lib / config / deprecated_keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package config
6
7 import (
8         "bufio"
9         "bytes"
10         "crypto/rand"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "math/big"
15         "net"
16         "os"
17         "strconv"
18         "strings"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/sirupsen/logrus"
22 )
23
// defaultKeepstoreConfigPath is the one legacy keepstore config path
// that is allowed to be missing: when the loader's KeepstorePath equals
// this value and no file exists there, loadOldKeepstoreConfig returns
// without error and without changing the config.
const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
25
// oldKeepstoreConfig receives the contents of a legacy keepstore config
// file (see loadOldKeepstoreConfig).
//
// Most fields are pointers so that a key missing from the legacy file
// (nil) can be told apart from a key that was set explicitly; only keys
// that are present are migrated into the cluster config.
type oldKeepstoreConfig struct {
        // Debug selects SystemLogs.LogLevel: "debug" when true, "info"
        // when false.
        Debug  *bool
        // Listen is the legacy listen address. It is used to find or
        // create this host's entry in Services.Keepstore.InternalURLs.
        Listen *string

        // LogFormat is copied to SystemLogs.Format.
        LogFormat *string

        // PIDFile is accepted but not consulted by
        // loadOldKeepstoreConfig.
        PIDFile *string

        // MaxBuffers and MaxRequests are copied to
        // API.MaxKeepBlockBuffers and API.MaxConcurrentRequests.
        MaxBuffers  *int
        MaxRequests *int

        // The *File fields name files whose whitespace-trimmed content
        // (signing key, system token) or "file://" URL (TLS
        // certificate and key) is stored in the cluster config. The
        // remaining fields are copied to their Collections.Blob*
        // counterparts: PullWorkers to BlobReplicateConcurrency,
        // TrashWorkers to BlobTrashConcurrency and EmptyTrashWorkers
        // to BlobDeleteConcurrency.
        BlobSignatureTTL    *arvados.Duration
        BlobSigningKeyFile  *string
        RequireSignatures   *bool
        SystemAuthTokenFile *string
        EnableDelete        *bool
        TrashLifetime       *arvados.Duration
        TrashCheckInterval  *arvados.Duration
        PullWorkers         *int
        TrashWorkers        *int
        EmptyTrashWorkers   *int
        TLSCertificateFile  *string
        TLSKeyFile          *string

        // Volumes lists the legacy volumes. When it is nil, local
        // directory volumes are discovered from the mounts file
        // instead.
        Volumes *oldKeepstoreVolumeList

        // ManagementToken is accepted but not consulted by
        // loadOldKeepstoreConfig.
        ManagementToken *string

        // DiscoverVolumesFromMountsFile overrides the mounts file that
        // is scanned when Volumes is nil (empty means "/proc/mounts").
        DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
}
56
// oldKeepstoreVolumeList is the type of the Volumes key of a legacy
// keepstore config: a list of per-volume settings.
type oldKeepstoreVolumeList []oldKeepstoreVolume
58
// oldKeepstoreVolume is one entry of the legacy Volumes list.
//
// Type selects the driver; loadOldKeepstoreConfig accepts "S3", "Azure"
// and "Directory" and rejects anything else. Only the fields belonging
// to the selected driver, plus the common ones, are carried over into
// the new-style arvados.Volume and its DriverParameters.
type oldKeepstoreVolume struct {
        arvados.Volume
        Type string `json:",omitempty"`

        // Azure driver configs
        //
        // StorageAccountKeyFile names a file; its whitespace-trimmed
        // content becomes the StorageAccountKey driver parameter.
        StorageAccountName    string           `json:",omitempty"`
        StorageAccountKeyFile string           `json:",omitempty"`
        StorageBaseURL        string           `json:",omitempty"`
        ContainerName         string           `json:",omitempty"`
        AzureReplication      int              `json:",omitempty"`
        RequestTimeout        arvados.Duration `json:",omitempty"`
        ListBlobsRetryDelay   arvados.Duration `json:",omitempty"`
        ListBlobsMaxAttempts  int              `json:",omitempty"`

        // S3 driver configs
        //
        // AccessKeyFile and SecretKeyFile name files; their
        // whitespace-trimmed contents become the AccessKey and
        // SecretKey driver parameters.
        AccessKeyFile      string           `json:",omitempty"`
        SecretKeyFile      string           `json:",omitempty"`
        Endpoint           string           `json:",omitempty"`
        Region             string           `json:",omitempty"`
        Bucket             string           `json:",omitempty"`
        LocationConstraint bool             `json:",omitempty"`
        IndexPageSize      int              `json:",omitempty"`
        S3Replication      int              `json:",omitempty"`
        ConnectTimeout     arvados.Duration `json:",omitempty"`
        ReadTimeout        arvados.Duration `json:",omitempty"`
        RaceWindow         arvados.Duration `json:",omitempty"`
        UnsafeDelete       bool             `json:",omitempty"`

        // Directory driver configs
        Root                 string
        DirectoryReplication int
        Serialize            bool

        // Common configs
        //
        // StorageClasses is converted to a set (map[string]bool) when
        // the volume is migrated.
        ReadOnly       bool     `json:",omitempty"`
        StorageClasses []string `json:",omitempty"`
}
96
97 // update config using values from an old-style keepstore config file.
98 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
99         if ldr.KeepstorePath == "" {
100                 return nil
101         }
102         hostname, err := os.Hostname()
103         if err != nil {
104                 return fmt.Errorf("getting hostname: %s", err)
105         }
106
107         var oc oldKeepstoreConfig
108         err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
109         if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
110                 return nil
111         } else if err != nil {
112                 return err
113         }
114
115         cluster, err := cfg.GetCluster("")
116         if err != nil {
117                 return err
118         }
119
120         myURL := arvados.URL{Scheme: "http"}
121         if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
122                 myURL.Scheme = "https"
123         }
124
125         if v := oc.Debug; v == nil {
126         } else if *v && cluster.SystemLogs.LogLevel != "debug" {
127                 cluster.SystemLogs.LogLevel = "debug"
128         } else if !*v && cluster.SystemLogs.LogLevel != "info" {
129                 cluster.SystemLogs.LogLevel = "info"
130         }
131
132         if v := oc.TLSCertificateFile; v != nil && "file://"+*v != cluster.TLS.Certificate {
133                 cluster.TLS.Certificate = "file://" + *v
134         }
135         if v := oc.TLSKeyFile; v != nil && "file://"+*v != cluster.TLS.Key {
136                 cluster.TLS.Key = "file://" + *v
137         }
138         if v := oc.Listen; v != nil {
139                 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
140                         // already listed
141                         myURL.Host = *v
142                 } else if len(*v) > 1 && (*v)[0] == ':' {
143                         myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
144                         cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
145                 } else {
146                         return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
147                 }
148         } else {
149                 for url := range cluster.Services.Keepstore.InternalURLs {
150                         if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
151                                 myURL = url
152                                 break
153                         }
154                 }
155                 if myURL.Host == "" {
156                         return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
157                 }
158         }
159
160         if v := oc.LogFormat; v != nil && *v != cluster.SystemLogs.Format {
161                 cluster.SystemLogs.Format = *v
162         }
163         if v := oc.MaxBuffers; v != nil && *v != cluster.API.MaxKeepBlockBuffers {
164                 cluster.API.MaxKeepBlockBuffers = *v
165         }
166         if v := oc.MaxRequests; v != nil && *v != cluster.API.MaxConcurrentRequests {
167                 cluster.API.MaxConcurrentRequests = *v
168         }
169         if v := oc.BlobSignatureTTL; v != nil && *v != cluster.Collections.BlobSigningTTL {
170                 cluster.Collections.BlobSigningTTL = *v
171         }
172         if v := oc.BlobSigningKeyFile; v != nil {
173                 buf, err := ioutil.ReadFile(*v)
174                 if err != nil {
175                         return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
176                 }
177                 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
178                         cluster.Collections.BlobSigningKey = key
179                 }
180         }
181         if v := oc.RequireSignatures; v != nil && *v != cluster.Collections.BlobSigning {
182                 cluster.Collections.BlobSigning = *v
183         }
184         if v := oc.SystemAuthTokenFile; v != nil {
185                 f, err := os.Open(*v)
186                 if err != nil {
187                         return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
188                 }
189                 defer f.Close()
190                 buf, err := ioutil.ReadAll(f)
191                 if err != nil {
192                         return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
193                 }
194                 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
195                         cluster.SystemRootToken = key
196                 }
197         }
198         if v := oc.EnableDelete; v != nil && *v != cluster.Collections.BlobTrash {
199                 cluster.Collections.BlobTrash = *v
200         }
201         if v := oc.TrashLifetime; v != nil && *v != cluster.Collections.BlobTrashLifetime {
202                 cluster.Collections.BlobTrashLifetime = *v
203         }
204         if v := oc.TrashCheckInterval; v != nil && *v != cluster.Collections.BlobTrashCheckInterval {
205                 cluster.Collections.BlobTrashCheckInterval = *v
206         }
207         if v := oc.TrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
208                 cluster.Collections.BlobTrashConcurrency = *v
209         }
210         if v := oc.EmptyTrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
211                 cluster.Collections.BlobDeleteConcurrency = *v
212         }
213         if v := oc.PullWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
214                 cluster.Collections.BlobReplicateConcurrency = *v
215         }
216         if v := oc.Volumes; v == nil {
217                 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
218                 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
219                 if err != nil {
220                         return fmt.Errorf("error discovering local directory volumes: %s", err)
221                 }
222         } else {
223                 for i, oldvol := range *v {
224                         var accessViaHosts map[arvados.URL]arvados.VolumeAccess
225                         oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
226                         if found {
227                                 accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
228                                 writers := false
229                                 for _, va := range accessViaHosts {
230                                         if !va.ReadOnly {
231                                                 writers = true
232                                         }
233                                 }
234                                 if writers || len(accessViaHosts) == 0 {
235                                         ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
236                                         if len(accessViaHosts) > 0 {
237                                                 cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
238                                         }
239                                         continue
240                                 }
241                         }
242                         var newvol arvados.Volume
243                         if found {
244                                 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
245                                 newvol = cluster.Volumes[oldUUID]
246                                 // Remove the old entry. It will be
247                                 // added back below, possibly with a
248                                 // new UUID.
249                                 delete(cluster.Volumes, oldUUID)
250                         } else {
251                                 var params interface{}
252                                 switch oldvol.Type {
253                                 case "S3":
254                                         accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
255                                         if err != nil && oldvol.AccessKeyFile != "" {
256                                                 return fmt.Errorf("error reading AccessKeyFile: %s", err)
257                                         }
258                                         secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
259                                         if err != nil && oldvol.SecretKeyFile != "" {
260                                                 return fmt.Errorf("error reading SecretKeyFile: %s", err)
261                                         }
262                                         newvol = arvados.Volume{
263                                                 Driver:         "S3",
264                                                 ReadOnly:       oldvol.ReadOnly,
265                                                 Replication:    oldvol.S3Replication,
266                                                 StorageClasses: array2boolmap(oldvol.StorageClasses),
267                                         }
268                                         params = arvados.S3VolumeDriverParameters{
269                                                 AccessKey:          string(bytes.TrimSpace(accesskeydata)),
270                                                 SecretKey:          string(bytes.TrimSpace(secretkeydata)),
271                                                 Endpoint:           oldvol.Endpoint,
272                                                 Region:             oldvol.Region,
273                                                 Bucket:             oldvol.Bucket,
274                                                 LocationConstraint: oldvol.LocationConstraint,
275                                                 IndexPageSize:      oldvol.IndexPageSize,
276                                                 ConnectTimeout:     oldvol.ConnectTimeout,
277                                                 ReadTimeout:        oldvol.ReadTimeout,
278                                                 RaceWindow:         oldvol.RaceWindow,
279                                                 UnsafeDelete:       oldvol.UnsafeDelete,
280                                         }
281                                 case "Azure":
282                                         keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
283                                         if err != nil && oldvol.StorageAccountKeyFile != "" {
284                                                 return fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
285                                         }
286                                         newvol = arvados.Volume{
287                                                 Driver:         "Azure",
288                                                 ReadOnly:       oldvol.ReadOnly,
289                                                 Replication:    oldvol.AzureReplication,
290                                                 StorageClasses: array2boolmap(oldvol.StorageClasses),
291                                         }
292                                         params = arvados.AzureVolumeDriverParameters{
293                                                 StorageAccountName:   oldvol.StorageAccountName,
294                                                 StorageAccountKey:    string(bytes.TrimSpace(keydata)),
295                                                 StorageBaseURL:       oldvol.StorageBaseURL,
296                                                 ContainerName:        oldvol.ContainerName,
297                                                 RequestTimeout:       oldvol.RequestTimeout,
298                                                 ListBlobsRetryDelay:  oldvol.ListBlobsRetryDelay,
299                                                 ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
300                                         }
301                                 case "Directory":
302                                         newvol = arvados.Volume{
303                                                 Driver:         "Directory",
304                                                 ReadOnly:       oldvol.ReadOnly,
305                                                 Replication:    oldvol.DirectoryReplication,
306                                                 StorageClasses: array2boolmap(oldvol.StorageClasses),
307                                         }
308                                         params = arvados.DirectoryVolumeDriverParameters{
309                                                 Root:      oldvol.Root,
310                                                 Serialize: oldvol.Serialize,
311                                         }
312                                 default:
313                                         return fmt.Errorf("unsupported volume type %q", oldvol.Type)
314                                 }
315                                 dp, err := json.Marshal(params)
316                                 if err != nil {
317                                         return err
318                                 }
319                                 newvol.DriverParameters = json.RawMessage(dp)
320                                 if newvol.Replication < 1 {
321                                         newvol.Replication = 1
322                                 }
323                         }
324                         if accessViaHosts == nil {
325                                 accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
326                         }
327                         accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
328                         newvol.AccessViaHosts = accessViaHosts
329
330                         volUUID := oldUUID
331                         if oldvol.ReadOnly {
332                         } else if oc.Listen == nil {
333                                 ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
334                         } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
335                                 ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
336                         } else if len(uuid) != 27 {
337                                 ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
338                         } else {
339                                 rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
340                                 if _, ok := cluster.Volumes[rendezvousUUID]; ok {
341                                         ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
342                                 } else {
343                                         volUUID = rendezvousUUID
344                                 }
345                         }
346                         if volUUID == "" {
347                                 volUUID = newUUID(cluster.ClusterID, "nyw5e")
348                                 ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
349                         }
350                         cluster.Volumes[volUUID] = newvol
351                 }
352         }
353
354         cfg.Clusters[cluster.ClusterID] = *cluster
355         return nil
356 }
357
358 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
359         for uuid, newvol := range newvols {
360                 if oldvol.Type != newvol.Driver {
361                         continue
362                 }
363                 switch oldvol.Type {
364                 case "S3":
365                         var params arvados.S3VolumeDriverParameters
366                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
367                                 oldvol.Endpoint == params.Endpoint &&
368                                 oldvol.Region == params.Region &&
369                                 oldvol.Bucket == params.Bucket &&
370                                 oldvol.LocationConstraint == params.LocationConstraint {
371                                 return uuid, true
372                         }
373                 case "Azure":
374                         var params arvados.AzureVolumeDriverParameters
375                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
376                                 oldvol.StorageAccountName == params.StorageAccountName &&
377                                 oldvol.StorageBaseURL == params.StorageBaseURL &&
378                                 oldvol.ContainerName == params.ContainerName {
379                                 return uuid, true
380                         }
381                 case "Directory":
382                         var params arvados.DirectoryVolumeDriverParameters
383                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
384                                 oldvol.Root == params.Root {
385                                 if _, ok := newvol.AccessViaHosts[myURL]; ok {
386                                         return uuid, true
387                                 }
388                         }
389                 }
390         }
391         return "", false
392 }
393
394 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
395         if mountsFile == "" {
396                 mountsFile = "/proc/mounts"
397         }
398         f, err := os.Open(mountsFile)
399         if err != nil {
400                 return fmt.Errorf("error opening %s: %s", mountsFile, err)
401         }
402         defer f.Close()
403         scanner := bufio.NewScanner(f)
404         for scanner.Scan() {
405                 args := strings.Fields(scanner.Text())
406                 dev, mount := args[0], args[1]
407                 if mount == "/" {
408                         continue
409                 }
410                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
411                         continue
412                 }
413                 keepdir := mount + "/keep"
414                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
415                         continue
416                 }
417
418                 ro := false
419                 for _, fsopt := range strings.Split(args[3], ",") {
420                         if fsopt == "ro" {
421                                 ro = true
422                         }
423                 }
424
425                 uuid := newUUID(cluster.ClusterID, "nyw5e")
426                 ldr.Logger.WithFields(logrus.Fields{
427                         "UUID":                       uuid,
428                         "Driver":                     "Directory",
429                         "DriverParameters.Root":      keepdir,
430                         "DriverParameters.Serialize": false,
431                         "ReadOnly":                   ro,
432                         "Replication":                1,
433                 }).Warn("adding local directory volume")
434
435                 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
436                         Root:      keepdir,
437                         Serialize: false,
438                 })
439                 if err != nil {
440                         panic(err)
441                 }
442                 cluster.Volumes[uuid] = arvados.Volume{
443                         Driver:           "Directory",
444                         DriverParameters: p,
445                         ReadOnly:         ro,
446                         Replication:      1,
447                         AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
448                                 myURL: {ReadOnly: ro},
449                         },
450                 }
451         }
452         if err := scanner.Err(); err != nil {
453                 return fmt.Errorf("reading %s: %s", mountsFile, err)
454         }
455         return nil
456 }
457
// array2boolmap turns a list of keys into a set: the returned map
// holds true for every distinct key. The result is never nil, even
// for a nil or empty input.
func array2boolmap(keys []string) map[string]bool {
	set := make(map[string]bool)
	for i := range keys {
		set[keys[i]] = true
	}
	return set
}
465
// newUUID returns a string of the form "<clusterID>-<infix>-<suffix>",
// where suffix is 15 random base-36 digits (0-9, a-z) drawn from
// crypto/rand and left-padded with zeros. It panics if the random
// source fails.
func newUUID(clusterID, infix string) string {
	const (
		base   = 36
		digits = 15
		zeros  = "000000000000000" // digits zeros, used for padding
	)
	// Upper bound (exclusive) of the random value: base**digits.
	limit := new(big.Int).Exp(big.NewInt(base), big.NewInt(digits), nil)
	n, err := rand.Int(rand.Reader, limit)
	if err != nil {
		panic(err)
	}
	suffix := n.Text(base)
	if short := digits - len(suffix); short > 0 {
		suffix = zeros[:short] + suffix
	}
	return fmt.Sprintf("%s-%s-%s", clusterID, infix, suffix)
}
477
478 // Return the UUID and URL for the controller's keep_services listing
479 // corresponding to this host/process.
480 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
481         client, err := arvados.NewClientFromConfig(cluster)
482         if err != nil {
483                 return
484         }
485         client.AuthToken = cluster.SystemRootToken
486         var svcList arvados.KeepServiceList
487         err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
488         if err != nil {
489                 return
490         }
491         hostname, err := os.Hostname()
492         if err != nil {
493                 err = fmt.Errorf("error getting hostname: %s", err)
494                 return
495         }
496         var tried []string
497         for _, ks := range svcList.Items {
498                 if ks.ServiceType == "proxy" {
499                         continue
500                 } else if keepServiceIsMe(ks, hostname, listen) {
501                         url := arvados.URL{
502                                 Scheme: "http",
503                                 Host:   net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
504                         }
505                         if ks.ServiceSSLFlag {
506                                 url.Scheme = "https"
507                         }
508                         return ks.UUID, url, nil
509                 } else {
510                         tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
511                 }
512         }
513         err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
514         return
515 }
516
// localhostOrAllInterfaces is the set of (lowercase) KeepService host
// values that keepServiceIsMe accepts regardless of the local
// hostname: loopback names/addresses, the IPv6 unspecified address,
// and the empty string.
var localhostOrAllInterfaces = map[string]bool{
        "localhost": true,
        "127.0.0.1": true,
        "::1":       true,
        "::":        true,
        "":          true,
}
524
525 // Return true if the given KeepService entry matches the given
526 // hostname and (keepstore config file) listen address.
527 //
528 // If the KeepService host is some variant of "localhost", we assume
529 // this is a testing or single-node environment, ignore the given
530 // hostname, and return true if the port numbers match.
531 //
532 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
533 // match a KeepService host "foo.bar", but also "foo.bar.example",
534 // "foo.bar.example.org", etc.
535 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
536         // Extract the port name/number from listen, and resolve it to
537         // a port number to compare with ks.ServicePort.
538         _, listenport, err := net.SplitHostPort(listen)
539         if err != nil && strings.HasPrefix(listen, ":") {
540                 listenport = listen[1:]
541         }
542         if lp, err := net.LookupPort("tcp", listenport); err != nil {
543                 return false
544         } else if !(lp == ks.ServicePort ||
545                 (lp == 0 && ks.ServicePort == 80)) {
546                 return false
547         }
548
549         kshost := strings.ToLower(ks.ServiceHost)
550         return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
551 }