// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package config

import (
	"bufio"
	"bytes"
	"crypto/rand"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math/big"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
)

const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"

type oldKeepstoreConfig struct {
	Debug  *bool
	Listen *string

	LogFormat *string

	PIDFile *string

	MaxBuffers  *int
	MaxRequests *int

	BlobSignatureTTL    *arvados.Duration
	BlobSigningKeyFile  *string
	RequireSignatures   *bool
	SystemAuthTokenFile *string
	EnableDelete        *bool
	TrashLifetime       *arvados.Duration
	TrashCheckInterval  *arvados.Duration
	PullWorkers         *int
	TrashWorkers        *int
	EmptyTrashWorkers   *int
	TLSCertificateFile  *string
	TLSKeyFile          *string

	Volumes *oldKeepstoreVolumeList

	ManagementToken *string

	DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
}

type oldKeepstoreVolumeList []oldKeepstoreVolume

type oldKeepstoreVolume struct {
	arvados.Volume
	Type string `json:",omitempty"`

	// Azure driver configs
	StorageAccountName    string           `json:",omitempty"`
	StorageAccountKeyFile string           `json:",omitempty"`
	StorageBaseURL        string           `json:",omitempty"`
	ContainerName         string           `json:",omitempty"`
	AzureReplication      int              `json:",omitempty"`
	RequestTimeout        arvados.Duration `json:",omitempty"`
	ListBlobsRetryDelay   arvados.Duration `json:",omitempty"`
	ListBlobsMaxAttempts  int              `json:",omitempty"`

	// S3 driver configs
	AccessKeyFile      string           `json:",omitempty"`
	SecretKeyFile      string           `json:",omitempty"`
	Endpoint           string           `json:",omitempty"`
	Region             string           `json:",omitempty"`
	Bucket             string           `json:",omitempty"`
	LocationConstraint bool             `json:",omitempty"`
	IndexPageSize      int              `json:",omitempty"`
	S3Replication      int              `json:",omitempty"`
	ConnectTimeout     arvados.Duration `json:",omitempty"`
	ReadTimeout        arvados.Duration `json:",omitempty"`
	RaceWindow         arvados.Duration `json:",omitempty"`
	UnsafeDelete       bool             `json:",omitempty"`

	// Directory driver configs
	Root                 string
	DirectoryReplication int
	Serialize            bool

	// Common configs
	ReadOnly       bool     `json:",omitempty"`
	StorageClasses []string `json:",omitempty"`
}
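// A minimal sketch of a legacy keepstore.yml that the loader below can
// migrate. Values are hypothetical; each key shown corresponds to a
// field of oldKeepstoreConfig / oldKeepstoreVolume above:
//
//	Listen: ":25107"
//	BlobSigningKeyFile: /etc/arvados/keepstore/blob-signing.key
//	SystemAuthTokenFile: /etc/arvados/keepstore/system-auth-token.txt
//	EnableDelete: true
//	Volumes:
//	- Type: Directory
//	  Root: /mnt/local-disk/keep
//	  DirectoryReplication: 2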
// update config using values from an old-style keepstore config file.
func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
	if ldr.KeepstorePath == "" {
		return nil
	}
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("getting hostname: %s", err)
	}

	var oc oldKeepstoreConfig
	err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
	if os.IsNotExist(err) && ldr.KeepstorePath == defaultKeepstoreConfigPath {
		return nil
	} else if err != nil {
		return err
	}

	cluster, err := cfg.GetCluster("")
	if err != nil {
		return err
	}

	myURL := arvados.URL{Scheme: "http", Path: "/"}
	if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
		myURL.Scheme = "https"
	}

	if v := oc.Debug; v == nil {
	} else if *v && cluster.SystemLogs.LogLevel != "debug" {
		cluster.SystemLogs.LogLevel = "debug"
	} else if !*v && cluster.SystemLogs.LogLevel != "info" {
		cluster.SystemLogs.LogLevel = "info"
	}

	if v := oc.TLSCertificateFile; v != nil {
		cluster.TLS.Certificate = "file://" + *v
	}
	if v := oc.TLSKeyFile; v != nil {
		cluster.TLS.Key = "file://" + *v
	}
	if v := oc.Listen; v != nil {
		if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v, Path: "/"}]; ok {
			// already listed
			myURL.Host = *v
		} else if len(*v) > 1 && (*v)[0] == ':' {
			myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
			cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
		} else {
			return fmt.Errorf("unable to migrate Listen value %q -- you must update Services.Keepstore.InternalURLs manually, and comment out the Listen entry in your legacy keepstore config file", *v)
		}
	} else {
		for url := range cluster.Services.Keepstore.InternalURLs {
			if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
				myURL = url
				break
			}
		}
		if myURL.Host == "" {
			return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
		}
	}

	if v := oc.LogFormat; v != nil {
		cluster.SystemLogs.Format = *v
	}
	if v := oc.MaxBuffers; v != nil {
		cluster.API.MaxKeepBlobBuffers = *v
	}
	if v := oc.MaxRequests; v != nil {
		cluster.API.MaxConcurrentRequests = *v
	}
	if v := oc.BlobSignatureTTL; v != nil {
		cluster.Collections.BlobSigningTTL = *v
	}
	if v := oc.BlobSigningKeyFile; v != nil {
		buf, err := ioutil.ReadFile(*v)
		if err != nil {
			return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
		}
		if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
			cluster.Collections.BlobSigningKey = key
		}
	}
	if v := oc.RequireSignatures; v != nil {
		cluster.Collections.BlobSigning = *v
	}
	if v := oc.SystemAuthTokenFile; v != nil {
		f, err := os.Open(*v)
		if err != nil {
			return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
		}
		defer f.Close()
		buf, err := ioutil.ReadAll(f)
		if err != nil {
			return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
		}
		if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
			cluster.SystemRootToken = key
		}
	}
	if v := oc.EnableDelete; v != nil {
		cluster.Collections.BlobTrash = *v
	}
	if v := oc.TrashLifetime; v != nil {
		cluster.Collections.BlobTrashLifetime = *v
	}
	if v := oc.TrashCheckInterval; v != nil {
		cluster.Collections.BlobTrashCheckInterval = *v
	}
	if v := oc.TrashWorkers; v != nil {
		cluster.Collections.BlobTrashConcurrency = *v
	}
	if v := oc.EmptyTrashWorkers; v != nil {
		cluster.Collections.BlobDeleteConcurrency = *v
	}
	if v := oc.PullWorkers; v != nil {
		cluster.Collections.BlobReplicateConcurrency = *v
	}

	if oc.Volumes == nil || len(*oc.Volumes) == 0 {
		ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
		err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
		if err != nil {
			return fmt.Errorf("error discovering local directory volumes: %s", err)
		}
	} else {
		err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL)
		if err != nil {
			return err
		}
	}

	if err := ldr.checkPendingKeepstoreMigrations(cluster); err != nil {
		return err
	}

	cfg.Clusters[cluster.ClusterID] = *cluster
	return nil
}
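// Illustrative sketch of the "optimal volume UUID" rule applied in
// migrateOldKeepstoreVolumes below (hypothetical identifiers, and
// assuming the usual keep_services UUID layout): if this host's
// keep_services entry is
//
//	zzzzz-bi6l4-0123456789abcde
//
// then the preferred volume UUID reuses its final 15 characters:
//
//	zzzzz-nyw5e-0123456789abcde
//
// and the same suffix is recorded as the service's Rendezvous value,
// presumably so rendezvous-hash block placement is preserved across
// the migration.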
// Merge Volumes section of old keepstore config into cluster config.
func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error {
	for i, oldvol := range *oc.Volumes {
		var accessViaHosts map[arvados.URL]arvados.VolumeAccess
		oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
		if found {
			accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
			writers := false
			for _, va := range accessViaHosts {
				if !va.ReadOnly {
					writers = true
				}
			}
			if writers || len(accessViaHosts) == 0 {
				ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
				if len(accessViaHosts) > 0 {
					cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
				}
				continue
			}
		}
		var newvol arvados.Volume
		if found {
			ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
			newvol = cluster.Volumes[oldUUID]
			// Remove the old entry. It will be added back
			// below, possibly with a new UUID.
			delete(cluster.Volumes, oldUUID)
		} else {
			v, err := ldr.translateOldKeepstoreVolume(oldvol)
			if err != nil {
				return err
			}
			newvol = v
		}
		if accessViaHosts == nil {
			accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
		}
		accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
		newvol.AccessViaHosts = accessViaHosts

		volUUID := oldUUID
		if oldvol.ReadOnly {
		} else if oc.Listen == nil {
			ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
		} else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
			ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
		} else if len(uuid) != 27 {
			ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
		} else {
			rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
			if _, ok := cluster.Volumes[rendezvousUUID]; ok {
				ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
			} else {
				volUUID = rendezvousUUID
			}
			si := cluster.Services.Keepstore.InternalURLs[myURL]
			si.Rendezvous = uuid[12:]
			cluster.Services.Keepstore.InternalURLs[myURL] = si
		}
		if volUUID == "" {
			volUUID = newUUID(cluster.ClusterID, "nyw5e")
			ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
		}
		cluster.Volumes[volUUID] = newvol
	}
	return nil
}
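// A sketch of the translation performed below, with hypothetical
// values: a legacy volume entry such as
//
//	Type: Directory
//	Root: /mnt/local-disk/keep
//	DirectoryReplication: 2
//
// becomes an arvados.Volume with Driver "Directory", Replication 2,
// and DriverParameters {"Root":"/mnt/local-disk/keep","Serialize":false}.
// Key files named in S3/Azure entries are read here, so the resulting
// cluster config carries the key material itself rather than a path.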
func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) {
	var newvol arvados.Volume
	var params interface{}
	switch oldvol.Type {
	case "S3":
		accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
		if err != nil && oldvol.AccessKeyFile != "" {
			return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err)
		}
		secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
		if err != nil && oldvol.SecretKeyFile != "" {
			return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err)
		}
		newvol = arvados.Volume{
			Driver:         "S3",
			ReadOnly:       oldvol.ReadOnly,
			Replication:    oldvol.S3Replication,
			StorageClasses: array2boolmap(oldvol.StorageClasses),
		}
		params = arvados.S3VolumeDriverParameters{
			AccessKeyID:        string(bytes.TrimSpace(accesskeydata)),
			SecretAccessKey:    string(bytes.TrimSpace(secretkeydata)),
			Endpoint:           oldvol.Endpoint,
			Region:             oldvol.Region,
			Bucket:             oldvol.Bucket,
			LocationConstraint: oldvol.LocationConstraint,
			IndexPageSize:      oldvol.IndexPageSize,
			ConnectTimeout:     oldvol.ConnectTimeout,
			ReadTimeout:        oldvol.ReadTimeout,
			RaceWindow:         oldvol.RaceWindow,
			UnsafeDelete:       oldvol.UnsafeDelete,
		}
	case "Azure":
		keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
		if err != nil && oldvol.StorageAccountKeyFile != "" {
			return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
		}
		newvol = arvados.Volume{
			Driver:         "Azure",
			ReadOnly:       oldvol.ReadOnly,
			Replication:    oldvol.AzureReplication,
			StorageClasses: array2boolmap(oldvol.StorageClasses),
		}
		params = arvados.AzureVolumeDriverParameters{
			StorageAccountName:   oldvol.StorageAccountName,
			StorageAccountKey:    string(bytes.TrimSpace(keydata)),
			StorageBaseURL:       oldvol.StorageBaseURL,
			ContainerName:        oldvol.ContainerName,
			RequestTimeout:       oldvol.RequestTimeout,
			ListBlobsRetryDelay:  oldvol.ListBlobsRetryDelay,
			ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
		}
	case "Directory":
		newvol = arvados.Volume{
			Driver:         "Directory",
			ReadOnly:       oldvol.ReadOnly,
			Replication:    oldvol.DirectoryReplication,
			StorageClasses: array2boolmap(oldvol.StorageClasses),
		}
		params = arvados.DirectoryVolumeDriverParameters{
			Root:      oldvol.Root,
			Serialize: oldvol.Serialize,
		}
	default:
		return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type)
	}
	dp, err := json.Marshal(params)
	if err != nil {
		return newvol, err
	}
	newvol.DriverParameters = json.RawMessage(dp)
	if newvol.Replication < 1 {
		newvol.Replication = 1
	}
	return newvol, nil
}

func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
	for uuid, newvol := range newvols {
		if oldvol.Type != newvol.Driver {
			continue
		}
		switch oldvol.Type {
		case "S3":
			var params arvados.S3VolumeDriverParameters
			if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
				oldvol.Endpoint == params.Endpoint &&
				oldvol.Region == params.Region &&
				oldvol.Bucket == params.Bucket &&
				oldvol.LocationConstraint == params.LocationConstraint {
				return uuid, true
			}
		case "Azure":
			var params arvados.AzureVolumeDriverParameters
			if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
				oldvol.StorageAccountName == params.StorageAccountName &&
				oldvol.StorageBaseURL == params.StorageBaseURL &&
				oldvol.ContainerName == params.ContainerName {
				return uuid, true
			}
		case "Directory":
			var params arvados.DirectoryVolumeDriverParameters
			if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
				oldvol.Root == params.Root {
				if _, ok := newvol.AccessViaHosts[myURL]; ok || len(newvol.AccessViaHosts) == 0 {
					return uuid, true
				}
			}
		}
	}
	return "", false
}
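// Sketch of what discoverLocalVolumes looks for, using a hypothetical
// /proc/mounts line:
//
//	/dev/sdb1 /mnt/disk1 ext4 rw,relatime 0 0
//
// This mount qualifies if /mnt/disk1/keep exists and is a directory;
// an "ro" entry in the mount options (the fourth field) makes the
// discovered volume read-only.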
"tmpfs" && !strings.HasPrefix(dev, "/dev/") { continue } keepdir := mount + "/keep" if st, err := os.Stat(keepdir); err != nil || !st.IsDir() { continue } ro := false for _, fsopt := range strings.Split(args[3], ",") { if fsopt == "ro" { ro = true } } uuid := newUUID(cluster.ClusterID, "nyw5e") ldr.Logger.WithFields(logrus.Fields{ "UUID": uuid, "Driver": "Directory", "DriverParameters.Root": keepdir, "DriverParameters.Serialize": false, "ReadOnly": ro, "Replication": 1, }).Warn("adding local directory volume") p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{ Root: keepdir, Serialize: false, }) if err != nil { panic(err) } cluster.Volumes[uuid] = arvados.Volume{ Driver: "Directory", DriverParameters: p, ReadOnly: ro, Replication: 1, AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{ myURL: {ReadOnly: ro}, }, } } if err := scanner.Err(); err != nil { return fmt.Errorf("reading %s: %s", mountsFile, err) } return nil } func array2boolmap(keys []string) map[string]bool { m := map[string]bool{} for _, k := range keys { m[k] = true } return m } func newUUID(clusterID, infix string) string { randint, err := rand.Int(rand.Reader, big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0))) if err != nil { panic(err) } randstr := randint.Text(36) for len(randstr) < 15 { randstr = "0" + randstr } return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr) } // Return the UUID and URL for the controller's keep_services listing // corresponding to this host/process. func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) { client, err := arvados.NewClientFromConfig(cluster) if err != nil { return } client.AuthToken = cluster.SystemRootToken var svcList arvados.KeepServiceList err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil) if err != nil { return } hostname, err := os.Hostname() if err != nil { err = fmt.Errorf("error getting hostname: %s", err) return } var tried []string for _, ks := range svcList.Items { if ks.ServiceType == "proxy" { continue } else if keepServiceIsMe(ks, hostname, listen) { return ks.UUID, keepServiceURL(ks), nil } else { tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort)) } } err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried) return } func keepServiceURL(ks arvados.KeepService) arvados.URL { url := arvados.URL{ Scheme: "http", Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)), Path: "/", } if ks.ServiceSSLFlag { url.Scheme = "https" } return url } var localhostOrAllInterfaces = map[string]bool{ "localhost": true, "127.0.0.1": true, "::1": true, "::": true, "": true, } // Return true if the given KeepService entry matches the given // hostname and (keepstore config file) listen address. // // If the KeepService host is some variant of "localhost", we assume // this is a testing or single-node environment, ignore the given // hostname, and return true if the port numbers match. // // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will // match a KeepService host "foo.bar", but also "foo.bar.example", // "foo.bar.example.org", etc. func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool { // Extract the port name/number from listen, and resolve it to // a port number to compare with ks.ServicePort. 
// Return true if the given KeepService entry matches the given
// hostname and (keepstore config file) listen address.
//
// If the KeepService host is some variant of "localhost", we assume
// this is a testing or single-node environment, ignore the given
// hostname, and return true if the port numbers match.
//
// The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
// match a KeepService host "foo.bar", but also "foo.bar.example",
// "foo.bar.example.org", etc.
func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
	// Extract the port name/number from listen, and resolve it
	// to a port number to compare with ks.ServicePort.
	_, listenport, err := net.SplitHostPort(listen)
	if err != nil && strings.HasPrefix(listen, ":") {
		listenport = listen[1:]
	}
	if lp, err := net.LookupPort("tcp", listenport); err != nil {
		return false
	} else if !(lp == ks.ServicePort || (lp == 0 && ks.ServicePort == 80)) {
		return false
	}

	kshost := strings.ToLower(ks.ServiceHost)
	return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
}

// Warn about pending keepstore migration tasks that haven't already
// been warned about in loadOldKeepstoreConfig() -- i.e., unmigrated
// keepstore hosts other than the present host, and obsolete content
// in the keep_services table.
func (ldr *Loader) checkPendingKeepstoreMigrations(cluster *arvados.Cluster) error {
	if cluster.Services.Controller.ExternalURL.String() == "" {
		ldr.Logger.Debug("Services.Controller.ExternalURL not configured -- skipping check for pending keepstore config migrations")
		return nil
	}
	if ldr.SkipAPICalls {
		ldr.Logger.Debug("(Loader).SkipAPICalls == true -- skipping check for pending keepstore config migrations")
		return nil
	}
	client, err := arvados.NewClientFromConfig(cluster)
	if err != nil {
		return err
	}
	client.AuthToken = cluster.SystemRootToken

	var svcList arvados.KeepServiceList
	err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
	if err != nil {
		ldr.Logger.WithError(err).Warn("error retrieving keep_services list -- skipping check for pending keepstore config migrations")
		return nil
	}
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("error getting hostname: %s", err)
	}

	sawTimes := map[time.Time]bool{}
	for _, ks := range svcList.Items {
		sawTimes[ks.CreatedAt] = true
		sawTimes[ks.ModifiedAt] = true
	}
	if len(sawTimes) <= 1 {
		// If all timestamps in the arvados/v1/keep_services
		// response are identical, it's a clear sign the
		// response was generated on the fly from the cluster
		// config, rather than real database records. In that
		// case (as well as the case where none are listed at
		// all) it's pointless to look for entries that
		// haven't yet been migrated to the config file.
		return nil
	}

	needDBRows := false
	for _, ks := range svcList.Items {
		if ks.ServiceType == "proxy" {
			if len(cluster.Services.Keepproxy.InternalURLs) == 0 {
				needDBRows = true
				ldr.Logger.Warn("you should migrate your keepproxy configuration to the cluster configuration file")
			}
			continue
		}
		kshost := strings.ToLower(ks.ServiceHost)
		if localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".") {
			// it would be confusing to recommend
			// migrating *this* host's legacy keepstore
			// config immediately after explaining that
			// very migration process in more detail.
			continue
		}
		ksurl := keepServiceURL(ks)
		if _, ok := cluster.Services.Keepstore.InternalURLs[ksurl]; ok {
			// already added to InternalURLs
			continue
		}
		ldr.Logger.Warnf("you should migrate the legacy keepstore configuration file on host %s", ks.ServiceHost)
	}
	if !needDBRows {
		ldr.Logger.Warn("you should delete all of your manually added keep_services listings using `arv --format=uuid keep_service list | xargs -n1 arv keep_service delete --uuid` -- when those are deleted, the services listed in your cluster configuration will be used instead")
	}
	return nil
}
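// Note on the convention used by the checks below: a volume whose
// AccessViaHosts map is empty is accessible from every keepstore
// server, so if any such volume exists, the "no volumes" check passes
// for all servers at once.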
// Warn about keepstore servers that have no volumes.
func (ldr *Loader) checkEmptyKeepstores(cluster arvados.Cluster) error {
servers:
	for url := range cluster.Services.Keepstore.InternalURLs {
		for _, vol := range cluster.Volumes {
			if len(vol.AccessViaHosts) == 0 {
				// accessible by all servers
				return nil
			}
			if _, ok := vol.AccessViaHosts[url]; ok {
				continue servers
			}
		}
		ldr.Logger.Warnf("keepstore configured at %s does not have access to any volumes", url)
	}
	return nil
}

// Warn about AccessViaHosts entries that don't correspond to any of
// the listed keepstore services.
func (ldr *Loader) checkUnlistedKeepstores(cluster arvados.Cluster) error {
	for uuid, vol := range cluster.Volumes {
		if uuid == "SAMPLE" {
			continue
		}
		for url := range vol.AccessViaHosts {
			if _, ok := cluster.Services.Keepstore.InternalURLs[url]; !ok {
				ldr.Logger.Warnf("Volumes.%s.AccessViaHosts refers to nonexistent keepstore server %s", uuid, url)
			}
		}
	}
	return nil
}