1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.curoverse.com/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
// defaultKeepstoreConfigPath is the legacy keepstore config file path that
// loadOldKeepstoreConfig compares ldr.KeepstorePath against when the legacy
// file turns out not to exist.
25 const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
// oldKeepstoreConfig holds the settings decoded from a legacy keepstore
// config file (it is filled in through ldr.loadOldConfigHelper). Most fields
// are pointers: loadOldKeepstoreConfig copies a setting into the cluster
// config only when its pointer is non-nil.
27 type oldKeepstoreConfig struct {
// Migrated to Collections.BlobSigningTTL.
38 BlobSignatureTTL *arvados.Duration
// Path of a file whose whitespace-trimmed content is migrated to
// Collections.BlobSigningKey.
39 BlobSigningKeyFile *string
// Migrated to Collections.BlobSigning.
40 RequireSignatures *bool
// Path of a file whose whitespace-trimmed content is migrated to
// SystemRootToken.
41 SystemAuthTokenFile *string
// Migrated to Collections.BlobTrashLifetime.
43 TrashLifetime *arvados.Duration
// Migrated to Collections.BlobTrashCheckInterval.
44 TrashCheckInterval *arvados.Duration
// Migrated to Collections.BlobDeleteConcurrency.
47 EmptyTrashWorkers *int
// Migrated to TLS.Certificate as a "file://" URL. When TLSKeyFile is set as
// well, this server's URL uses the "https" scheme.
48 TLSCertificateFile *string
// Legacy volume list. When nil or empty, local directory volumes are
// discovered instead of being migrated.
51 Volumes *oldKeepstoreVolumeList
53 ManagementToken *string
// Passed to discoverLocalVolumes as the mounts file to scan.
55 DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
// oldKeepstoreVolumeList is the type of the Volumes entry of
// oldKeepstoreConfig: a list of legacy volume definitions.
58 type oldKeepstoreVolumeList []oldKeepstoreVolume
// oldKeepstoreVolume is one entry of the legacy Volumes list. Settings for
// all drivers live side by side in this one struct;
// translateOldKeepstoreVolume copies the relevant group into the
// driver-specific parameter struct of the new configuration.
60 type oldKeepstoreVolume struct {
// Driver type; alreadyMigrated compares it with the Driver field of
// volumes in the cluster config.
62 Type string `json:",omitempty"`
64 // Azure driver configs
65 StorageAccountName string `json:",omitempty"`
// Path of a file holding the storage account key; the trimmed content
// becomes AzureVolumeDriverParameters.StorageAccountKey.
66 StorageAccountKeyFile string `json:",omitempty"`
67 StorageBaseURL string `json:",omitempty"`
68 ContainerName string `json:",omitempty"`
// Becomes Volume.Replication for Azure volumes.
69 AzureReplication int `json:",omitempty"`
70 RequestTimeout arvados.Duration `json:",omitempty"`
71 ListBlobsRetryDelay arvados.Duration `json:",omitempty"`
72 ListBlobsMaxAttempts int `json:",omitempty"`
// The fields from AccessKeyFile through UnsafeDelete are copied into
// arvados.S3VolumeDriverParameters by translateOldKeepstoreVolume.
// AccessKeyFile and SecretKeyFile are paths; the trimmed file contents
// become AccessKey and SecretKey.
75 AccessKeyFile string `json:",omitempty"`
76 SecretKeyFile string `json:",omitempty"`
77 Endpoint string `json:",omitempty"`
78 Region string `json:",omitempty"`
79 Bucket string `json:",omitempty"`
80 LocationConstraint bool `json:",omitempty"`
81 IndexPageSize int `json:",omitempty"`
// Becomes Volume.Replication for S3 volumes.
82 S3Replication int `json:",omitempty"`
83 ConnectTimeout arvados.Duration `json:",omitempty"`
84 ReadTimeout arvados.Duration `json:",omitempty"`
85 RaceWindow arvados.Duration `json:",omitempty"`
86 UnsafeDelete bool `json:",omitempty"`
88 // Directory driver configs
// Becomes Volume.Replication for Directory volumes.
90 DirectoryReplication int
// Settings shared by all drivers.
94 ReadOnly bool `json:",omitempty"`
// Converted to a map with array2boolmap when the volume is translated.
95 StorageClasses []string `json:",omitempty"`
98 // update config using values from an old-style keepstore config file.
//
// loadOldKeepstoreConfig decodes the legacy file at ldr.KeepstorePath into an
// oldKeepstoreConfig, copies every setting that is present (non-nil) into the
// cluster returned by cfg.GetCluster(""), determines this server's key in
// Services.Keepstore.InternalURLs, migrates the legacy volumes (or discovers
// local directory volumes when none are listed), and finally stores the
// modified cluster back into cfg.Clusters.
99 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
// NOTE(review): the handling of an empty KeepstorePath is not visible in
// these lines -- presumably there is nothing to migrate; confirm.
100 if ldr.KeepstorePath == "" {
// The local hostname is used below to build or recognize this server's
// InternalURLs entry.
103 hostname, err := os.Hostname()
105 return fmt.Errorf("getting hostname: %s", err)
108 var oc oldKeepstoreConfig
109 err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
// A missing file at the default path is distinguished from every other
// load error.
110 if os.IsNotExist(err) && ldr.KeepstorePath == defaultKeepstoreConfigPath {
112 } else if err != nil {
116 cluster, err := cfg.GetCluster("")
// myURL identifies this keepstore server. The scheme is "https" only when
// the legacy config names both a TLS certificate file and a TLS key file;
// the host part is filled in further down.
121 myURL := arvados.URL{Scheme: "http"}
122 if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
123 myURL.Scheme = "https"
// Debug maps onto SystemLogs.LogLevel: true selects "debug", false selects
// "info", and an absent entry leaves the level untouched.
126 if v := oc.Debug; v == nil {
127 } else if *v && cluster.SystemLogs.LogLevel != "debug" {
128 cluster.SystemLogs.LogLevel = "debug"
129 } else if !*v && cluster.SystemLogs.LogLevel != "info" {
130 cluster.SystemLogs.LogLevel = "info"
// TLS file paths are stored as "file://" URLs.
133 if v := oc.TLSCertificateFile; v != nil {
134 cluster.TLS.Certificate = "file://" + *v
136 if v := oc.TLSKeyFile; v != nil {
137 cluster.TLS.Key = "file://" + *v
// Listen handling: first check whether scheme+Listen is already a key of
// InternalURLs; otherwise a ":port" value is combined with the local
// hostname and registered as a new entry; any other form cannot be migrated
// automatically and is reported as an error.
139 if v := oc.Listen; v != nil {
140 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
143 } else if len(*v) > 1 && (*v)[0] == ':' {
144 myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
145 cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
147 return fmt.Errorf("unable to migrate Listen value %q -- you must update Services.Keepstore.InternalURLs manually, and comment out the Listen entry in your legacy keepstore config file", *v)
// Look for an existing InternalURLs entry whose host part equals the local
// hostname. The SplitHostPort error is discarded, so an unparsable entry
// yields an empty host and does not match.
150 for url := range cluster.Services.Keepstore.InternalURLs {
151 if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
// Without a host for myURL the migration cannot continue.
156 if myURL.Host == "" {
157 return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
// Straight one-to-one copies of scalar settings.
161 if v := oc.LogFormat; v != nil {
162 cluster.SystemLogs.Format = *v
164 if v := oc.MaxBuffers; v != nil {
165 cluster.API.MaxKeepBlobBuffers = *v
167 if v := oc.MaxRequests; v != nil {
168 cluster.API.MaxConcurrentRequests = *v
170 if v := oc.BlobSignatureTTL; v != nil {
171 cluster.Collections.BlobSigningTTL = *v
// The signing key is read from the named file and stripped of surrounding
// whitespace; it is assigned only when it differs from the current value.
173 if v := oc.BlobSigningKeyFile; v != nil {
174 buf, err := ioutil.ReadFile(*v)
176 return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
178 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
179 cluster.Collections.BlobSigningKey = key
182 if v := oc.RequireSignatures; v != nil {
183 cluster.Collections.BlobSigning = *v
// Same treatment for the system auth token, which becomes SystemRootToken.
185 if v := oc.SystemAuthTokenFile; v != nil {
186 f, err := os.Open(*v)
188 return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
191 buf, err := ioutil.ReadAll(f)
193 return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
195 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
196 cluster.SystemRootToken = key
// Trash and replication worker settings.
199 if v := oc.EnableDelete; v != nil {
200 cluster.Collections.BlobTrash = *v
202 if v := oc.TrashLifetime; v != nil {
203 cluster.Collections.BlobTrashLifetime = *v
205 if v := oc.TrashCheckInterval; v != nil {
206 cluster.Collections.BlobTrashCheckInterval = *v
208 if v := oc.TrashWorkers; v != nil {
209 cluster.Collections.BlobTrashConcurrency = *v
211 if v := oc.EmptyTrashWorkers; v != nil {
212 cluster.Collections.BlobDeleteConcurrency = *v
214 if v := oc.PullWorkers; v != nil {
215 cluster.Collections.BlobReplicateConcurrency = *v
// With no volumes listed in the legacy config, scan the mounts file for
// local directory volumes; otherwise migrate the listed volumes.
217 if oc.Volumes == nil || len(*oc.Volumes) == 0 {
218 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
219 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
221 return fmt.Errorf("error discovering local directory volumes: %s", err)
224 err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL)
// Store the updated cluster value back under its cluster ID.
230 cfg.Clusters[cluster.ClusterID] = *cluster
234 // Merge Volumes section of old keepstore config into cluster config.
//
// For every legacy volume, alreadyMigrated is asked whether cluster.Volumes
// already holds a matching entry. Depending on the outcome the existing entry
// is kept as it is, re-added with this server in its AccessViaHosts, or a
// new entry is produced by translateOldKeepstoreVolume. myURL is this
// server's key in Services.Keepstore.InternalURLs.
235 func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error {
236 for i, oldvol := range *oc.Volumes {
237 var accessViaHosts map[arvados.URL]arvados.VolumeAccess
238 oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
240 accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
// NOTE(review): `writers` is computed by this loop, whose body is not visible
// here -- presumably it records whether any listed host has write access;
// confirm.
242 for _, va := range accessViaHosts {
// Keep the existing cluster-config entry and ignore the legacy parameters.
// If the entry restricts access to listed hosts, add this server to that
// list with the legacy ReadOnly flag.
247 if writers || len(accessViaHosts) == 0 {
248 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
249 if len(accessViaHosts) > 0 {
250 cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
255 var newvol arvados.Volume
257 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
258 newvol = cluster.Volumes[oldUUID]
259 // Remove the old entry. It will be added back
260 // below, possibly with a new UUID.
261 delete(cluster.Volumes, oldUUID)
// No usable existing entry: build the new volume from the legacy one.
263 v, err := ldr.translateOldKeepstoreVolume(oldvol)
// Record this server in the volume's AccessViaHosts, creating the map when
// the volume had none.
269 if accessViaHosts == nil {
270 accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
272 accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
273 newvol.AccessViaHosts = accessViaHosts
// Choose the UUID under which the volume is stored. The preferred UUID is
// derived from this server's keep_service UUID, which requires the legacy
// Listen address and a successful keep_services lookup.
277 } else if oc.Listen == nil {
278 ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
279 } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
280 ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
// A 27-character UUID guarantees that uuid[12:] below is the 15-character
// suffix.
281 } else if len(uuid) != 27 {
282 ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
// The derived UUID keeps the keep_service UUID's suffix, with this cluster's
// ID and the "nyw5e" infix in front.
284 rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
285 if _, ok := cluster.Volumes[rendezvousUUID]; ok {
286 ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
288 volUUID = rendezvousUUID
// Store the same suffix as the Rendezvous value of this server's
// InternalURLs entry (the entry is a map value, so it is copied, modified
// and written back).
290 si := cluster.Services.Keepstore.InternalURLs[myURL]
291 si.Rendezvous = uuid[12:]
292 cluster.Services.Keepstore.InternalURLs[myURL] = si
// Fallback: a random UUID.
295 volUUID = newUUID(cluster.ClusterID, "nyw5e")
296 ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
298 cluster.Volumes[volUUID] = newvol
// translateOldKeepstoreVolume converts one legacy volume definition into an
// arvados.Volume. The driver-specific settings are collected in a parameter
// struct (S3, Azure or Directory) and stored JSON-encoded in
// DriverParameters. An unsupported oldvol.Type yields an error.
303 func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) {
304 var newvol arvados.Volume
305 var params interface{}
// S3: key material is read from the named files. A read error is only
// reported when a file name was actually configured; with an empty name the
// error is ignored and the corresponding key stays empty.
308 accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
309 if err != nil && oldvol.AccessKeyFile != "" {
310 return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err)
312 secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
313 if err != nil && oldvol.SecretKeyFile != "" {
314 return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err)
316 newvol = arvados.Volume{
318 ReadOnly: oldvol.ReadOnly,
319 Replication: oldvol.S3Replication,
320 StorageClasses: array2boolmap(oldvol.StorageClasses),
322 params = arvados.S3VolumeDriverParameters{
// Keys are stored without surrounding whitespace.
323 AccessKey: string(bytes.TrimSpace(accesskeydata)),
324 SecretKey: string(bytes.TrimSpace(secretkeydata)),
325 Endpoint: oldvol.Endpoint,
326 Region: oldvol.Region,
327 Bucket: oldvol.Bucket,
328 LocationConstraint: oldvol.LocationConstraint,
329 IndexPageSize: oldvol.IndexPageSize,
330 ConnectTimeout: oldvol.ConnectTimeout,
331 ReadTimeout: oldvol.ReadTimeout,
332 RaceWindow: oldvol.RaceWindow,
333 UnsafeDelete: oldvol.UnsafeDelete,
// Azure: same rule for the storage account key file as for the S3 key files.
336 keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
337 if err != nil && oldvol.StorageAccountKeyFile != "" {
338 return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
340 newvol = arvados.Volume{
342 ReadOnly: oldvol.ReadOnly,
343 Replication: oldvol.AzureReplication,
344 StorageClasses: array2boolmap(oldvol.StorageClasses),
346 params = arvados.AzureVolumeDriverParameters{
347 StorageAccountName: oldvol.StorageAccountName,
348 StorageAccountKey: string(bytes.TrimSpace(keydata)),
349 StorageBaseURL: oldvol.StorageBaseURL,
350 ContainerName: oldvol.ContainerName,
351 RequestTimeout: oldvol.RequestTimeout,
352 ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay,
353 ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
// Directory volumes.
356 newvol = arvados.Volume{
358 ReadOnly: oldvol.ReadOnly,
359 Replication: oldvol.DirectoryReplication,
360 StorageClasses: array2boolmap(oldvol.StorageClasses),
362 params = arvados.DirectoryVolumeDriverParameters{
364 Serialize: oldvol.Serialize,
367 return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type)
// Driver parameters are stored as raw JSON.
369 dp, err := json.Marshal(params)
373 newvol.DriverParameters = json.RawMessage(dp)
// A replication level below 1 (for example when unset in the legacy
// config) is raised to 1.
374 if newvol.Replication < 1 {
375 newvol.Replication = 1
// alreadyMigrated searches newvols for a volume that corresponds to the
// legacy volume oldvol: the driver type must be equal, and the
// driver-specific identifying parameters decoded from DriverParameters must
// agree. The caller treats the results as the UUID of the matching entry and
// a flag saying whether one was found.
380 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
381 for uuid, newvol := range newvols {
// Volumes using a different driver can never match.
382 if oldvol.Type != newvol.Driver {
// S3 volumes are identified by endpoint, region, bucket and location
// constraint.
387 var params arvados.S3VolumeDriverParameters
388 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
389 oldvol.Endpoint == params.Endpoint &&
390 oldvol.Region == params.Region &&
391 oldvol.Bucket == params.Bucket &&
392 oldvol.LocationConstraint == params.LocationConstraint {
// Azure volumes are identified by storage account name, base URL and
// container name.
396 var params arvados.AzureVolumeDriverParameters
397 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
398 oldvol.StorageAccountName == params.StorageAccountName &&
399 oldvol.StorageBaseURL == params.StorageBaseURL &&
400 oldvol.ContainerName == params.ContainerName {
// Directory volumes are identified by their root path, and additionally the
// entry must either list this server in AccessViaHosts or have no
// AccessViaHosts entries at all.
404 var params arvados.DirectoryVolumeDriverParameters
405 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
406 oldvol.Root == params.Root {
407 if _, ok := newvol.AccessViaHosts[myURL]; ok || len(newvol.AccessViaHosts) == 0 {
// discoverLocalVolumes scans a mounts table (mountsFile, or "/proc/mounts"
// when mountsFile is empty) and adds a Directory volume to cluster.Volumes
// for each qualifying mount point that contains a "keep" directory. Each
// volume gets a new random UUID and is made accessible via myURL only.
416 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
417 if mountsFile == "" {
418 mountsFile = "/proc/mounts"
420 f, err := os.Open(mountsFile)
422 return fmt.Errorf("error opening %s: %s", mountsFile, err)
425 scanner := bufio.NewScanner(f)
// Each line is split on whitespace: field 0 is the device, field 1 the mount
// point, and field 3 (used below) the comma-separated mount options.
// NOTE(review): args[1] and args[3] are indexed without a visible length
// check; a line with fewer than four fields would panic -- confirm the input
// always has the /proc/mounts layout.
427 args := strings.Fields(scanner.Text())
428 dev, mount := args[0], args[1]
// Tests for devices that are neither "tmpfs" nor under "/dev/".
// NOTE(review): the body of this test is not visible -- presumably such
// entries are skipped; confirm.
432 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
// A volume is expected at <mount point>/keep; the check covers both a failed
// stat and a path that is not a directory.
435 keepdir := mount + "/keep"
436 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
// NOTE(review): this loop over the mount options presumably derives the
// read-only flag `ro` used below; its body is not visible -- confirm.
441 for _, fsopt := range strings.Split(args[3], ",") {
447 uuid := newUUID(cluster.ClusterID, "nyw5e")
448 ldr.Logger.WithFields(logrus.Fields{
450 "Driver": "Directory",
451 "DriverParameters.Root": keepdir,
452 "DriverParameters.Serialize": false,
455 }).Warn("adding local directory volume")
457 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
464 cluster.Volumes[uuid] = arvados.Volume{
// Only this server is listed as having access to the discovered volume.
469 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
470 myURL: {ReadOnly: ro},
// Report read errors encountered by the scanner.
474 if err := scanner.Err(); err != nil {
475 return fmt.Errorf("reading %s: %s", mountsFile, err)
// array2boolmap builds a map[string]bool from a list of keys. The map is
// non-nil even when keys is empty. translateOldKeepstoreVolume uses it to
// convert the legacy StorageClasses list.
// NOTE(review): the loop body is not visible here -- presumably every key is
// mapped to true; confirm.
480 func array2boolmap(keys []string) map[string]bool {
481 m := map[string]bool{}
482 for _, k := range keys {
// newUUID returns a random identifier of the form
// "<clusterID>-<infix>-<suffix>", where the suffix is a random number below
// 36^15 written in base 36 and left-padded with zeros to 15 characters.
488 func newUUID(clusterID, infix string) string {
// Exp with a zero modulus computes the plain power 36^15, which is used as
// the exclusive upper bound of the random number.
489 randint, err := rand.Int(rand.Reader, big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0)))
493 randstr := randint.Text(36)
// Small numbers have fewer than 15 base-36 digits; pad on the left.
494 for len(randstr) < 15 {
495 randstr = "0" + randstr
497 return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr)
500 // Return the UUID and URL for the controller's keep_services listing
501 // corresponding to this host/process.
//
// The keep_services list is fetched from the API using the cluster's
// SystemRootToken. Entries of type "proxy" are never selected. The first
// entry accepted by keepServiceIsMe for the local hostname and the given
// listen address is returned. When nothing matches, the error lists the
// host:port pairs collected in `tried`.
502 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
503 client, err := arvados.NewClientFromConfig(cluster)
507 client.AuthToken = cluster.SystemRootToken
508 var svcList arvados.KeepServiceList
509 err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
513 hostname, err := os.Hostname()
515 err = fmt.Errorf("error getting hostname: %s", err)
519 for _, ks := range svcList.Items {
520 if ks.ServiceType == "proxy" {
522 } else if keepServiceIsMe(ks, hostname, listen) {
523 return ks.UUID, keepServiceURL(ks), nil
// Remember the candidate for the error message below.
525 tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
528 err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
// keepServiceURL builds the arvados.URL of a keep_services entry; the host
// part is ServiceHost and ServicePort joined as "host:port".
// NOTE(review): the effect of ServiceSSLFlag is not visible here --
// presumably it selects the "https" scheme; confirm.
532 func keepServiceURL(ks arvados.KeepService) arvados.URL {
535 Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
537 if ks.ServiceSSLFlag {
// localhostOrAllInterfaces is a set of (lower-case) service host names.
// keepServiceIsMe and checkPendingKeepstoreMigrations treat a keep_services
// entry whose host is in this set as referring to the local host. The
// entries themselves are not visible in this excerpt.
543 var localhostOrAllInterfaces = map[string]bool{
551 // Return true if the given KeepService entry matches the given
552 // hostname and (keepstore config file) listen address.
554 // If the KeepService host is some variant of "localhost", we assume
555 // this is a testing or single-node environment, ignore the given
556 // hostname, and return true if the port numbers match.
558 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
559 // match a KeepService host "foo.bar", but also "foo.bar.example",
560 // "foo.bar.example.org", etc.
561 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
562 // Extract the port name/number from listen, and resolve it to
563 // a port number to compare with ks.ServicePort.
564 _, listenport, err := net.SplitHostPort(listen)
// If SplitHostPort rejects the address but it starts with ":", take
// everything after the colon as the port.
565 if err != nil && strings.HasPrefix(listen, ":") {
566 listenport = listen[1:]
// LookupPort turns the port string into a port number. A resolved port of 0
// is additionally accepted when the service port is 80.
568 if lp, err := net.LookupPort("tcp", listenport); err != nil {
570 } else if !(lp == ks.ServicePort ||
571 (lp == 0 && ks.ServicePort == 80)) {
// Host comparison is case-insensitive. Appending "." to both sides makes the
// prefix test match only at a label boundary.
575 kshost := strings.ToLower(ks.ServiceHost)
576 return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
579 // Warn about pending keepstore migration tasks that haven't already
580 // been warned about in loadOldKeepstoreConfig() -- i.e., unmigrated
581 // keepstore hosts other than the present host, and obsolete content
582 // in the keep_services table.
//
// The check needs the keep_services list from the API. It is skipped when no
// controller ExternalURL is configured, and a failure to retrieve the list
// is only logged as a warning.
583 func (ldr *Loader) checkPendingKeepstoreMigrations(cluster arvados.Cluster) error {
584 if cluster.Services.Controller.ExternalURL.String() == "" {
585 ldr.Logger.Debug("Services.Controller.ExternalURL not configured -- skipping check for pending keepstore config migrations")
588 client, err := arvados.NewClientFromConfig(&cluster)
592 client.AuthToken = cluster.SystemRootToken
593 var svcList arvados.KeepServiceList
594 err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
596 ldr.Logger.WithError(err).Warn("error retrieving keep_services list -- skipping check for pending keepstore config migrations")
599 hostname, err := os.Hostname()
601 return fmt.Errorf("error getting hostname: %s", err)
// Collect the distinct creation and modification timestamps of all entries.
603 sawTimes := map[time.Time]bool{}
604 for _, ks := range svcList.Items {
605 sawTimes[ks.CreatedAt] = true
606 sawTimes[ks.ModifiedAt] = true
608 if len(sawTimes) <= 1 {
609 // If all timestamps in the arvados/v1/keep_services
610 // response are identical, it's a clear sign the
611 // response was generated on the fly from the cluster
612 // config, rather than real database records. In that
613 // case (as well as the case where none are listed at
614 // all) it's pointless to look for entries that
615 // haven't yet been migrated to the config file.
619 for _, ks := range svcList.Items {
// A proxy entry with no Keepproxy.InternalURLs configured means the keepproxy
// configuration still has to be migrated.
620 if ks.ServiceType == "proxy" {
621 if len(cluster.Services.Keepproxy.InternalURLs) == 0 {
623 ldr.Logger.Warn("you should migrate your keepproxy configuration to the cluster configuration file")
// Same host test as in keepServiceIsMe, without the port comparison.
627 kshost := strings.ToLower(ks.ServiceHost)
628 if localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".") {
629 // it would be confusing to recommend
630 // migrating *this* host's legacy keepstore
631 // config immediately after explaining that
632 // very migration process in more detail.
635 ksurl := keepServiceURL(ks)
636 if _, ok := cluster.Services.Keepstore.InternalURLs[ksurl]; ok {
637 // already added to InternalURLs
640 ldr.Logger.Warnf("you should migrate the legacy keepstore configuration file on host %s", ks.ServiceHost)
643 ldr.Logger.Warn("you should delete all of your manually added keep_services listings using `arv --format=uuid keep_service list | xargs -n1 arv keep_service delete --uuid` -- when those are deleted, the services listed in your cluster configuration will be used instead")
648 // Warn about keepstore servers that have no volumes.
//
// For every URL in Services.Keepstore.InternalURLs the volumes are searched
// for one that either has no AccessViaHosts restriction or lists that URL.
// The warning is logged for servers for which no such volume exists.
649 func (ldr *Loader) checkEmptyKeepstores(cluster arvados.Cluster) error {
651 for url := range cluster.Services.Keepstore.InternalURLs {
652 for _, vol := range cluster.Volumes {
653 if len(vol.AccessViaHosts) == 0 {
654 // accessible by all servers
// The volume explicitly lists this server.
657 if _, ok := vol.AccessViaHosts[url]; ok {
661 ldr.Logger.Warnf("keepstore configured at %s does not have access to any volumes", url)
666 // Warn about AccessViaHosts entries that don't correspond to any of
667 // the listed keepstore services.
//
// Every URL used as an AccessViaHosts key of a volume is looked up in
// Services.Keepstore.InternalURLs; a warning naming the volume UUID and the
// URL is logged for each one that is missing.
668 func (ldr *Loader) checkUnlistedKeepstores(cluster arvados.Cluster) error {
669 for uuid, vol := range cluster.Volumes {
// The volume stored under the key "SAMPLE" is special-cased.
// NOTE(review): the body of this test is not visible -- presumably the entry
// is skipped; confirm.
670 if uuid == "SAMPLE" {
673 for url := range vol.AccessViaHosts {
674 if _, ok := cluster.Services.Keepstore.InternalURLs[url]; !ok {
675 ldr.Logger.Warnf("Volumes.%s.AccessViaHosts refers to nonexistent keepstore server %s", uuid, url)