1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "github.com/sirupsen/logrus"
// defaultKeepstoreConfigPath is the default location of the legacy
// keepstore config file. loadOldKeepstoreConfig compares
// Loader.KeepstorePath against it so that a nonexistent file at this
// default location can be handled differently from a nonexistent file
// at an explicitly configured path.
const (
	defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
)
// oldKeepstoreConfig holds the top-level settings of a legacy keepstore
// config file, as decoded by loadOldKeepstoreConfig (via
// loadOldConfigHelper).
//
// Most fields are pointers so the loader can tell "key absent from the
// legacy file" (nil) apart from "key present": loadOldKeepstoreConfig
// applies a setting to the cluster config only when its pointer is
// non-nil.
26 type oldKeepstoreConfig struct {
37 BlobSignatureTTL *arvados.Duration
// BlobSigningKeyFile and SystemAuthTokenFile name files whose
// whitespace-trimmed contents become the cluster's BlobSigningKey and
// SystemRootToken respectively.
38 BlobSigningKeyFile *string
39 RequireSignatures *bool
40 SystemAuthTokenFile *string
42 TrashLifetime *arvados.Duration
43 TrashCheckInterval *arvados.Duration
46 EmptyTrashWorkers *int
// The TLS file paths are stored in the cluster config as file:// URLs.
// The URL recorded for this keepstore uses https only when both the
// certificate file and the key file are given.
47 TLSCertificateFile *string
// A nil or empty Volumes list makes the loader discover local directory
// volumes from the mounts table instead of migrating explicit entries.
50 Volumes *oldKeepstoreVolumeList
52 ManagementToken *string
// When DiscoverVolumesFromMountsFile is empty, discoverLocalVolumes
// reads /proc/mounts.
54 DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
57 type oldKeepstoreVolumeList []oldKeepstoreVolume
// oldKeepstoreVolume is one entry of a legacy keepstore Volumes list.
//
// Type names the storage driver. alreadyMigrated compares it directly
// with arvados.Volume.Driver, and translateOldKeepstoreVolume rejects
// types it does not support. Only the fields for the selected driver
// are used when translating. Each driver's *Replication value becomes
// Volume.Replication, raised to 1 if it is lower.
59 type oldKeepstoreVolume struct {
61 Type string `json:",omitempty"`
63 // Azure driver configs
64 StorageAccountName string `json:",omitempty"`
// StorageAccountKeyFile names a file whose whitespace-trimmed contents
// become the storage account key.
65 StorageAccountKeyFile string `json:",omitempty"`
66 StorageBaseURL string `json:",omitempty"`
67 ContainerName string `json:",omitempty"`
68 AzureReplication int `json:",omitempty"`
69 RequestTimeout arvados.Duration `json:",omitempty"`
70 ListBlobsRetryDelay arvados.Duration `json:",omitempty"`
71 ListBlobsMaxAttempts int `json:",omitempty"`
// S3 driver configs. AccessKeyFile and SecretKeyFile name files whose
// whitespace-trimmed contents become the access and secret keys.
74 AccessKeyFile string `json:",omitempty"`
75 SecretKeyFile string `json:",omitempty"`
76 Endpoint string `json:",omitempty"`
77 Region string `json:",omitempty"`
78 Bucket string `json:",omitempty"`
79 LocationConstraint bool `json:",omitempty"`
80 IndexPageSize int `json:",omitempty"`
81 S3Replication int `json:",omitempty"`
82 ConnectTimeout arvados.Duration `json:",omitempty"`
83 ReadTimeout arvados.Duration `json:",omitempty"`
84 RaceWindow arvados.Duration `json:",omitempty"`
85 UnsafeDelete bool `json:",omitempty"`
87 // Directory driver configs
89 DirectoryReplication int
// Settings shared by every driver type.
93 ReadOnly bool `json:",omitempty"`
94 StorageClasses []string `json:",omitempty"`
97 // update config using values from an old-style keepstore config file.
//
// loadOldKeepstoreConfig reads the legacy keepstore config file named by
// ldr.KeepstorePath. It copies every setting present in that file onto
// the cluster returned by cfg.GetCluster(""), then stores the cluster
// back into cfg.Clusters. Settings absent from the legacy file (nil
// pointer fields) leave the cluster config unchanged.
//
// Along the way it determines myURL, the Services.Keepstore.InternalURLs
// key for this keepstore. That key is used to record per-host volume
// access.
//
// NOTE(review): the early-exit paths (empty KeepstorePath, nonexistent
// file at defaultKeepstoreConfigPath) presumably return nil without
// touching cfg -- confirm.
98 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
99 if ldr.KeepstorePath == "" {
102 hostname, err := os.Hostname()
104 return fmt.Errorf("getting hostname: %s", err)
107 var oc oldKeepstoreConfig
108 err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
// A nonexistent file is special-cased only at the default path. For an
// explicitly configured path, every load error reaches the err != nil
// branch.
109 if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
111 } else if err != nil {
115 cluster, err := cfg.GetCluster("")
// This keepstore's URL uses https only when both TLS files are configured.
120 myURL := arvados.URL{Scheme: "http"}
121 if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
122 myURL.Scheme = "https"
// Debug, when present, forces the log level: true -> "debug", false -> "info".
125 if v := oc.Debug; v == nil {
126 } else if *v && cluster.SystemLogs.LogLevel != "debug" {
127 cluster.SystemLogs.LogLevel = "debug"
128 } else if !*v && cluster.SystemLogs.LogLevel != "info" {
129 cluster.SystemLogs.LogLevel = "info"
// TLS files are referenced by file:// URL rather than read here.
132 if v := oc.TLSCertificateFile; v != nil {
133 cluster.TLS.Certificate = "file://" + *v
135 if v := oc.TLSKeyFile; v != nil {
136 cluster.TLS.Key = "file://" + *v
// Listen determines myURL. There are three cases:
//   - an address already present in InternalURLs is (presumably) adopted
//     as-is;
//   - a bare ":port" is joined with this machine's hostname and added to
//     InternalURLs;
//   - any other form is rejected, because it cannot be migrated
//     automatically.
138 if v := oc.Listen; v != nil {
139 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
142 } else if len(*v) > 1 && (*v)[0] == ':' {
143 myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
144 cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
// NOTE(review): Go error strings conventionally omit trailing punctuation.
146 return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
// Presumably the no-Listen case: adopt an InternalURLs entry whose host
// part equals this machine's hostname.
149 for url := range cluster.Services.Keepstore.InternalURLs {
150 if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
155 if myURL.Host == "" {
156 return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
// The remaining scalar settings map one-to-one onto cluster config fields.
160 if v := oc.LogFormat; v != nil {
161 cluster.SystemLogs.Format = *v
163 if v := oc.MaxBuffers; v != nil {
164 cluster.API.MaxKeepBlobBuffers = *v
166 if v := oc.MaxRequests; v != nil {
167 cluster.API.MaxConcurrentRequests = *v
169 if v := oc.BlobSignatureTTL; v != nil {
170 cluster.Collections.BlobSigningTTL = *v
// Key files are read now. Their whitespace-trimmed contents go into the
// cluster config.
172 if v := oc.BlobSigningKeyFile; v != nil {
173 buf, err := ioutil.ReadFile(*v)
175 return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
177 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
178 cluster.Collections.BlobSigningKey = key
181 if v := oc.RequireSignatures; v != nil {
182 cluster.Collections.BlobSigning = *v
// NOTE(review): this reads via os.Open + ioutil.ReadAll, whereas
// BlobSigningKeyFile above uses ioutil.ReadFile. Confirm that f is
// closed (presumably by a deferred Close) -- confirm.
184 if v := oc.SystemAuthTokenFile; v != nil {
185 f, err := os.Open(*v)
187 return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
190 buf, err := ioutil.ReadAll(f)
192 return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
194 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
195 cluster.SystemRootToken = key
198 if v := oc.EnableDelete; v != nil {
199 cluster.Collections.BlobTrash = *v
201 if v := oc.TrashLifetime; v != nil {
202 cluster.Collections.BlobTrashLifetime = *v
204 if v := oc.TrashCheckInterval; v != nil {
205 cluster.Collections.BlobTrashCheckInterval = *v
207 if v := oc.TrashWorkers; v != nil {
208 cluster.Collections.BlobTrashConcurrency = *v
210 if v := oc.EmptyTrashWorkers; v != nil {
211 cluster.Collections.BlobDeleteConcurrency = *v
213 if v := oc.PullWorkers; v != nil {
214 cluster.Collections.BlobReplicateConcurrency = *v
// With no legacy volumes, discover local directory volumes from the
// mounts table. Otherwise migrate the legacy Volumes list.
216 if oc.Volumes == nil || len(*oc.Volumes) == 0 {
217 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
218 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
220 return fmt.Errorf("error discovering local directory volumes: %s", err)
223 err := ldr.migrateOldKeepstoreVolumes(cluster, oc, myURL)
// cfg.Clusters holds Cluster values, not pointers, so the modified copy
// has to be written back explicitly.
229 cfg.Clusters[cluster.ClusterID] = *cluster
233 // Merge Volumes section of old keepstore config into cluster config.
//
// Each legacy volume (referred to as #i in log messages, counting from 0)
// is handled as follows:
//
//   - Case 1: alreadyMigrated finds an equivalent entry in cluster.Volumes,
//     and that entry either has no AccessViaHosts or already grants some
//     host write access. Write access is presumably what `writers`
//     tracks -- confirm. In this case the existing entry is kept. This host
//     (myURL) is added to its AccessViaHosts only when that map is
//     non-empty.
//   - Case 2: otherwise, the matching entry (removed from cluster.Volumes)
//     or the translated legacy volume becomes newvol. This host is added to
//     its AccessViaHosts with the legacy ReadOnly flag, and the volume is
//     stored under the UUID chosen below.
//
// Preferred UUID: "<ClusterID>-nyw5e-" plus the last 15 characters of the
// keep_service UUID that findKeepServicesItem matches to the legacy Listen
// address. Deriving it also sets this host's InternalURLs Rendezvous
// value. Otherwise a random UUID from newUUID is suggested.
234 func (ldr *Loader) migrateOldKeepstoreVolumes(cluster *arvados.Cluster, oc oldKeepstoreConfig, myURL arvados.URL) error {
235 for i, oldvol := range *oc.Volumes {
236 var accessViaHosts map[arvados.URL]arvados.VolumeAccess
237 oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
239 accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
// Presumably sets writers if any host has non-ReadOnly access -- confirm.
241 for _, va := range accessViaHosts {
// The existing cluster entry wins over the legacy parameters.
246 if writers || len(accessViaHosts) == 0 {
247 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
248 if len(accessViaHosts) > 0 {
249 cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
// newvol is either the matching cluster entry or the translated legacy volume.
254 var newvol arvados.Volume
256 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
257 newvol = cluster.Volumes[oldUUID]
258 // Remove the old entry. It will be added back
259 // below, possibly with a new UUID.
260 delete(cluster.Volumes, oldUUID)
262 v, err := ldr.translateOldKeepstoreVolume(oldvol)
// Record this host's access. When accessViaHosts came from an existing
// entry, that map is modified in place (maps are reference values).
268 if accessViaHosts == nil {
269 accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
271 accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
272 newvol.AccessViaHosts = accessViaHosts
// Try to derive the volume UUID from this keepstore's keep_service UUID.
// Each failure branch logs why that was not possible.
276 } else if oc.Listen == nil {
277 ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
278 } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
279 ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
280 } else if len(uuid) != 27 {
281 ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
// The keep_service UUID is 27 characters (checked above). Its last 15
// characters become both the volume UUID suffix and this host's
// Rendezvous value.
283 rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
284 if _, ok := cluster.Volumes[rendezvousUUID]; ok {
285 ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
287 volUUID = rendezvousUUID
289 si := cluster.Services.Keepstore.InternalURLs[myURL]
290 si.Rendezvous = uuid[12:]
291 cluster.Services.Keepstore.InternalURLs[myURL] = si
// Fallback: a random UUID. Presumably used only when no UUID was settled
// above -- confirm.
294 volUUID = newUUID(cluster.ClusterID, "nyw5e")
295 ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
297 cluster.Volumes[volUUID] = newvol
// translateOldKeepstoreVolume converts one legacy volume entry into an
// arvados.Volume:
//   - ReadOnly and StorageClasses (as a set) are copied.
//   - The driver-specific replication setting becomes Replication, raised
//     to 1 if lower.
//   - The driver-specific settings are JSON-encoded into DriverParameters,
//     as arvados.S3VolumeDriverParameters, AzureVolumeDriverParameters or
//     DirectoryVolumeDriverParameters.
//
// Any other Type yields an "unsupported volume type" error.
302 func (ldr *Loader) translateOldKeepstoreVolume(oldvol oldKeepstoreVolume) (arvados.Volume, error) {
303 var newvol arvados.Volume
// params holds whichever *VolumeDriverParameters value the chosen driver uses.
304 var params interface{}
// Credential files are read up front. An empty file name makes ReadFile
// fail; that error is ignored, which leaves the credential empty.
307 accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
308 if err != nil && oldvol.AccessKeyFile != "" {
309 return newvol, fmt.Errorf("error reading AccessKeyFile: %s", err)
311 secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
312 if err != nil && oldvol.SecretKeyFile != "" {
313 return newvol, fmt.Errorf("error reading SecretKeyFile: %s", err)
315 newvol = arvados.Volume{
317 ReadOnly: oldvol.ReadOnly,
318 Replication: oldvol.S3Replication,
319 StorageClasses: array2boolmap(oldvol.StorageClasses),
321 params = arvados.S3VolumeDriverParameters{
322 AccessKey: string(bytes.TrimSpace(accesskeydata)),
323 SecretKey: string(bytes.TrimSpace(secretkeydata)),
324 Endpoint: oldvol.Endpoint,
325 Region: oldvol.Region,
326 Bucket: oldvol.Bucket,
327 LocationConstraint: oldvol.LocationConstraint,
328 IndexPageSize: oldvol.IndexPageSize,
329 ConnectTimeout: oldvol.ConnectTimeout,
330 ReadTimeout: oldvol.ReadTimeout,
331 RaceWindow: oldvol.RaceWindow,
332 UnsafeDelete: oldvol.UnsafeDelete,
// The Azure storage account key file is handled the same way: an empty
// name yields an empty key.
335 keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
336 if err != nil && oldvol.StorageAccountKeyFile != "" {
337 return newvol, fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
339 newvol = arvados.Volume{
341 ReadOnly: oldvol.ReadOnly,
342 Replication: oldvol.AzureReplication,
343 StorageClasses: array2boolmap(oldvol.StorageClasses),
345 params = arvados.AzureVolumeDriverParameters{
346 StorageAccountName: oldvol.StorageAccountName,
347 StorageAccountKey: string(bytes.TrimSpace(keydata)),
348 StorageBaseURL: oldvol.StorageBaseURL,
349 ContainerName: oldvol.ContainerName,
350 RequestTimeout: oldvol.RequestTimeout,
351 ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay,
352 ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
355 newvol = arvados.Volume{
357 ReadOnly: oldvol.ReadOnly,
358 Replication: oldvol.DirectoryReplication,
359 StorageClasses: array2boolmap(oldvol.StorageClasses),
361 params = arvados.DirectoryVolumeDriverParameters{
363 Serialize: oldvol.Serialize,
366 return newvol, fmt.Errorf("unsupported volume type %q", oldvol.Type)
368 dp, err := json.Marshal(params)
372 newvol.DriverParameters = json.RawMessage(dp)
// A replication setting omitted from the legacy file decodes as 0;
// treat anything below 1 as 1.
373 if newvol.Replication < 1 {
374 newvol.Replication = 1
379 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
380 for uuid, newvol := range newvols {
381 if oldvol.Type != newvol.Driver {
386 var params arvados.S3VolumeDriverParameters
387 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
388 oldvol.Endpoint == params.Endpoint &&
389 oldvol.Region == params.Region &&
390 oldvol.Bucket == params.Bucket &&
391 oldvol.LocationConstraint == params.LocationConstraint {
395 var params arvados.AzureVolumeDriverParameters
396 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
397 oldvol.StorageAccountName == params.StorageAccountName &&
398 oldvol.StorageBaseURL == params.StorageBaseURL &&
399 oldvol.ContainerName == params.ContainerName {
403 var params arvados.DirectoryVolumeDriverParameters
404 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
405 oldvol.Root == params.Root {
406 if _, ok := newvol.AccessViaHosts[myURL]; ok {
// discoverLocalVolumes adds a Directory volume to cluster.Volumes for
// each suitable mount listed in mountsFile (default /proc/mounts).
// Each line is split on whitespace; field 1 is the device, field 2 the
// mount point, and field 4 a comma-separated option list.
//
// A mount is a candidate only if:
//   - its device is "tmpfs" or starts with "/dev/", and
//   - it contains a "keep" directory.
// Other lines are presumably skipped.
//
// Each new volume gets a random UUID and is logged at warning level.
// Its AccessViaHosts contains only myURL, with read-only access taken
// from ro. ro is presumably set from an "ro" mount option -- confirm.
415 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
416 if mountsFile == "" {
417 mountsFile = "/proc/mounts"
419 f, err := os.Open(mountsFile)
421 return fmt.Errorf("error opening %s: %s", mountsFile, err)
424 scanner := bufio.NewScanner(f)
426 args := strings.Fields(scanner.Text())
// NOTE(review): args is indexed without a length check, so a blank or
// one-field line in mountsFile (e.g. a test fixture) panics here. The
// same applies to args[3] below unless such lines are filtered first --
// consider skipping lines with fewer than 4 fields.
427 dev, mount := args[0], args[1]
431 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
434 keepdir := mount + "/keep"
435 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
440 for _, fsopt := range strings.Split(args[3], ",") {
446 uuid := newUUID(cluster.ClusterID, "nyw5e")
447 ldr.Logger.WithFields(logrus.Fields{
449 "Driver": "Directory",
450 "DriverParameters.Root": keepdir,
451 "DriverParameters.Serialize": false,
454 }).Warn("adding local directory volume")
456 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
463 cluster.Volumes[uuid] = arvados.Volume{
468 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
469 myURL: {ReadOnly: ro},
473 if err := scanner.Err(); err != nil {
474 return fmt.Errorf("reading %s: %s", mountsFile, err)
// array2boolmap turns a list of names into a set, represented as a map
// from each name to true. The result is always a non-nil map, even for
// a nil or empty input, and duplicate names collapse into one entry.
func array2boolmap(keys []string) map[string]bool {
	set := make(map[string]bool, len(keys))
	for i := range keys {
		set[keys[i]] = true
	}
	return set
}
// newUUID returns a fresh random Arvados-style UUID of the form
// "<clusterID>-<infix>-<15 base-36 digits>". The random part is drawn
// uniformly from [0, 36^15) using crypto/rand, written with lower-case
// base-36 digits, and left-padded with zeros to exactly 15 characters.
// It panics if the system random source fails.
func newUUID(clusterID, infix string) string {
	limit := new(big.Int).Exp(big.NewInt(36), big.NewInt(15), nil)
	n, err := rand.Int(rand.Reader, limit)
	if err != nil {
		panic(err)
	}
	digits := n.Text(36)
	if pad := 15 - len(digits); pad > 0 {
		digits = strings.Repeat("0", pad) + digits
	}
	return clusterID + "-" + infix + "-" + digits
}
499 // Return the UUID and URL for the controller's keep_services listing
500 // corresponding to this host/process.
//
// It fetches arvados/v1/keep_services using cluster.SystemRootToken.
// Entries with ServiceType "proxy" are never matched. It returns the
// first other entry for which keepServiceIsMe(ks, hostname, listen) is
// true, along with a URL whose Host is ServiceHost:ServicePort; the
// scheme presumably follows ServiceSSLFlag. If nothing matches, err
// lists the host:port of each entry that was tried.
501 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
502 client, err := arvados.NewClientFromConfig(cluster)
506 client.AuthToken = cluster.SystemRootToken
507 var svcList arvados.KeepServiceList
508 err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
512 hostname, err := os.Hostname()
514 err = fmt.Errorf("error getting hostname: %s", err)
518 for _, ks := range svcList.Items {
519 if ks.ServiceType == "proxy" {
521 } else if keepServiceIsMe(ks, hostname, listen) {
524 Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
526 if ks.ServiceSSLFlag {
529 return ks.UUID, url, nil
// NOTE(review): unlike the URL above, this host:port is not built with
// net.JoinHostPort, so IPv6 hosts are not bracketed in the error text.
531 tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
534 err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
// localhostOrAllInterfaces is the set of ServiceHost values that
// keepServiceIsMe accepts regardless of the machine's hostname.
// keepServiceIsMe looks hosts up after lower-casing them, so keys must be
// lower-case to match.
538 var localhostOrAllInterfaces = map[string]bool{
546 // Return true if the given KeepService entry matches the given
547 // hostname and (keepstore config file) listen address.
549 // If the KeepService host is some variant of "localhost", we assume
550 // this is a testing or single-node environment, ignore the given
551 // hostname, and return true if the port numbers match.
553 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
554 // match a KeepService host "foo.bar", but also "foo.bar.example",
555 // "foo.bar.example.org", etc.
556 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
557 // Extract the port name/number from listen, and resolve it to
558 // a port number to compare with ks.ServicePort.
559 _, listenport, err := net.SplitHostPort(listen)
560 if err != nil && strings.HasPrefix(listen, ":") {
561 listenport = listen[1:]
563 if lp, err := net.LookupPort("tcp", listenport); err != nil {
565 } else if !(lp == ks.ServicePort ||
566 (lp == 0 && ks.ServicePort == 80)) {
570 kshost := strings.ToLower(ks.ServiceHost)
571 return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")