1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.curoverse.com/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
// defaultKeepstoreConfigPath is the legacy keepstore config file location
// that loadOldKeepstoreConfig special-cases: an os.IsNotExist error from
// loading the legacy file gets its own branch only when ldr.KeepstorePath
// equals this path.
25 const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
// oldKeepstoreConfig is the decoding target for a legacy keepstore config
// file (it is what loadOldKeepstoreConfig passes to loadOldConfigHelper).
// Pointer-typed fields are nil-checked before being copied into the cluster
// config -- presumably nil means the key was absent from the legacy file.
27 type oldKeepstoreConfig struct {
// Copied to cluster.Collections.BlobSigningTTL.
38 BlobSignatureTTL *arvados.Duration
// Path of a file whose whitespace-trimmed content becomes
// cluster.Collections.BlobSigningKey.
39 BlobSigningKeyFile *string
// Copied to cluster.Collections.BlobSigning.
40 RequireSignatures *bool
// Path of a file whose whitespace-trimmed content becomes
// cluster.SystemRootToken.
41 SystemAuthTokenFile *string
// Copied to cluster.Collections.BlobTrashLifetime.
43 TrashLifetime *arvados.Duration
// Copied to cluster.Collections.BlobTrashCheckInterval.
44 TrashCheckInterval *arvados.Duration
// Copied to cluster.Collections.BlobDeleteConcurrency.
47 EmptyTrashWorkers *int
// Stored as cluster.TLS.Certificate with a "file://" prefix; together with
// TLSKeyFile it also switches this server's URL scheme to https.
48 TLSCertificateFile *string
// Legacy volume list; when nil or empty, local directory volumes are
// discovered from the mounts file instead.
51 Volumes *oldKeepstoreVolumeList
53 ManagementToken *string
55 DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
// oldKeepstoreVolumeList is the type of the legacy config's Volumes
// section; migrateOldKeepstoreVolumes ranges over it one entry at a time.
58 type oldKeepstoreVolumeList []oldKeepstoreVolume
// oldKeepstoreVolume is one entry of the legacy Volumes list. It is a flat
// union of the settings for every driver; translateOldKeepstoreVolume picks
// the relevant subset when building the new-style arvados.Volume.
60 type oldKeepstoreVolume struct {
// Driver name; compared against arvados.Volume.Driver by alreadyMigrated.
62 Type string `json:",omitempty"`
64 // Azure driver configs
65 StorageAccountName string `json:",omitempty"`
// Path of a file holding the account key; the content is read and
// whitespace-trimmed during translation.
66 StorageAccountKeyFile string `json:",omitempty"`
67 StorageBaseURL string `json:",omitempty"`
68 ContainerName string `json:",omitempty"`
// Becomes the new volume's Replication for Azure volumes.
69 AzureReplication int `json:",omitempty"`
70 RequestTimeout arvados.Duration `json:",omitempty"`
71 ListBlobsRetryDelay arvados.Duration `json:",omitempty"`
72 ListBlobsMaxAttempts int `json:",omitempty"`
// S3 driver configs (these feed arvados.S3VolumeDriverParameters).
// AccessKeyFile and SecretKeyFile are paths; their whitespace-trimmed
// contents become AccessKey and SecretKey.
75 AccessKeyFile string `json:",omitempty"`
76 SecretKeyFile string `json:",omitempty"`
77 Endpoint string `json:",omitempty"`
78 Region string `json:",omitempty"`
79 Bucket string `json:",omitempty"`
80 LocationConstraint bool `json:",omitempty"`
81 IndexPageSize int `json:",omitempty"`
// Becomes the new volume's Replication for S3 volumes.
82 S3Replication int `json:",omitempty"`
83 ConnectTimeout arvados.Duration `json:",omitempty"`
84 ReadTimeout arvados.Duration `json:",omitempty"`
85 RaceWindow arvados.Duration `json:",omitempty"`
86 UnsafeDelete bool `json:",omitempty"`
88 // Directory driver configs
// Becomes the new volume's Replication for Directory volumes.
90 DirectoryReplication int
// Settings shared by all drivers: copied to the new volume's ReadOnly and
// (via array2boolmap) StorageClasses.
94 ReadOnly bool `json:",omitempty"`
95 StorageClasses []string `json:",omitempty"`
98 // update config using values from an old-style keepstore config file.
//
// The legacy file at ldr.KeepstorePath is decoded into an oldKeepstoreConfig
// and each setting that is present (non-nil) is copied onto the cluster
// returned by cfg.GetCluster(""). The function also works out the URL of
// this keepstore server (myURL), migrates or discovers volumes, checks for
// other pending migrations, and finally stores the modified cluster back
// into cfg.Clusters. Errors come from hostname lookup, reading the key/token
// files, an unmigratable Listen value, an undeterminable server URL, and
// local volume discovery.
99 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
// No legacy path configured (presumably nothing to migrate; the branch body
// is not part of this excerpt).
100 if ldr.KeepstorePath == "" {
103 hostname, err := os.Hostname()
105 return fmt.Errorf("getting hostname: %s", err)
108 var oc oldKeepstoreConfig
109 err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
// A missing file is treated differently from other load errors only when
// the path is the built-in default.
110 if os.IsNotExist(err) && ldr.KeepstorePath == defaultKeepstoreConfigPath {
112 } else if err != nil {
116 cluster, err := cfg.GetCluster("")
// myURL identifies this keepstore server. The scheme is https only when both
// a TLS certificate file and a TLS key file are given.
121 myURL := arvados.URL{Scheme: "http"}
122 if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
123 myURL.Scheme = "https"
// Legacy Debug flag: true forces log level "debug", false forces "info";
// an absent flag leaves the level alone.
126 if v := oc.Debug; v == nil {
127 } else if *v && cluster.SystemLogs.LogLevel != "debug" {
128 cluster.SystemLogs.LogLevel = "debug"
129 } else if !*v && cluster.SystemLogs.LogLevel != "info" {
130 cluster.SystemLogs.LogLevel = "info"
// TLS file paths are stored as file:// references.
133 if v := oc.TLSCertificateFile; v != nil {
134 cluster.TLS.Certificate = "file://" + *v
136 if v := oc.TLSKeyFile; v != nil {
137 cluster.TLS.Key = "file://" + *v
// With a Listen value: first check whether InternalURLs already has an entry
// for scheme+Listen; otherwise a ":port" value is combined with the local
// hostname and registered as a new InternalURLs entry; any other form is
// reported as an error requiring manual migration.
139 if v := oc.Listen; v != nil {
140 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
143 } else if len(*v) > 1 && (*v)[0] == ':' {
144 myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
145 cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
147 return fmt.Errorf("unable to migrate Listen value %q -- you must update Services.Keepstore.InternalURLs manually, and comment out the Listen entry in your legacy keepstore config file", *v)
// Look through the configured InternalURLs for one whose host part equals
// the local hostname (the error below indicates this is the no-Listen path).
// The SplitHostPort error is ignored: on failure host is "" and cannot match.
150 for url := range cluster.Services.Keepstore.InternalURLs {
151 if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
156 if myURL.Host == "" {
157 return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
// Straight one-to-one copies of scalar settings.
161 if v := oc.LogFormat; v != nil {
162 cluster.SystemLogs.Format = *v
164 if v := oc.MaxBuffers; v != nil {
165 cluster.API.MaxKeepBlobBuffers = *v
167 if v := oc.MaxRequests; v != nil {
168 cluster.API.MaxConcurrentRequests = *v
170 if v := oc.BlobSignatureTTL; v != nil {
171 cluster.Collections.BlobSigningTTL = *v
// The signing key lives in a separate file; its trimmed content replaces the
// cluster's key when they differ.
173 if v := oc.BlobSigningKeyFile; v != nil {
174 buf, err := ioutil.ReadFile(*v)
176 return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
178 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
179 cluster.Collections.BlobSigningKey = key
182 if v := oc.RequireSignatures; v != nil {
183 cluster.Collections.BlobSigning = *v
// Same pattern for the system auth token file -> SystemRootToken.
185 if v := oc.SystemAuthTokenFile; v != nil {
186 f, err := os.Open(*v)
188 return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
191 buf, err := ioutil.ReadAll(f)
193 return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
195 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
196 cluster.SystemRootToken = key
// Trash / delete / replicate settings.
199 if v := oc.EnableDelete; v != nil {
200 cluster.Collections.BlobTrash = *v
202 if v := oc.TrashLifetime; v != nil {
203 cluster.Collections.BlobTrashLifetime = *v
205 if v := oc.TrashCheckInterval; v != nil {
206 cluster.Collections.BlobTrashCheckInterval = *v
208 if v := oc.TrashWorkers; v != nil {
209 cluster.Collections.BlobTrashConcurrency = *v
211 if v := oc.EmptyTrashWorkers; v != nil {
212 cluster.Collections.BlobDeleteConcurrency = *v
214 if v := oc.PullWorkers; v != nil {
215 cluster.Collections.BlobReplicateConcurrency = *v
// Volumes: with no legacy volume list, fall back to discovering local
// directory volumes from the mounts file; otherwise migrate the listed ones.
217 if oc.Volumes == nil || len(*oc.Volumes) == 0 {
218 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
219 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
221 return fmt.Errorf("error discovering local directory volumes: %s", err)
224 err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL)
230 if err := ldr.checkPendingKeepstoreMigrations(cluster); err != nil {
// cluster is a pointer to a copy, so the updated value must be written back.
234 cfg.Clusters[cluster.ClusterID] = *cluster
238 // Merge Volumes section of old keepstore config into cluster config.
//
// For each legacy volume, alreadyMigrated is asked whether cluster.Volumes
// already has a corresponding entry. Depending on that entry's
// AccessViaHosts, the legacy volume is either just granted access via myURL
// on the existing entry, or a volume (reused or freshly translated) is
// stored in cluster.Volumes under a chosen UUID with myURL added to its
// AccessViaHosts.
//
// NOTE(review): several lines of this function (including the declarations
// of writers and volUUID) are not part of the reviewed excerpt, so the
// exact branch structure should be confirmed against the full file.
239 func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error {
240 for i, oldvol := range *oc.Volumes {
241 var accessViaHosts map[arvados.URL]arvados.VolumeAccess
242 oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
// Start from the AccessViaHosts of the matching cluster volume.
244 accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
246 for _, va := range accessViaHosts {
// Keep the existing cluster entry as-is (apart from adding this host) when
// it has no host restriction or -- presumably -- already has a writable
// host; writers is set on lines not shown here.
251 if writers || len(accessViaHosts) == 0 {
252 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
253 if len(accessViaHosts) > 0 {
254 cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
259 var newvol arvados.Volume
261 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
262 newvol = cluster.Volumes[oldUUID]
263 // Remove the old entry. It will be added back
264 // below, possibly with a new UUID.
265 delete(cluster.Volumes, oldUUID)
// Otherwise build the new-style volume from the legacy parameters.
267 v, err := ldr.translateOldKeepstoreVolume(oldvol)
// Record that this server (myURL) can access the volume, creating the map
// first when it is nil (writing to a nil map would panic).
273 if accessViaHosts == nil {
274 accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
276 accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
277 newvol.AccessViaHosts = accessViaHosts
// Choose the UUID for the volume. The preferred ("optimal") UUID is derived
// from this server's keep_services UUID; each failure to obtain it is
// logged as a warning rather than returned as an error.
281 } else if oc.Listen == nil {
282 ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
283 } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
284 ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
285 } else if len(uuid) != 27 {
286 ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
// uuid is 27 characters long here, so uuid[12:] is its final 15 characters.
288 rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
289 if _, ok := cluster.Volumes[rendezvousUUID]; ok {
290 ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
292 volUUID = rendezvousUUID
// Store the same suffix as the Rendezvous value of this server's
// InternalURLs entry (read-modify-write because map values are copies).
294 si := cluster.Services.Keepstore.InternalURLs[myURL]
295 si.Rendezvous = uuid[12:]
296 cluster.Services.Keepstore.InternalURLs[myURL] = si
// Fallback: a random UUID.
299 volUUID = newUUID(cluster.ClusterID, "nyw5e")
300 ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
302 cluster.Volumes[volUUID] = newvol
// translateOldKeepstoreVolume converts one legacy volume entry into a
// new-style arvados.Volume. Driver-specific settings are collected in
// params and stored, JSON-encoded, in the volume's DriverParameters. Key
// material is read from the files named in the legacy entry. An error is
// returned when a named key file cannot be read or when oldvol.Type is not
// a supported volume type.
307 func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) {
308 var newvol arvados.Volume
309 var params interface{}
// S3 volumes. The key files are read unconditionally, but a read error is
// only reported when a filename was actually given; with an empty filename
// the corresponding key ends up empty.
312 accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
313 if err != nil && oldvol.AccessKeyFile != "" {
314 return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err)
316 secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
317 if err != nil && oldvol.SecretKeyFile != "" {
318 return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err)
320 newvol = arvados.Volume{
322 ReadOnly: oldvol.ReadOnly,
323 Replication: oldvol.S3Replication,
324 StorageClasses: array2boolmap(oldvol.StorageClasses),
326 params = arvados.S3VolumeDriverParameters{
327 AccessKey: string(bytes.TrimSpace(accesskeydata)),
328 SecretKey: string(bytes.TrimSpace(secretkeydata)),
329 Endpoint: oldvol.Endpoint,
330 Region: oldvol.Region,
331 Bucket: oldvol.Bucket,
332 LocationConstraint: oldvol.LocationConstraint,
333 IndexPageSize: oldvol.IndexPageSize,
334 ConnectTimeout: oldvol.ConnectTimeout,
335 ReadTimeout: oldvol.ReadTimeout,
336 RaceWindow: oldvol.RaceWindow,
337 UnsafeDelete: oldvol.UnsafeDelete,
// Azure volumes: same optional-key-file handling as for S3.
340 keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
341 if err != nil && oldvol.StorageAccountKeyFile != "" {
342 return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
344 newvol = arvados.Volume{
346 ReadOnly: oldvol.ReadOnly,
347 Replication: oldvol.AzureReplication,
348 StorageClasses: array2boolmap(oldvol.StorageClasses),
350 params = arvados.AzureVolumeDriverParameters{
351 StorageAccountName: oldvol.StorageAccountName,
352 StorageAccountKey: string(bytes.TrimSpace(keydata)),
353 StorageBaseURL: oldvol.StorageBaseURL,
354 ContainerName: oldvol.ContainerName,
355 RequestTimeout: oldvol.RequestTimeout,
356 ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay,
357 ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
// Directory volumes.
360 newvol = arvados.Volume{
362 ReadOnly: oldvol.ReadOnly,
363 Replication: oldvol.DirectoryReplication,
364 StorageClasses: array2boolmap(oldvol.StorageClasses),
366 params = arvados.DirectoryVolumeDriverParameters{
368 Serialize: oldvol.Serialize,
// Any other type cannot be migrated.
371 return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type)
373 dp, err := json.Marshal(params)
377 newvol.DriverParameters = json.RawMessage(dp)
// A replication level below 1 (e.g. unset in the legacy file) becomes 1.
378 if newvol.Replication < 1 {
379 newvol.Replication = 1
// alreadyMigrated searches newvols for a volume that corresponds to the
// legacy volume oldvol. Candidates must use the same driver
// (oldvol.Type == newvol.Driver) and their JSON-decoded DriverParameters
// must agree with oldvol on the identifying fields:
//
//   - S3: Endpoint, Region, Bucket and LocationConstraint
//   - Azure: StorageAccountName, StorageBaseURL and ContainerName
//   - Directory: Root, and additionally the candidate must either list
//     myURL in AccessViaHosts or have no AccessViaHosts at all
//
// A candidate whose DriverParameters fail to decode is not a match. The
// caller receives the results as (oldUUID, found).
384 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
385 for uuid, newvol := range newvols {
386 if oldvol.Type != newvol.Driver {
391 var params arvados.S3VolumeDriverParameters
392 if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
393 oldvol.Endpoint == params.Endpoint &&
394 oldvol.Region == params.Region &&
395 oldvol.Bucket == params.Bucket &&
396 oldvol.LocationConstraint == params.LocationConstraint {
400 var params arvados.AzureVolumeDriverParameters
401 if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
402 oldvol.StorageAccountName == params.StorageAccountName &&
403 oldvol.StorageBaseURL == params.StorageBaseURL &&
404 oldvol.ContainerName == params.ContainerName {
408 var params arvados.DirectoryVolumeDriverParameters
409 if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
410 oldvol.Root == params.Root {
// A directory root is only meaningful on a particular host, so the
// candidate must be reachable from this server.
411 if _, ok := newvol.AccessViaHosts[myURL]; ok || len(newvol.AccessViaHosts) == 0 {
// discoverLocalVolumes scans a mounts table (mountsFile, or /proc/mounts
// when mountsFile is empty) and adds a Directory volume to cluster.Volumes
// for qualifying mount points that contain a "keep" directory. Each added
// volume gets a random UUID and is made accessible via myURL only. Errors
// are returned when the mounts file cannot be opened or read.
//
// NOTE(review): args[1] is indexed immediately after strings.Fields with no
// length check, and args[3] is indexed further down; a blank or short line
// in the mounts file would panic unless a guard exists on lines not shown
// in this excerpt -- confirm.
420 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
421 if mountsFile == "" {
422 mountsFile = "/proc/mounts"
424 f, err := os.Open(mountsFile)
426 return fmt.Errorf("error opening %s: %s", mountsFile, err)
429 scanner := bufio.NewScanner(f)
// Whitespace-separated fields: args[0] is the device, args[1] the mount
// point, args[3] the comma-separated mount options.
431 args := strings.Fields(scanner.Text())
432 dev, mount := args[0], args[1]
// Only tmpfs and /dev/* devices are considered.
436 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
// The mount must contain a "keep" directory to qualify.
439 keepdir := mount + "/keep"
440 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
// Inspect the mount options (ro, used below, is set on lines not shown).
445 for _, fsopt := range strings.Split(args[3], ",") {
451 uuid := newUUID(cluster.ClusterID, "nyw5e")
452 ldr.Logger.WithFields(logrus.Fields{
454 "Driver": "Directory",
455 "DriverParameters.Root": keepdir,
456 "DriverParameters.Serialize": false,
459 }).Warn("adding local directory volume")
461 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
468 cluster.Volumes[uuid] = arvados.Volume{
473 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
474 myURL: {ReadOnly: ro},
// bufio.Scanner reports read errors only after the scan loop ends.
478 if err := scanner.Err(); err != nil {
479 return fmt.Errorf("reading %s: %s", mountsFile, err)
// array2boolmap converts a list of keys into a set-style map in which every
// listed key maps to true. Duplicate keys collapse into one entry. The
// result is never nil, even for a nil or empty input, so it encodes to JSON
// as {} rather than null.
//
// NOTE(review): the loop body and return statement were not visible in the
// reviewed excerpt; they are restored here as implied by the function's
// name, signature and visible loop header.
func array2boolmap(keys []string) map[string]bool {
	m := make(map[string]bool, len(keys))
	for _, k := range keys {
		m[k] = true
	}
	return m
}
// newUUID returns a random identifier of the form
// "<clusterID>-<infix>-<15 base-36 characters>". The random part is drawn
// uniformly from [0, 36^15) using crypto/rand and left-padded with "0" to
// exactly 15 characters.
//
// NOTE(review): the original handling of the rand.Int error was not visible
// in the reviewed excerpt. Because the signature has no error return, a
// failure of the system random source causes a panic here -- confirm this
// matches the full file.
func newUUID(clusterID, infix string) string {
	// 36^15: the number of distinct 15-character base-36 strings.
	limit := big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0))
	randint, err := rand.Int(rand.Reader, limit)
	if err != nil {
		panic(err)
	}
	randstr := randint.Text(36)
	if pad := 15 - len(randstr); pad > 0 {
		randstr = strings.Repeat("0", pad) + randstr
	}
	return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr)
}
504 // Return the UUID and URL for the controller's keep_services listing
505 // corresponding to this host/process.
//
// The keep_services list is fetched from the API using the cluster's
// SystemRootToken. Proxy entries are ignored; the first remaining entry for
// which keepServiceIsMe(ks, hostname, listen) holds is returned. When none
// matches, err lists the host:port pairs that were tried. Results are
// delivered through the named return values (uuid, url, err).
506 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
507 client, err := arvados.NewClientFromConfig(cluster)
511 client.AuthToken = cluster.SystemRootToken
512 var svcList arvados.KeepServiceList
513 err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
517 hostname, err := os.Hostname()
519 err = fmt.Errorf("error getting hostname: %s", err)
523 for _, ks := range svcList.Items {
// Proxy entries never describe a keepstore process.
524 if ks.ServiceType == "proxy" {
526 } else if keepServiceIsMe(ks, hostname, listen) {
527 return ks.UUID, keepServiceURL(ks), nil
// Remember non-matching candidates for the error message below.
529 tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
532 err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
// keepServiceURL builds the arvados.URL for a keep_services entry. Host is
// "ServiceHost:ServicePort" (joined with net.JoinHostPort, which brackets
// IPv6 literals); ks.ServiceSSLFlag is consulted as well -- presumably to
// select the https scheme, on lines not shown in this excerpt.
536 func keepServiceURL(ks arvados.KeepService) arvados.URL {
539 Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
541 if ks.ServiceSSLFlag {
// localhostOrAllInterfaces is a set of host strings, keyed by lowercased
// host name: keepServiceIsMe and checkPendingKeepstoreMigrations look up
// the lowercased KeepService host in it. Judging by the name, the entries
// are localhost / all-interfaces address variants (the entries themselves
// are not visible in this excerpt).
547 var localhostOrAllInterfaces = map[string]bool{
555 // Return true if the given KeepService entry matches the given
556 // hostname and (keepstore config file) listen address.
558 // If the KeepService host is some variant of "localhost", we assume
559 // this is a testing or single-node environment, ignore the given
560 // hostname, and return true if the port numbers match.
562 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
563 // match a KeepService host "foo.bar", but also "foo.bar.example",
564 // "foo.bar.example.org", etc.
565 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
566 // Extract the port name/number from listen, and resolve it to
567 // a port number to compare with ks.ServicePort.
568 _, listenport, err := net.SplitHostPort(listen)
// If SplitHostPort rejects the value but it starts with ":", treat
// everything after the colon as the port.
569 if err != nil && strings.HasPrefix(listen, ":") {
570 listenport = listen[1:]
// net.LookupPort accepts both numeric ports and service names.
572 if lp, err := net.LookupPort("tcp", listenport); err != nil {
// Ports must agree; a resolved port of 0 is additionally accepted against
// a listed ServicePort of 80.
574 } else if !(lp == ks.ServicePort ||
575 (lp == 0 && ks.ServicePort == 80)) {
// Host comparison is case-insensitive. Appending "." to both sides makes
// the prefix test match whole domain labels only.
579 kshost := strings.ToLower(ks.ServiceHost)
580 return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
583 // Warn about pending keepstore migration tasks that haven't already
584 // been warned about in loadOldKeepstoreConfig() -- i.e., unmigrated
585 // keepstore hosts other than the present host, and obsolete content
586 // in the keep_services table.
//
// The check is skipped when no controller ExternalURL is configured, when
// ldr.SkipAPICalls is set, or when the keep_services list cannot be
// retrieved (the latter is logged as a warning). A failure to determine the
// local hostname is returned as an error.
587 func (ldr *Loader) checkPendingKeepstoreMigrations(cluster *arvados.Cluster) error {
588 if cluster.Services.Controller.ExternalURL.String() == "" {
589 ldr.Logger.Debug("Services.Controller.ExternalURL not configured -- skipping check for pending keepstore config migrations")
592 if ldr.SkipAPICalls {
593 ldr.Logger.Debug("(Loader).SkipAPICalls == true -- skipping check for pending keepstore config migrations")
596 client, err := arvados.NewClientFromConfig(cluster)
600 client.AuthToken = cluster.SystemRootToken
601 var svcList arvados.KeepServiceList
602 err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
604 ldr.Logger.WithError(err).Warn("error retrieving keep_services list -- skipping check for pending keepstore config migrations")
607 hostname, err := os.Hostname()
609 return fmt.Errorf("error getting hostname: %s", err)
// Collect the distinct creation/modification timestamps across all entries.
611 sawTimes := map[time.Time]bool{}
612 for _, ks := range svcList.Items {
613 sawTimes[ks.CreatedAt] = true
614 sawTimes[ks.ModifiedAt] = true
616 if len(sawTimes) <= 1 {
617 // If all timestamps in the arvados/v1/keep_services
618 // response are identical, it's a clear sign the
619 // response was generated on the fly from the cluster
620 // config, rather than real database records. In that
621 // case (as well as the case where none are listed at
622 // all) it's pointless to look for entries that
623 // haven't yet been migrated to the config file.
627 for _, ks := range svcList.Items {
// Proxy entries: advise migrating keepproxy when the cluster config lists
// no Keepproxy InternalURLs.
628 if ks.ServiceType == "proxy" {
629 if len(cluster.Services.Keepproxy.InternalURLs) == 0 {
631 ldr.Logger.Warn("you should migrate your keepproxy configuration to the cluster configuration file")
// Same host test as the final line of keepServiceIsMe (without the port
// check).
635 kshost := strings.ToLower(ks.ServiceHost)
636 if localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".") {
637 // it would be confusing to recommend
638 // migrating *this* host's legacy keepstore
639 // config immediately after explaining that
640 // very migration process in more detail.
643 ksurl := keepServiceURL(ks)
644 if _, ok := cluster.Services.Keepstore.InternalURLs[ksurl]; ok {
645 // already added to InternalURLs
648 ldr.Logger.Warnf("you should migrate the legacy keepstore configuration file on host %s", ks.ServiceHost)
651 ldr.Logger.Warn("you should delete all of your manually added keep_services listings using `arv --format=uuid keep_service list | xargs -n1 arv keep_service delete --uuid` -- when those are deleted, the services listed in your cluster configuration will be used instead")
656 // Warn about keepstore servers that have no volumes.
//
// Each URL in Services.Keepstore.InternalURLs is checked against every
// volume: a volume with an empty AccessViaHosts counts as accessible by all
// servers, otherwise the URL must appear in that volume's AccessViaHosts.
// The warning is logged through ldr.Logger. cluster is passed by value,
// though any maps it contains are still shared with the caller's copy.
657 func (ldr *Loader) checkEmptyKeepstores(cluster arvados.Cluster) error {
659 for url := range cluster.Services.Keepstore.InternalURLs {
660 for _, vol := range cluster.Volumes {
661 if len(vol.AccessViaHosts) == 0 {
662 // accessible by all servers
665 if _, ok := vol.AccessViaHosts[url]; ok {
669 ldr.Logger.Warnf("keepstore configured at %s does not have access to any volumes", url)
674 // Warn about AccessViaHosts entries that don't correspond to any of
675 // the listed keepstore services.
//
// Every URL in each volume's AccessViaHosts is looked up in
// Services.Keepstore.InternalURLs, and a warning naming the volume UUID and
// the URL is logged for each one that is missing. The volume with UUID
// "SAMPLE" gets special handling (presumably it is skipped; the branch body
// is not visible in this excerpt).
676 func (ldr *Loader) checkUnlistedKeepstores(cluster arvados.Cluster) error {
677 for uuid, vol := range cluster.Volumes {
678 if uuid == "SAMPLE" {
681 for url := range vol.AccessViaHosts {
682 if _, ok := cluster.Services.Keepstore.InternalURLs[url]; !ok {
683 ldr.Logger.Warnf("Volumes.%s.AccessViaHosts refers to nonexistent keepstore server %s", uuid, url)