15924: Change import paths to git.arvados.org.
[arvados.git] / lib / config / deprecated_keepstore.go
index 0d1a3c4e69a67a63114f8d5b6f7f3ee24ca67510..401764c87add6e1e37dff63052302df34a3f5e54 100644 (file)
@@ -16,8 +16,9 @@ import (
        "os"
        "strconv"
        "strings"
+       "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
 )
 
@@ -106,7 +107,7 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
 
        var oc oldKeepstoreConfig
        err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
-       if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
+       if os.IsNotExist(err) && ldr.KeepstorePath == defaultKeepstoreConfigPath {
                return nil
        } else if err != nil {
                return err
@@ -129,10 +130,10 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
                cluster.SystemLogs.LogLevel = "info"
        }
 
-       if v := oc.TLSCertificateFile; v != nil && "file://"+*v != cluster.TLS.Certificate {
+       if v := oc.TLSCertificateFile; v != nil {
                cluster.TLS.Certificate = "file://" + *v
        }
-       if v := oc.TLSKeyFile; v != nil && "file://"+*v != cluster.TLS.Key {
+       if v := oc.TLSKeyFile; v != nil {
                cluster.TLS.Key = "file://" + *v
        }
        if v := oc.Listen; v != nil {
@@ -143,7 +144,7 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
                        myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
                        cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
                } else {
-                       return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
+                       return fmt.Errorf("unable to migrate Listen value %q -- you must update Services.Keepstore.InternalURLs manually, and comment out the Listen entry in your legacy keepstore config file", *v)
                }
        } else {
                for url := range cluster.Services.Keepstore.InternalURLs {
@@ -157,16 +158,16 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
                }
        }
 
-       if v := oc.LogFormat; v != nil && *v != cluster.SystemLogs.Format {
+       if v := oc.LogFormat; v != nil {
                cluster.SystemLogs.Format = *v
        }
-       if v := oc.MaxBuffers; v != nil && *v != cluster.API.MaxKeepBlockBuffers {
-               cluster.API.MaxKeepBlockBuffers = *v
+       if v := oc.MaxBuffers; v != nil {
+               cluster.API.MaxKeepBlobBuffers = *v
        }
-       if v := oc.MaxRequests; v != nil && *v != cluster.API.MaxConcurrentRequests {
+       if v := oc.MaxRequests; v != nil {
                cluster.API.MaxConcurrentRequests = *v
        }
-       if v := oc.BlobSignatureTTL; v != nil && *v != cluster.Collections.BlobSigningTTL {
+       if v := oc.BlobSignatureTTL; v != nil {
                cluster.Collections.BlobSigningTTL = *v
        }
        if v := oc.BlobSigningKeyFile; v != nil {
@@ -178,7 +179,7 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
                        cluster.Collections.BlobSigningKey = key
                }
        }
-       if v := oc.RequireSignatures; v != nil && *v != cluster.Collections.BlobSigning {
+       if v := oc.RequireSignatures; v != nil {
                cluster.Collections.BlobSigning = *v
        }
        if v := oc.SystemAuthTokenFile; v != nil {
@@ -195,169 +196,191 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
                        cluster.SystemRootToken = key
                }
        }
-       if v := oc.EnableDelete; v != nil && *v != cluster.Collections.BlobTrash {
+       if v := oc.EnableDelete; v != nil {
                cluster.Collections.BlobTrash = *v
        }
-       if v := oc.TrashLifetime; v != nil && *v != cluster.Collections.BlobTrashLifetime {
+       if v := oc.TrashLifetime; v != nil {
                cluster.Collections.BlobTrashLifetime = *v
        }
-       if v := oc.TrashCheckInterval; v != nil && *v != cluster.Collections.BlobTrashCheckInterval {
+       if v := oc.TrashCheckInterval; v != nil {
                cluster.Collections.BlobTrashCheckInterval = *v
        }
-       if v := oc.TrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
+       if v := oc.TrashWorkers; v != nil {
                cluster.Collections.BlobTrashConcurrency = *v
        }
-       if v := oc.EmptyTrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
+       if v := oc.EmptyTrashWorkers; v != nil {
                cluster.Collections.BlobDeleteConcurrency = *v
        }
-       if v := oc.PullWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
+       if v := oc.PullWorkers; v != nil {
                cluster.Collections.BlobReplicateConcurrency = *v
        }
-       if v := oc.Volumes; v == nil {
+       if oc.Volumes == nil || len(*oc.Volumes) == 0 {
                ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
                err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
                if err != nil {
                        return fmt.Errorf("error discovering local directory volumes: %s", err)
                }
        } else {
-               for i, oldvol := range *v {
-                       var accessViaHosts map[arvados.URL]arvados.VolumeAccess
-                       oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
-                       if found {
-                               accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
-                               writers := false
-                               for _, va := range accessViaHosts {
-                                       if !va.ReadOnly {
-                                               writers = true
-                                       }
-                               }
-                               if writers || len(accessViaHosts) == 0 {
-                                       ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
-                                       if len(accessViaHosts) > 0 {
-                                               cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
-                                       }
-                                       continue
+               err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL)
+               if err != nil {
+                       return err
+               }
+       }
+
+       if err := ldr.checkPendingKeepstoreMigrations(cluster); err != nil {
+               return err
+       }
+
+       cfg.Clusters[cluster.ClusterID] = *cluster
+       return nil
+}
+
+// Merge Volumes section of old keepstore config into cluster config.
+func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error {
+       for i, oldvol := range *oc.Volumes {
+               var accessViaHosts map[arvados.URL]arvados.VolumeAccess
+               oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
+               if found {
+                       accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
+                       writers := false
+                       for _, va := range accessViaHosts {
+                               if !va.ReadOnly {
+                                       writers = true
                                }
                        }
-                       var newvol arvados.Volume
-                       if found {
+                       if writers || len(accessViaHosts) == 0 {
                                ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
-                               newvol = cluster.Volumes[oldUUID]
-                               // Remove the old entry. It will be
-                               // added back below, possibly with a
-                               // new UUID.
-                               delete(cluster.Volumes, oldUUID)
-                       } else {
-                               var params interface{}
-                               switch oldvol.Type {
-                               case "S3":
-                                       accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
-                                       if err != nil && oldvol.AccessKeyFile != "" {
-                                               return fmt.Errorf("error reading AccessKeyFile: %s", err)
-                                       }
-                                       secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
-                                       if err != nil && oldvol.SecretKeyFile != "" {
-                                               return fmt.Errorf("error reading SecretKeyFile: %s", err)
-                                       }
-                                       newvol = arvados.Volume{
-                                               Driver:         "S3",
-                                               ReadOnly:       oldvol.ReadOnly,
-                                               Replication:    oldvol.S3Replication,
-                                               StorageClasses: array2boolmap(oldvol.StorageClasses),
-                                       }
-                                       params = arvados.S3VolumeDriverParameters{
-                                               AccessKey:          string(bytes.TrimSpace(accesskeydata)),
-                                               SecretKey:          string(bytes.TrimSpace(secretkeydata)),
-                                               Endpoint:           oldvol.Endpoint,
-                                               Region:             oldvol.Region,
-                                               Bucket:             oldvol.Bucket,
-                                               LocationConstraint: oldvol.LocationConstraint,
-                                               IndexPageSize:      oldvol.IndexPageSize,
-                                               ConnectTimeout:     oldvol.ConnectTimeout,
-                                               ReadTimeout:        oldvol.ReadTimeout,
-                                               RaceWindow:         oldvol.RaceWindow,
-                                               UnsafeDelete:       oldvol.UnsafeDelete,
-                                       }
-                               case "Azure":
-                                       keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
-                                       if err != nil && oldvol.StorageAccountKeyFile != "" {
-                                               return fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
-                                       }
-                                       newvol = arvados.Volume{
-                                               Driver:         "Azure",
-                                               ReadOnly:       oldvol.ReadOnly,
-                                               Replication:    oldvol.AzureReplication,
-                                               StorageClasses: array2boolmap(oldvol.StorageClasses),
-                                       }
-                                       params = arvados.AzureVolumeDriverParameters{
-                                               StorageAccountName:   oldvol.StorageAccountName,
-                                               StorageAccountKey:    string(bytes.TrimSpace(keydata)),
-                                               StorageBaseURL:       oldvol.StorageBaseURL,
-                                               ContainerName:        oldvol.ContainerName,
-                                               RequestTimeout:       oldvol.RequestTimeout,
-                                               ListBlobsRetryDelay:  oldvol.ListBlobsRetryDelay,
-                                               ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
-                                       }
-                               case "Directory":
-                                       newvol = arvados.Volume{
-                                               Driver:         "Directory",
-                                               ReadOnly:       oldvol.ReadOnly,
-                                               Replication:    oldvol.DirectoryReplication,
-                                               StorageClasses: array2boolmap(oldvol.StorageClasses),
-                                       }
-                                       params = arvados.DirectoryVolumeDriverParameters{
-                                               Root:      oldvol.Root,
-                                               Serialize: oldvol.Serialize,
-                                       }
-                               default:
-                                       return fmt.Errorf("unsupported volume type %q", oldvol.Type)
-                               }
-                               dp, err := json.Marshal(params)
-                               if err != nil {
-                                       return err
-                               }
-                               newvol.DriverParameters = json.RawMessage(dp)
-                               if newvol.Replication < 1 {
-                                       newvol.Replication = 1
+                               if len(accessViaHosts) > 0 {
+                                       cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
                                }
+                               continue
                        }
-                       if accessViaHosts == nil {
-                               accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
+               }
+               var newvol arvados.Volume
+               if found {
+                       ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
+                       newvol = cluster.Volumes[oldUUID]
+                       // Remove the old entry. It will be added back
+                       // below, possibly with a new UUID.
+                       delete(cluster.Volumes, oldUUID)
+               } else {
+                       v, err := ldr.translateOldKeepstoreVolume(oldvol)
+                       if err != nil {
+                               return err
                        }
-                       accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
-                       newvol.AccessViaHosts = accessViaHosts
-
-                       volUUID := oldUUID
-                       if oldvol.ReadOnly {
-                       } else if oc.Listen == nil {
-                               ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
-                       } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
-                               ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
-                       } else if len(uuid) != 27 {
-                               ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
+                       newvol = v
+               }
+               if accessViaHosts == nil {
+                       accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
+               }
+               accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
+               newvol.AccessViaHosts = accessViaHosts
+
+               volUUID := oldUUID
+               if oldvol.ReadOnly {
+               } else if oc.Listen == nil {
+                       ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
+               } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
+                       ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
+               } else if len(uuid) != 27 {
+                       ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
+               } else {
+                       rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
+                       if _, ok := cluster.Volumes[rendezvousUUID]; ok {
+                               ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
                        } else {
-                               rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
-                               if _, ok := cluster.Volumes[rendezvousUUID]; ok {
-                                       ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
-                               } else {
-                                       volUUID = rendezvousUUID
-                               }
-                               si := cluster.Services.Keepstore.InternalURLs[myURL]
-                               si.Rendezvous = uuid[12:]
-                               cluster.Services.Keepstore.InternalURLs[myURL] = si
+                               volUUID = rendezvousUUID
                        }
-                       if volUUID == "" {
-                               volUUID = newUUID(cluster.ClusterID, "nyw5e")
-                               ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
-                       }
-                       cluster.Volumes[volUUID] = newvol
+                       si := cluster.Services.Keepstore.InternalURLs[myURL]
+                       si.Rendezvous = uuid[12:]
+                       cluster.Services.Keepstore.InternalURLs[myURL] = si
+               }
+               if volUUID == "" {
+                       volUUID = newUUID(cluster.ClusterID, "nyw5e")
+                       ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
                }
+               cluster.Volumes[volUUID] = newvol
        }
-
-       cfg.Clusters[cluster.ClusterID] = *cluster
        return nil
 }
 
+func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) {
+       var newvol arvados.Volume
+       var params interface{}
+       switch oldvol.Type {
+       case "S3":
+               accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
+               if err != nil && oldvol.AccessKeyFile != "" {
+                       return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err)
+               }
+               secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
+               if err != nil && oldvol.SecretKeyFile != "" {
+                       return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err)
+               }
+               newvol = arvados.Volume{
+                       Driver:         "S3",
+                       ReadOnly:       oldvol.ReadOnly,
+                       Replication:    oldvol.S3Replication,
+                       StorageClasses: array2boolmap(oldvol.StorageClasses),
+               }
+               params = arvados.S3VolumeDriverParameters{
+                       AccessKey:          string(bytes.TrimSpace(accesskeydata)),
+                       SecretKey:          string(bytes.TrimSpace(secretkeydata)),
+                       Endpoint:           oldvol.Endpoint,
+                       Region:             oldvol.Region,
+                       Bucket:             oldvol.Bucket,
+                       LocationConstraint: oldvol.LocationConstraint,
+                       IndexPageSize:      oldvol.IndexPageSize,
+                       ConnectTimeout:     oldvol.ConnectTimeout,
+                       ReadTimeout:        oldvol.ReadTimeout,
+                       RaceWindow:         oldvol.RaceWindow,
+                       UnsafeDelete:       oldvol.UnsafeDelete,
+               }
+       case "Azure":
+               keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
+               if err != nil && oldvol.StorageAccountKeyFile != "" {
+                       return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
+               }
+               newvol = arvados.Volume{
+                       Driver:         "Azure",
+                       ReadOnly:       oldvol.ReadOnly,
+                       Replication:    oldvol.AzureReplication,
+                       StorageClasses: array2boolmap(oldvol.StorageClasses),
+               }
+               params = arvados.AzureVolumeDriverParameters{
+                       StorageAccountName:   oldvol.StorageAccountName,
+                       StorageAccountKey:    string(bytes.TrimSpace(keydata)),
+                       StorageBaseURL:       oldvol.StorageBaseURL,
+                       ContainerName:        oldvol.ContainerName,
+                       RequestTimeout:       oldvol.RequestTimeout,
+                       ListBlobsRetryDelay:  oldvol.ListBlobsRetryDelay,
+                       ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
+               }
+       case "Directory":
+               newvol = arvados.Volume{
+                       Driver:         "Directory",
+                       ReadOnly:       oldvol.ReadOnly,
+                       Replication:    oldvol.DirectoryReplication,
+                       StorageClasses: array2boolmap(oldvol.StorageClasses),
+               }
+               params = arvados.DirectoryVolumeDriverParameters{
+                       Root:      oldvol.Root,
+                       Serialize: oldvol.Serialize,
+               }
+       default:
+               return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type)
+       }
+       dp, err := json.Marshal(params)
+       if err != nil {
+               return newvol, err
+       }
+       newvol.DriverParameters = json.RawMessage(dp)
+       if newvol.Replication < 1 {
+               newvol.Replication = 1
+       }
+       return newvol, nil
+}
+
 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
        for uuid, newvol := range newvols {
                if oldvol.Type != newvol.Driver {
@@ -385,7 +408,7 @@ func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string
                        var params arvados.DirectoryVolumeDriverParameters
                        if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
                                oldvol.Root == params.Root {
-                               if _, ok := newvol.AccessViaHosts[myURL]; ok {
+                               if _, ok := newvol.AccessViaHosts[myURL]; ok || len(newvol.AccessViaHosts) == 0 {
                                        return uuid, true
                                }
                        }
@@ -501,14 +524,7 @@ func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string,
                if ks.ServiceType == "proxy" {
                        continue
                } else if keepServiceIsMe(ks, hostname, listen) {
-                       url := arvados.URL{
-                               Scheme: "http",
-                               Host:   net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
-                       }
-                       if ks.ServiceSSLFlag {
-                               url.Scheme = "https"
-                       }
-                       return ks.UUID, url, nil
+                       return ks.UUID, keepServiceURL(ks), nil
                } else {
                        tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
                }
@@ -517,6 +533,17 @@ func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string,
        return
 }
 
+func keepServiceURL(ks arvados.KeepService) arvados.URL {
+       url := arvados.URL{
+               Scheme: "http",
+               Host:   net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
+       }
+       if ks.ServiceSSLFlag {
+               url.Scheme = "https"
+       }
+       return url
+}
+
 var localhostOrAllInterfaces = map[string]bool{
        "localhost": true,
        "127.0.0.1": true,
@@ -552,3 +579,110 @@ func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) boo
        kshost := strings.ToLower(ks.ServiceHost)
        return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
 }
+
+// Warn about pending keepstore migration tasks that haven't already
+// been warned about in loadOldKeepstoreConfig() -- i.e., unmigrated
+// keepstore hosts other than the present host, and obsolete content
+// in the keep_services table.
+func (ldr *Loader) checkPendingKeepstoreMigrations(cluster *arvados.Cluster) error {
+       if cluster.Services.Controller.ExternalURL.String() == "" {
+               ldr.Logger.Debug("Services.Controller.ExternalURL not configured -- skipping check for pending keepstore config migrations")
+               return nil
+       }
+       if ldr.SkipAPICalls {
+               ldr.Logger.Debug("(Loader).SkipAPICalls == true -- skipping check for pending keepstore config migrations")
+               return nil
+       }
+       client, err := arvados.NewClientFromConfig(cluster)
+       if err != nil {
+               return err
+       }
+       client.AuthToken = cluster.SystemRootToken
+       var svcList arvados.KeepServiceList
+       err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
+       if err != nil {
+               ldr.Logger.WithError(err).Warn("error retrieving keep_services list -- skipping check for pending keepstore config migrations")
+               return nil
+       }
+       hostname, err := os.Hostname()
+       if err != nil {
+               return fmt.Errorf("error getting hostname: %s", err)
+       }
+       sawTimes := map[time.Time]bool{}
+       for _, ks := range svcList.Items {
+               sawTimes[ks.CreatedAt] = true
+               sawTimes[ks.ModifiedAt] = true
+       }
+       if len(sawTimes) <= 1 {
+               // If all timestamps in the arvados/v1/keep_services
+               // response are identical, it's a clear sign the
+               // response was generated on the fly from the cluster
+               // config, rather than real database records. In that
+               // case (as well as the case where none are listed at
+               // all) it's pointless to look for entries that
+               // haven't yet been migrated to the config file.
+               return nil
+       }
+       needDBRows := false
+       for _, ks := range svcList.Items {
+               if ks.ServiceType == "proxy" {
+                       if len(cluster.Services.Keepproxy.InternalURLs) == 0 {
+                               needDBRows = true
+                               ldr.Logger.Warn("you should migrate your keepproxy configuration to the cluster configuration file")
+                       }
+                       continue
+               }
+               kshost := strings.ToLower(ks.ServiceHost)
+               if localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".") {
+                       // it would be confusing to recommend
+                       // migrating *this* host's legacy keepstore
+                       // config immediately after explaining that
+                       // very migration process in more detail.
+                       continue
+               }
+               ksurl := keepServiceURL(ks)
+               if _, ok := cluster.Services.Keepstore.InternalURLs[ksurl]; ok {
+                       // already added to InternalURLs
+                       continue
+               }
+               ldr.Logger.Warnf("you should migrate the legacy keepstore configuration file on host %s", ks.ServiceHost)
+       }
+       if !needDBRows {
+               ldr.Logger.Warn("you should delete all of your manually added keep_services listings using `arv --format=uuid keep_service list | xargs -n1 arv keep_service delete --uuid` -- when those are deleted, the services listed in your cluster configuration will be used instead")
+       }
+       return nil
+}
+
+// Warn about keepstore servers that have no volumes.
+func (ldr *Loader) checkEmptyKeepstores(cluster arvados.Cluster) error {
+servers:
+       for url := range cluster.Services.Keepstore.InternalURLs {
+               for _, vol := range cluster.Volumes {
+                       if len(vol.AccessViaHosts) == 0 {
+                               // accessible by all servers
+                               return nil
+                       }
+                       if _, ok := vol.AccessViaHosts[url]; ok {
+                               continue servers
+                       }
+               }
+               ldr.Logger.Warnf("keepstore configured at %s does not have access to any volumes", url)
+       }
+       return nil
+}
+
+// Warn about AccessViaHosts entries that don't correspond to any of
+// the listed keepstore services.
+func (ldr *Loader) checkUnlistedKeepstores(cluster arvados.Cluster) error {
+       for uuid, vol := range cluster.Volumes {
+               if uuid == "SAMPLE" {
+                       continue
+               }
+               for url := range vol.AccessViaHosts {
+                       if _, ok := cluster.Services.Keepstore.InternalURLs[url]; !ok {
+                               ldr.Logger.Warnf("Volumes.%s.AccessViaHosts refers to nonexistent keepstore server %s", uuid, url)
+                       }
+               }
+       }
+       return nil
+}