Merge branch '22132-scheduling-status-log' refs #22132
[arvados.git] / lib / config / deprecated_keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package config
6
7 import (
8         "bufio"
9         "bytes"
10         "crypto/rand"
11         "encoding/json"
12         "fmt"
13         "io/ioutil"
14         "math/big"
15         "net"
16         "os"
17         "strconv"
18         "strings"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "github.com/sirupsen/logrus"
23 )
24
25 const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
26
27 type oldKeepstoreConfig struct {
28         Debug  *bool
29         Listen *string
30
31         LogFormat *string
32
33         PIDFile *string
34
35         MaxBuffers  *int
36         MaxRequests *int
37
38         BlobSignatureTTL    *arvados.Duration
39         BlobSigningKeyFile  *string
40         RequireSignatures   *bool
41         SystemAuthTokenFile *string
42         EnableDelete        *bool
43         TrashLifetime       *arvados.Duration
44         TrashCheckInterval  *arvados.Duration
45         PullWorkers         *int
46         TrashWorkers        *int
47         EmptyTrashWorkers   *int
48         TLSCertificateFile  *string
49         TLSKeyFile          *string
50
51         Volumes *oldKeepstoreVolumeList
52
53         ManagementToken *string
54
55         DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
56 }
57
58 type oldKeepstoreVolumeList []oldKeepstoreVolume
59
60 type oldKeepstoreVolume struct {
61         arvados.Volume
62         Type string `json:",omitempty"`
63
64         // Azure driver configs
65         StorageAccountName    string           `json:",omitempty"`
66         StorageAccountKeyFile string           `json:",omitempty"`
67         StorageBaseURL        string           `json:",omitempty"`
68         ContainerName         string           `json:",omitempty"`
69         AzureReplication      int              `json:",omitempty"`
70         RequestTimeout        arvados.Duration `json:",omitempty"`
71         ListBlobsRetryDelay   arvados.Duration `json:",omitempty"`
72         ListBlobsMaxAttempts  int              `json:",omitempty"`
73
74         // S3 driver configs
75         AccessKeyFile      string           `json:",omitempty"`
76         SecretKeyFile      string           `json:",omitempty"`
77         Endpoint           string           `json:",omitempty"`
78         Region             string           `json:",omitempty"`
79         Bucket             string           `json:",omitempty"`
80         LocationConstraint bool             `json:",omitempty"`
81         IndexPageSize      int              `json:",omitempty"`
82         S3Replication      int              `json:",omitempty"`
83         ConnectTimeout     arvados.Duration `json:",omitempty"`
84         ReadTimeout        arvados.Duration `json:",omitempty"`
85         RaceWindow         arvados.Duration `json:",omitempty"`
86         UnsafeDelete       bool             `json:",omitempty"`
87
88         // Directory driver configs
89         Root                 string
90         DirectoryReplication int
91         Serialize            bool
92
93         // Common configs
94         ReadOnly       bool     `json:",omitempty"`
95         StorageClasses []string `json:",omitempty"`
96 }
97
98 // update config using values from an old-style keepstore config file.
99 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
100         if ldr.KeepstorePath == "" {
101                 return nil
102         }
103         hostname, err := os.Hostname()
104         if err != nil {
105                 return fmt.Errorf("getting hostname: %s", err)
106         }
107
108         var oc oldKeepstoreConfig
109         err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
110         if os.IsNotExist(err) && ldr.KeepstorePath == defaultKeepstoreConfigPath {
111                 return nil
112         } else if err != nil {
113                 return err
114         }
115
116         cluster, err := cfg.GetCluster("")
117         if err != nil {
118                 return err
119         }
120
121         myURL := arvados.URL{Scheme: "http", Path: "/"}
122         if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
123                 myURL.Scheme = "https"
124         }
125
126         if v := oc.Debug; v == nil {
127         } else if *v && cluster.SystemLogs.LogLevel != "debug" {
128                 cluster.SystemLogs.LogLevel = "debug"
129         } else if !*v && cluster.SystemLogs.LogLevel != "info" {
130                 cluster.SystemLogs.LogLevel = "info"
131         }
132
133         if v := oc.TLSCertificateFile; v != nil {
134                 cluster.TLS.Certificate = "file://" + *v
135         }
136         if v := oc.TLSKeyFile; v != nil {
137                 cluster.TLS.Key = "file://" + *v
138         }
139         if v := oc.Listen; v != nil {
140                 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v, Path: "/"}]; ok {
141                         // already listed
142                         myURL.Host = *v
143                 } else if len(*v) > 1 && (*v)[0] == ':' {
144                         myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
145                         cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
146                 } else {
147                         return fmt.Errorf("unable to migrate Listen value %q -- you must update Services.Keepstore.InternalURLs manually, and comment out the Listen entry in your legacy keepstore config file", *v)
148                 }
149         } else {
150                 for url := range cluster.Services.Keepstore.InternalURLs {
151                         if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
152                                 myURL = url
153                                 break
154                         }
155                 }
156                 if myURL.Host == "" {
157                         return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
158                 }
159         }
160
161         if v := oc.LogFormat; v != nil {
162                 cluster.SystemLogs.Format = *v
163         }
164         if v := oc.MaxBuffers; v != nil {
165                 cluster.API.MaxKeepBlobBuffers = *v
166         }
167         if v := oc.MaxRequests; v != nil {
168                 cluster.API.MaxConcurrentRequests = *v
169         }
170         if v := oc.BlobSignatureTTL; v != nil {
171                 cluster.Collections.BlobSigningTTL = *v
172         }
173         if v := oc.BlobSigningKeyFile; v != nil {
174                 buf, err := ioutil.ReadFile(*v)
175                 if err != nil {
176                         return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
177                 }
178                 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
179                         cluster.Collections.BlobSigningKey = key
180                 }
181         }
182         if v := oc.RequireSignatures; v != nil {
183                 cluster.Collections.BlobSigning = *v
184         }
185         if v := oc.SystemAuthTokenFile; v != nil {
186                 f, err := os.Open(*v)
187                 if err != nil {
188                         return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
189                 }
190                 defer f.Close()
191                 buf, err := ioutil.ReadAll(f)
192                 if err != nil {
193                         return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
194                 }
195                 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
196                         cluster.SystemRootToken = key
197                 }
198         }
199         if v := oc.EnableDelete; v != nil {
200                 cluster.Collections.BlobTrash = *v
201         }
202         if v := oc.TrashLifetime; v != nil {
203                 cluster.Collections.BlobTrashLifetime = *v
204         }
205         if v := oc.TrashCheckInterval; v != nil {
206                 cluster.Collections.BlobTrashCheckInterval = *v
207         }
208         if v := oc.TrashWorkers; v != nil {
209                 cluster.Collections.BlobTrashConcurrency = *v
210         }
211         if v := oc.EmptyTrashWorkers; v != nil {
212                 cluster.Collections.BlobDeleteConcurrency = *v
213         }
214         if v := oc.PullWorkers; v != nil {
215                 cluster.Collections.BlobReplicateConcurrency = *v
216         }
217         if oc.Volumes == nil || len(*oc.Volumes) == 0 {
218                 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
219                 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
220                 if err != nil {
221                         return fmt.Errorf("error discovering local directory volumes: %s", err)
222                 }
223         } else {
224                 err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL)
225                 if err != nil {
226                         return err
227                 }
228         }
229
230         if err := ldr.checkPendingKeepstoreMigrations(cluster); err != nil {
231                 return err
232         }
233
234         cfg.Clusters[cluster.ClusterID] = *cluster
235         return nil
236 }
237
238 // Merge Volumes section of old keepstore config into cluster config.
239 func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error {
240         for i, oldvol := range *oc.Volumes {
241                 var accessViaHosts map[arvados.URL]arvados.VolumeAccess
242                 oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
243                 if found {
244                         accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
245                         writers := false
246                         for _, va := range accessViaHosts {
247                                 if !va.ReadOnly {
248                                         writers = true
249                                 }
250                         }
251                         if writers || len(accessViaHosts) == 0 {
252                                 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
253                                 if len(accessViaHosts) > 0 {
254                                         cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
255                                 }
256                                 continue
257                         }
258                 }
259                 var newvol arvados.Volume
260                 if found {
261                         ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
262                         newvol = cluster.Volumes[oldUUID]
263                         // Remove the old entry. It will be added back
264                         // below, possibly with a new UUID.
265                         delete(cluster.Volumes, oldUUID)
266                 } else {
267                         v, err := ldr.translateOldKeepstoreVolume(oldvol)
268                         if err != nil {
269                                 return err
270                         }
271                         newvol = v
272                 }
273                 if accessViaHosts == nil {
274                         accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
275                 }
276                 accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
277                 newvol.AccessViaHosts = accessViaHosts
278
279                 volUUID := oldUUID
280                 if oldvol.ReadOnly {
281                 } else if oc.Listen == nil {
282                         ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
283                 } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
284                         ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
285                 } else if len(uuid) != 27 {
286                         ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
287                 } else {
288                         rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
289                         if _, ok := cluster.Volumes[rendezvousUUID]; ok {
290                                 ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
291                         } else {
292                                 volUUID = rendezvousUUID
293                         }
294                         si := cluster.Services.Keepstore.InternalURLs[myURL]
295                         si.Rendezvous = uuid[12:]
296                         cluster.Services.Keepstore.InternalURLs[myURL] = si
297                 }
298                 if volUUID == "" {
299                         volUUID = newUUID(cluster.ClusterID, "nyw5e")
300                         ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
301                 }
302                 cluster.Volumes[volUUID] = newvol
303         }
304         return nil
305 }
306
307 func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) {
308         var newvol arvados.Volume
309         var params interface{}
310         switch oldvol.Type {
311         case "S3":
312                 accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
313                 if err != nil && oldvol.AccessKeyFile != "" {
314                         return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err)
315                 }
316                 secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
317                 if err != nil && oldvol.SecretKeyFile != "" {
318                         return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err)
319                 }
320                 newvol = arvados.Volume{
321                         Driver:         "S3",
322                         ReadOnly:       oldvol.ReadOnly,
323                         Replication:    oldvol.S3Replication,
324                         StorageClasses: array2boolmap(oldvol.StorageClasses),
325                 }
326                 params = arvados.S3VolumeDriverParameters{
327                         AccessKeyID:        string(bytes.TrimSpace(accesskeydata)),
328                         SecretAccessKey:    string(bytes.TrimSpace(secretkeydata)),
329                         Endpoint:           oldvol.Endpoint,
330                         Region:             oldvol.Region,
331                         Bucket:             oldvol.Bucket,
332                         LocationConstraint: oldvol.LocationConstraint,
333                         IndexPageSize:      oldvol.IndexPageSize,
334                         ConnectTimeout:     oldvol.ConnectTimeout,
335                         ReadTimeout:        oldvol.ReadTimeout,
336                         RaceWindow:         oldvol.RaceWindow,
337                         UnsafeDelete:       oldvol.UnsafeDelete,
338                 }
339         case "Azure":
340                 keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
341                 if err != nil && oldvol.StorageAccountKeyFile != "" {
342                         return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
343                 }
344                 newvol = arvados.Volume{
345                         Driver:         "Azure",
346                         ReadOnly:       oldvol.ReadOnly,
347                         Replication:    oldvol.AzureReplication,
348                         StorageClasses: array2boolmap(oldvol.StorageClasses),
349                 }
350                 params = arvados.AzureVolumeDriverParameters{
351                         StorageAccountName:   oldvol.StorageAccountName,
352                         StorageAccountKey:    string(bytes.TrimSpace(keydata)),
353                         StorageBaseURL:       oldvol.StorageBaseURL,
354                         ContainerName:        oldvol.ContainerName,
355                         RequestTimeout:       oldvol.RequestTimeout,
356                         ListBlobsRetryDelay:  oldvol.ListBlobsRetryDelay,
357                         ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
358                 }
359         case "Directory":
360                 newvol = arvados.Volume{
361                         Driver:         "Directory",
362                         ReadOnly:       oldvol.ReadOnly,
363                         Replication:    oldvol.DirectoryReplication,
364                         StorageClasses: array2boolmap(oldvol.StorageClasses),
365                 }
366                 params = arvados.DirectoryVolumeDriverParameters{
367                         Root:      oldvol.Root,
368                         Serialize: oldvol.Serialize,
369                 }
370         default:
371                 return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type)
372         }
373         dp, err := json.Marshal(params)
374         if err != nil {
375                 return newvol, err
376         }
377         newvol.DriverParameters = json.RawMessage(dp)
378         if newvol.Replication < 1 {
379                 newvol.Replication = 1
380         }
381         return newvol, nil
382 }
383
384 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
385         for uuid, newvol := range newvols {
386                 if oldvol.Type != newvol.Driver {
387                         continue
388                 }
389                 switch oldvol.Type {
390                 case "S3":
391                         var params arvados.S3VolumeDriverParameters
392                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
393                                 oldvol.Endpoint == params.Endpoint &&
394                                 oldvol.Region == params.Region &&
395                                 oldvol.Bucket == params.Bucket &&
396                                 oldvol.LocationConstraint == params.LocationConstraint {
397                                 return uuid, true
398                         }
399                 case "Azure":
400                         var params arvados.AzureVolumeDriverParameters
401                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
402                                 oldvol.StorageAccountName == params.StorageAccountName &&
403                                 oldvol.StorageBaseURL == params.StorageBaseURL &&
404                                 oldvol.ContainerName == params.ContainerName {
405                                 return uuid, true
406                         }
407                 case "Directory":
408                         var params arvados.DirectoryVolumeDriverParameters
409                         if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
410                                 oldvol.Root == params.Root {
411                                 if _, ok := newvol.AccessViaHosts[myURL]; ok || len(newvol.AccessViaHosts) == 0 {
412                                         return uuid, true
413                                 }
414                         }
415                 }
416         }
417         return "", false
418 }
419
420 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
421         if mountsFile == "" {
422                 mountsFile = "/proc/mounts"
423         }
424         f, err := os.Open(mountsFile)
425         if err != nil {
426                 return fmt.Errorf("error opening %s: %s", mountsFile, err)
427         }
428         defer f.Close()
429         scanner := bufio.NewScanner(f)
430         for scanner.Scan() {
431                 args := strings.Fields(scanner.Text())
432                 dev, mount := args[0], args[1]
433                 if mount == "/" {
434                         continue
435                 }
436                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
437                         continue
438                 }
439                 keepdir := mount + "/keep"
440                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
441                         continue
442                 }
443
444                 ro := false
445                 for _, fsopt := range strings.Split(args[3], ",") {
446                         if fsopt == "ro" {
447                                 ro = true
448                         }
449                 }
450
451                 uuid := newUUID(cluster.ClusterID, "nyw5e")
452                 ldr.Logger.WithFields(logrus.Fields{
453                         "UUID":                       uuid,
454                         "Driver":                     "Directory",
455                         "DriverParameters.Root":      keepdir,
456                         "DriverParameters.Serialize": false,
457                         "ReadOnly":                   ro,
458                         "Replication":                1,
459                 }).Warn("adding local directory volume")
460
461                 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
462                         Root:      keepdir,
463                         Serialize: false,
464                 })
465                 if err != nil {
466                         panic(err)
467                 }
468                 cluster.Volumes[uuid] = arvados.Volume{
469                         Driver:           "Directory",
470                         DriverParameters: p,
471                         ReadOnly:         ro,
472                         Replication:      1,
473                         AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
474                                 myURL: {ReadOnly: ro},
475                         },
476                 }
477         }
478         if err := scanner.Err(); err != nil {
479                 return fmt.Errorf("reading %s: %s", mountsFile, err)
480         }
481         return nil
482 }
483
484 func array2boolmap(keys []string) map[string]bool {
485         m := map[string]bool{}
486         for _, k := range keys {
487                 m[k] = true
488         }
489         return m
490 }
491
492 func newUUID(clusterID, infix string) string {
493         randint, err := rand.Int(rand.Reader, big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0)))
494         if err != nil {
495                 panic(err)
496         }
497         randstr := randint.Text(36)
498         for len(randstr) < 15 {
499                 randstr = "0" + randstr
500         }
501         return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr)
502 }
503
504 // Return the UUID and URL for the controller's keep_services listing
505 // corresponding to this host/process.
506 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
507         client, err := arvados.NewClientFromConfig(cluster)
508         if err != nil {
509                 return
510         }
511         client.AuthToken = cluster.SystemRootToken
512         var svcList arvados.KeepServiceList
513         err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
514         if err != nil {
515                 return
516         }
517         hostname, err := os.Hostname()
518         if err != nil {
519                 err = fmt.Errorf("error getting hostname: %s", err)
520                 return
521         }
522         var tried []string
523         for _, ks := range svcList.Items {
524                 if ks.ServiceType == "proxy" {
525                         continue
526                 } else if keepServiceIsMe(ks, hostname, listen) {
527                         return ks.UUID, keepServiceURL(ks), nil
528                 } else {
529                         tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
530                 }
531         }
532         err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
533         return
534 }
535
536 func keepServiceURL(ks arvados.KeepService) arvados.URL {
537         url := arvados.URL{
538                 Scheme: "http",
539                 Host:   net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
540                 Path:   "/",
541         }
542         if ks.ServiceSSLFlag {
543                 url.Scheme = "https"
544         }
545         return url
546 }
547
548 var localhostOrAllInterfaces = map[string]bool{
549         "localhost": true,
550         "127.0.0.1": true,
551         "::1":       true,
552         "::":        true,
553         "":          true,
554 }
555
556 // Return true if the given KeepService entry matches the given
557 // hostname and (keepstore config file) listen address.
558 //
559 // If the KeepService host is some variant of "localhost", we assume
560 // this is a testing or single-node environment, ignore the given
561 // hostname, and return true if the port numbers match.
562 //
563 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
564 // match a KeepService host "foo.bar", but also "foo.bar.example",
565 // "foo.bar.example.org", etc.
566 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
567         // Extract the port name/number from listen, and resolve it to
568         // a port number to compare with ks.ServicePort.
569         _, listenport, err := net.SplitHostPort(listen)
570         if err != nil && strings.HasPrefix(listen, ":") {
571                 listenport = listen[1:]
572         }
573         if lp, err := net.LookupPort("tcp", listenport); err != nil {
574                 return false
575         } else if !(lp == ks.ServicePort ||
576                 (lp == 0 && ks.ServicePort == 80)) {
577                 return false
578         }
579
580         kshost := strings.ToLower(ks.ServiceHost)
581         return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
582 }
583
584 // Warn about pending keepstore migration tasks that haven't already
585 // been warned about in loadOldKeepstoreConfig() -- i.e., unmigrated
586 // keepstore hosts other than the present host, and obsolete content
587 // in the keep_services table.
588 func (ldr *Loader) checkPendingKeepstoreMigrations(cluster *arvados.Cluster) error {
589         if cluster.Services.Controller.ExternalURL.String() == "" {
590                 ldr.Logger.Debug("Services.Controller.ExternalURL not configured -- skipping check for pending keepstore config migrations")
591                 return nil
592         }
593         if ldr.SkipAPICalls {
594                 ldr.Logger.Debug("(Loader).SkipAPICalls == true -- skipping check for pending keepstore config migrations")
595                 return nil
596         }
597         client, err := arvados.NewClientFromConfig(cluster)
598         if err != nil {
599                 return err
600         }
601         client.AuthToken = cluster.SystemRootToken
602         var svcList arvados.KeepServiceList
603         err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
604         if err != nil {
605                 ldr.Logger.WithError(err).Warn("error retrieving keep_services list -- skipping check for pending keepstore config migrations")
606                 return nil
607         }
608         hostname, err := os.Hostname()
609         if err != nil {
610                 return fmt.Errorf("error getting hostname: %s", err)
611         }
612         sawTimes := map[time.Time]bool{}
613         for _, ks := range svcList.Items {
614                 sawTimes[ks.CreatedAt] = true
615                 sawTimes[ks.ModifiedAt] = true
616         }
617         if len(sawTimes) <= 1 {
618                 // If all timestamps in the arvados/v1/keep_services
619                 // response are identical, it's a clear sign the
620                 // response was generated on the fly from the cluster
621                 // config, rather than real database records. In that
622                 // case (as well as the case where none are listed at
623                 // all) it's pointless to look for entries that
624                 // haven't yet been migrated to the config file.
625                 return nil
626         }
627         needDBRows := false
628         for _, ks := range svcList.Items {
629                 if ks.ServiceType == "proxy" {
630                         if len(cluster.Services.Keepproxy.InternalURLs) == 0 {
631                                 needDBRows = true
632                                 ldr.Logger.Warn("you should migrate your keepproxy configuration to the cluster configuration file")
633                         }
634                         continue
635                 }
636                 kshost := strings.ToLower(ks.ServiceHost)
637                 if localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".") {
638                         // it would be confusing to recommend
639                         // migrating *this* host's legacy keepstore
640                         // config immediately after explaining that
641                         // very migration process in more detail.
642                         continue
643                 }
644                 ksurl := keepServiceURL(ks)
645                 if _, ok := cluster.Services.Keepstore.InternalURLs[ksurl]; ok {
646                         // already added to InternalURLs
647                         continue
648                 }
649                 ldr.Logger.Warnf("you should migrate the legacy keepstore configuration file on host %s", ks.ServiceHost)
650         }
651         if !needDBRows {
652                 ldr.Logger.Warn("you should delete all of your manually added keep_services listings using `arv --format=uuid keep_service list | xargs -n1 arv keep_service delete --uuid` -- when those are deleted, the services listed in your cluster configuration will be used instead")
653         }
654         return nil
655 }
656
657 // Warn about keepstore servers that have no volumes.
658 func (ldr *Loader) checkEmptyKeepstores(cluster arvados.Cluster) error {
659 servers:
660         for url := range cluster.Services.Keepstore.InternalURLs {
661                 for _, vol := range cluster.Volumes {
662                         if len(vol.AccessViaHosts) == 0 {
663                                 // accessible by all servers
664                                 return nil
665                         }
666                         if _, ok := vol.AccessViaHosts[url]; ok {
667                                 continue servers
668                         }
669                 }
670                 ldr.Logger.Warnf("keepstore configured at %s does not have access to any volumes", url)
671         }
672         return nil
673 }
674
675 // Warn about AccessViaHosts entries that don't correspond to any of
676 // the listed keepstore services.
677 func (ldr *Loader) checkUnlistedKeepstores(cluster arvados.Cluster) error {
678         for uuid, vol := range cluster.Volumes {
679                 if uuid == "SAMPLE" {
680                         continue
681                 }
682                 for url := range vol.AccessViaHosts {
683                         if _, ok := cluster.Services.Keepstore.InternalURLs[url]; !ok {
684                                 ldr.Logger.Warnf("Volumes.%s.AccessViaHosts refers to nonexistent keepstore server %s", uuid, url)
685                         }
686                 }
687         }
688         return nil
689 }