X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/90d84d8578b760b493ac76b22c42bc284868bc0c..490aaa38ccb260f94e2ee25e7c6a50c982da35ae:/lib/config/deprecated_keepstore.go diff --git a/lib/config/deprecated_keepstore.go b/lib/config/deprecated_keepstore.go index 0d1a3c4e69..d9f4815fcf 100644 --- a/lib/config/deprecated_keepstore.go +++ b/lib/config/deprecated_keepstore.go @@ -16,8 +16,9 @@ import ( "os" "strconv" "strings" + "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/sirupsen/logrus" ) @@ -106,7 +107,7 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { var oc oldKeepstoreConfig err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc) - if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) { + if os.IsNotExist(err) && ldr.KeepstorePath == defaultKeepstoreConfigPath { return nil } else if err != nil { return err @@ -117,7 +118,7 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { return err } - myURL := arvados.URL{Scheme: "http"} + myURL := arvados.URL{Scheme: "http", Path: "/"} if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil { myURL.Scheme = "https" } @@ -129,21 +130,21 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { cluster.SystemLogs.LogLevel = "info" } - if v := oc.TLSCertificateFile; v != nil && "file://"+*v != cluster.TLS.Certificate { + if v := oc.TLSCertificateFile; v != nil { cluster.TLS.Certificate = "file://" + *v } - if v := oc.TLSKeyFile; v != nil && "file://"+*v != cluster.TLS.Key { + if v := oc.TLSKeyFile; v != nil { cluster.TLS.Key = "file://" + *v } if v := oc.Listen; v != nil { - if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok { + if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v, Path: "/"}]; ok { // already listed myURL.Host = *v } else if len(*v) > 1 && (*v)[0] == ':' { myURL.Host = net.JoinHostPort(hostname, (*v)[1:]) cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{} } else { - return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v) + return fmt.Errorf("unable to migrate Listen value %q -- you must update Services.Keepstore.InternalURLs manually, and comment out the Listen entry in your legacy keepstore config file", *v) } } else { for url := range cluster.Services.Keepstore.InternalURLs { @@ -157,16 +158,16 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { } } - if v := oc.LogFormat; v != nil && *v != cluster.SystemLogs.Format { + if v := oc.LogFormat; v != nil { cluster.SystemLogs.Format = *v } - if v := oc.MaxBuffers; v != nil && *v != cluster.API.MaxKeepBlockBuffers { - cluster.API.MaxKeepBlockBuffers = *v + if v := oc.MaxBuffers; v != nil { + cluster.API.MaxKeepBlobBuffers = *v } - if v := oc.MaxRequests; v != nil && *v != cluster.API.MaxConcurrentRequests { + if v := oc.MaxRequests; v != nil { cluster.API.MaxConcurrentRequests = *v } - if v := oc.BlobSignatureTTL; v != nil && *v != cluster.Collections.BlobSigningTTL { + if v := oc.BlobSignatureTTL; v != nil { cluster.Collections.BlobSigningTTL = *v } if v := oc.BlobSigningKeyFile; v != nil { @@ -178,7 +179,7 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { cluster.Collections.BlobSigningKey = key } } - if v := oc.RequireSignatures; v != nil && *v != cluster.Collections.BlobSigning { + if v := oc.RequireSignatures; v != nil { cluster.Collections.BlobSigning = *v } if v := oc.SystemAuthTokenFile; v != nil { @@ -195,169 +196,191 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { cluster.SystemRootToken = key } } - if v := oc.EnableDelete; v != nil && *v != cluster.Collections.BlobTrash { + if v := oc.EnableDelete; v != nil { cluster.Collections.BlobTrash = *v } - if v := oc.TrashLifetime; v != nil && *v != cluster.Collections.BlobTrashLifetime { + if v := oc.TrashLifetime; v != nil { cluster.Collections.BlobTrashLifetime = *v } - if v := oc.TrashCheckInterval; v != nil && *v != cluster.Collections.BlobTrashCheckInterval { + if v := oc.TrashCheckInterval; v != nil { cluster.Collections.BlobTrashCheckInterval = *v } - if v := oc.TrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency { + if v := oc.TrashWorkers; v != nil { cluster.Collections.BlobTrashConcurrency = *v } - if v := oc.EmptyTrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency { + if v := oc.EmptyTrashWorkers; v != nil { cluster.Collections.BlobDeleteConcurrency = *v } - if v := oc.PullWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency { + if v := oc.PullWorkers; v != nil { cluster.Collections.BlobReplicateConcurrency = *v } - if v := oc.Volumes; v == nil { + if oc.Volumes == nil || len(*oc.Volumes) == 0 { ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes") err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL) if err != nil { return fmt.Errorf("error discovering local directory volumes: %s", err) } } else { - for i, oldvol := range *v { - var accessViaHosts map[arvados.URL]arvados.VolumeAccess - oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL) - if found { - accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts - writers := false - for _, va := range accessViaHosts { - if !va.ReadOnly { - writers = true - } - } - if writers || len(accessViaHosts) == 0 { - ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i) - if len(accessViaHosts) > 0 { - cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly} - } - continue + err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL) + if err != nil { + return err + } + } + + if err := ldr.checkPendingKeepstoreMigrations(cluster); err != nil { + return err + } + + cfg.Clusters[cluster.ClusterID] = *cluster + return nil +} + +// Merge Volumes section of old keepstore config into cluster config. +func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error { + for i, oldvol := range *oc.Volumes { + var accessViaHosts map[arvados.URL]arvados.VolumeAccess + oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL) + if found { + accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts + writers := false + for _, va := range accessViaHosts { + if !va.ReadOnly { + writers = true } } - var newvol arvados.Volume - if found { + if writers || len(accessViaHosts) == 0 { ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i) - newvol = cluster.Volumes[oldUUID] - // Remove the old entry. It will be - // added back below, possibly with a - // new UUID. - delete(cluster.Volumes, oldUUID) - } else { - var params interface{} - switch oldvol.Type { - case "S3": - accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile) - if err != nil && oldvol.AccessKeyFile != "" { - return fmt.Errorf("error reading AccessKeyFile: %s", err) - } - secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile) - if err != nil && oldvol.SecretKeyFile != "" { - return fmt.Errorf("error reading SecretKeyFile: %s", err) - } - newvol = arvados.Volume{ - Driver: "S3", - ReadOnly: oldvol.ReadOnly, - Replication: oldvol.S3Replication, - StorageClasses: array2boolmap(oldvol.StorageClasses), - } - params = arvados.S3VolumeDriverParameters{ - AccessKey: string(bytes.TrimSpace(accesskeydata)), - SecretKey: string(bytes.TrimSpace(secretkeydata)), - Endpoint: oldvol.Endpoint, - Region: oldvol.Region, - Bucket: oldvol.Bucket, - LocationConstraint: oldvol.LocationConstraint, - IndexPageSize: oldvol.IndexPageSize, - ConnectTimeout: oldvol.ConnectTimeout, - ReadTimeout: oldvol.ReadTimeout, - RaceWindow: oldvol.RaceWindow, - UnsafeDelete: oldvol.UnsafeDelete, - } - case "Azure": - keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile) - if err != nil && oldvol.StorageAccountKeyFile != "" { - return fmt.Errorf("error reading StorageAccountKeyFile: %s", err) - } - newvol = arvados.Volume{ - Driver: "Azure", - ReadOnly: oldvol.ReadOnly, - Replication: oldvol.AzureReplication, - StorageClasses: array2boolmap(oldvol.StorageClasses), - } - params = arvados.AzureVolumeDriverParameters{ - StorageAccountName: oldvol.StorageAccountName, - StorageAccountKey: string(bytes.TrimSpace(keydata)), - StorageBaseURL: oldvol.StorageBaseURL, - ContainerName: oldvol.ContainerName, - RequestTimeout: oldvol.RequestTimeout, - ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay, - ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts, - } - case "Directory": - newvol = arvados.Volume{ - Driver: "Directory", - ReadOnly: oldvol.ReadOnly, - Replication: oldvol.DirectoryReplication, - StorageClasses: array2boolmap(oldvol.StorageClasses), - } - params = arvados.DirectoryVolumeDriverParameters{ - Root: oldvol.Root, - Serialize: oldvol.Serialize, - } - default: - return fmt.Errorf("unsupported volume type %q", oldvol.Type) - } - dp, err := json.Marshal(params) - if err != nil { - return err - } - newvol.DriverParameters = json.RawMessage(dp) - if newvol.Replication < 1 { - newvol.Replication = 1 + if len(accessViaHosts) > 0 { + cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly} } + continue } - if accessViaHosts == nil { - accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1) + } + var newvol arvados.Volume + if found { + ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i) + newvol = cluster.Volumes[oldUUID] + // Remove the old entry. It will be added back + // below, possibly with a new UUID. + delete(cluster.Volumes, oldUUID) + } else { + v, err := ldr.translateOldKeepstoreVolume(oldvol) + if err != nil { + return err } - accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly} - newvol.AccessViaHosts = accessViaHosts - - volUUID := oldUUID - if oldvol.ReadOnly { - } else if oc.Listen == nil { - ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config") - } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil { - ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config") - } else if len(uuid) != 27 { - ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format") + newvol = v + } + if accessViaHosts == nil { + accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1) + } + accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly} + newvol.AccessViaHosts = accessViaHosts + + volUUID := oldUUID + if oldvol.ReadOnly { + } else if oc.Listen == nil { + ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config") + } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil { + ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config") + } else if len(uuid) != 27 { + ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format") + } else { + rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:] + if _, ok := cluster.Volumes[rendezvousUUID]; ok { + ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use") } else { - rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:] - if _, ok := cluster.Volumes[rendezvousUUID]; ok { - ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use") - } else { - volUUID = rendezvousUUID - } - si := cluster.Services.Keepstore.InternalURLs[myURL] - si.Rendezvous = uuid[12:] - cluster.Services.Keepstore.InternalURLs[myURL] = si + volUUID = rendezvousUUID } - if volUUID == "" { - volUUID = newUUID(cluster.ClusterID, "nyw5e") - ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i) - } - cluster.Volumes[volUUID] = newvol + si := cluster.Services.Keepstore.InternalURLs[myURL] + si.Rendezvous = uuid[12:] + cluster.Services.Keepstore.InternalURLs[myURL] = si + } + if volUUID == "" { + volUUID = newUUID(cluster.ClusterID, "nyw5e") + ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i) } + cluster.Volumes[volUUID] = newvol } - - cfg.Clusters[cluster.ClusterID] = *cluster return nil } +func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) { + var newvol arvados.Volume + var params interface{} + switch oldvol.Type { + case "S3": + accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile) + if err != nil && oldvol.AccessKeyFile != "" { + return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err) + } + secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile) + if err != nil && oldvol.SecretKeyFile != "" { + return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err) + } + newvol = arvados.Volume{ + Driver: "S3", + ReadOnly: oldvol.ReadOnly, + Replication: oldvol.S3Replication, + StorageClasses: array2boolmap(oldvol.StorageClasses), + } + params = arvados.S3VolumeDriverParameters{ + AccessKeyID: string(bytes.TrimSpace(accesskeydata)), + SecretAccessKey: string(bytes.TrimSpace(secretkeydata)), + Endpoint: oldvol.Endpoint, + Region: oldvol.Region, + Bucket: oldvol.Bucket, + LocationConstraint: oldvol.LocationConstraint, + IndexPageSize: oldvol.IndexPageSize, + ConnectTimeout: oldvol.ConnectTimeout, + ReadTimeout: oldvol.ReadTimeout, + RaceWindow: oldvol.RaceWindow, + UnsafeDelete: oldvol.UnsafeDelete, + } + case "Azure": + keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile) + if err != nil && oldvol.StorageAccountKeyFile != "" { + return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err) + } + newvol = arvados.Volume{ + Driver: "Azure", + ReadOnly: oldvol.ReadOnly, + Replication: oldvol.AzureReplication, + StorageClasses: array2boolmap(oldvol.StorageClasses), + } + params = arvados.AzureVolumeDriverParameters{ + StorageAccountName: oldvol.StorageAccountName, + StorageAccountKey: string(bytes.TrimSpace(keydata)), + StorageBaseURL: oldvol.StorageBaseURL, + ContainerName: oldvol.ContainerName, + RequestTimeout: oldvol.RequestTimeout, + ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay, + ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts, + } + case "Directory": + newvol = arvados.Volume{ + Driver: "Directory", + ReadOnly: oldvol.ReadOnly, + Replication: oldvol.DirectoryReplication, + StorageClasses: array2boolmap(oldvol.StorageClasses), + } + params = arvados.DirectoryVolumeDriverParameters{ + Root: oldvol.Root, + Serialize: oldvol.Serialize, + } + default: + return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type) + } + dp, err := json.Marshal(params) + if err != nil { + return newvol, err + } + newvol.DriverParameters = json.RawMessage(dp) + if newvol.Replication < 1 { + newvol.Replication = 1 + } + return newvol, nil +} + func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) { for uuid, newvol := range newvols { if oldvol.Type != newvol.Driver { @@ -385,7 +408,7 @@ func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string var params arvados.DirectoryVolumeDriverParameters if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil && oldvol.Root == params.Root { - if _, ok := newvol.AccessViaHosts[myURL]; ok { + if _, ok := newvol.AccessViaHosts[myURL]; ok || len(newvol.AccessViaHosts) == 0 { return uuid, true } } @@ -501,14 +524,7 @@ func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, if ks.ServiceType == "proxy" { continue } else if keepServiceIsMe(ks, hostname, listen) { - url := arvados.URL{ - Scheme: "http", - Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)), - } - if ks.ServiceSSLFlag { - url.Scheme = "https" - } - return ks.UUID, url, nil + return ks.UUID, keepServiceURL(ks), nil } else { tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort)) } @@ -517,6 +533,18 @@ func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, return } +func keepServiceURL(ks arvados.KeepService) arvados.URL { + url := arvados.URL{ + Scheme: "http", + Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)), + Path: "/", + } + if ks.ServiceSSLFlag { + url.Scheme = "https" + } + return url +} + var localhostOrAllInterfaces = map[string]bool{ "localhost": true, "127.0.0.1": true, @@ -552,3 +580,110 @@ func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) boo kshost := strings.ToLower(ks.ServiceHost) return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".") } + +// Warn about pending keepstore migration tasks that haven't already +// been warned about in loadOldKeepstoreConfig() -- i.e., unmigrated +// keepstore hosts other than the present host, and obsolete content +// in the keep_services table. +func (ldr *Loader) checkPendingKeepstoreMigrations(cluster *arvados.Cluster) error { + if cluster.Services.Controller.ExternalURL.String() == "" { + ldr.Logger.Debug("Services.Controller.ExternalURL not configured -- skipping check for pending keepstore config migrations") + return nil + } + if ldr.SkipAPICalls { + ldr.Logger.Debug("(Loader).SkipAPICalls == true -- skipping check for pending keepstore config migrations") + return nil + } + client, err := arvados.NewClientFromConfig(cluster) + if err != nil { + return err + } + client.AuthToken = cluster.SystemRootToken + var svcList arvados.KeepServiceList + err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil) + if err != nil { + ldr.Logger.WithError(err).Warn("error retrieving keep_services list -- skipping check for pending keepstore config migrations") + return nil + } + hostname, err := os.Hostname() + if err != nil { + return fmt.Errorf("error getting hostname: %s", err) + } + sawTimes := map[time.Time]bool{} + for _, ks := range svcList.Items { + sawTimes[ks.CreatedAt] = true + sawTimes[ks.ModifiedAt] = true + } + if len(sawTimes) <= 1 { + // If all timestamps in the arvados/v1/keep_services + // response are identical, it's a clear sign the + // response was generated on the fly from the cluster + // config, rather than real database records. In that + // case (as well as the case where none are listed at + // all) it's pointless to look for entries that + // haven't yet been migrated to the config file. + return nil + } + needDBRows := false + for _, ks := range svcList.Items { + if ks.ServiceType == "proxy" { + if len(cluster.Services.Keepproxy.InternalURLs) == 0 { + needDBRows = true + ldr.Logger.Warn("you should migrate your keepproxy configuration to the cluster configuration file") + } + continue + } + kshost := strings.ToLower(ks.ServiceHost) + if localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".") { + // it would be confusing to recommend + // migrating *this* host's legacy keepstore + // config immediately after explaining that + // very migration process in more detail. + continue + } + ksurl := keepServiceURL(ks) + if _, ok := cluster.Services.Keepstore.InternalURLs[ksurl]; ok { + // already added to InternalURLs + continue + } + ldr.Logger.Warnf("you should migrate the legacy keepstore configuration file on host %s", ks.ServiceHost) + } + if !needDBRows { + ldr.Logger.Warn("you should delete all of your manually added keep_services listings using `arv --format=uuid keep_service list | xargs -n1 arv keep_service delete --uuid` -- when those are deleted, the services listed in your cluster configuration will be used instead") + } + return nil +} + +// Warn about keepstore servers that have no volumes. +func (ldr *Loader) checkEmptyKeepstores(cluster arvados.Cluster) error { +servers: + for url := range cluster.Services.Keepstore.InternalURLs { + for _, vol := range cluster.Volumes { + if len(vol.AccessViaHosts) == 0 { + // accessible by all servers + return nil + } + if _, ok := vol.AccessViaHosts[url]; ok { + continue servers + } + } + ldr.Logger.Warnf("keepstore configured at %s does not have access to any volumes", url) + } + return nil +} + +// Warn about AccessViaHosts entries that don't correspond to any of +// the listed keepstore services. +func (ldr *Loader) checkUnlistedKeepstores(cluster arvados.Cluster) error { + for uuid, vol := range cluster.Volumes { + if uuid == "SAMPLE" { + continue + } + for url := range vol.AccessViaHosts { + if _, ok := cluster.Services.Keepstore.InternalURLs[url]; !ok { + ldr.Logger.Warnf("Volumes.%s.AccessViaHosts refers to nonexistent keepstore server %s", uuid, url) + } + } + } + return nil +}