1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "github.com/sirupsen/logrus"
// defaultKeepstoreConfigPath is the conventional location of a legacy
// (pre-cluster-config) keepstore config file. loadOldKeepstoreConfig
// handles a missing file at exactly this path separately from other
// load errors.
24 const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
// oldKeepstoreConfig holds the keys read from a legacy keepstore config
// file. Scalar settings are pointers so the migration code can tell an
// absent key (nil) from an explicit zero value. Nil fields leave the
// cluster config unchanged, except that a nil Volumes triggers local
// directory volume discovery.
26 type oldKeepstoreConfig struct {
// Copied to Collections.BlobSigningTTL when set.
37 BlobSignatureTTL *arvados.Duration
// Path to a file whose whitespace-trimmed contents become
// Collections.BlobSigningKey.
38 BlobSigningKeyFile *string
// Copied to Collections.BlobSigning when set.
39 RequireSignatures *bool
// Path to a file whose whitespace-trimmed contents become
// SystemRootToken.
40 SystemAuthTokenFile *string
42 TrashLifetime *arvados.Duration
43 TrashCheckInterval *arvados.Duration
// Copied to Collections.BlobDeleteConcurrency when set.
46 EmptyTrashWorkers *int
// Stored as a "file://" URL in TLS.Certificate. Together with a TLS key,
// it also makes this host's InternalURL use https.
47 TLSCertificateFile *string
// Legacy volume list; nil means "discover local directory volumes".
50 Volumes *oldKeepstoreVolumeList
52 ManagementToken *string
54 DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
// oldKeepstoreVolumeList is the legacy config's Volumes list. Migration log
// messages identify volumes by their index ("volume #N") in this list.
57 type oldKeepstoreVolumeList []oldKeepstoreVolume
// oldKeepstoreVolume is one entry of a legacy keepstore config's Volumes
// list. Type names the driver and is compared against
// arvados.Volume.Driver during migration (e.g. "Directory"; presumably
// also "S3" and "Azure" -- confirm against the switch in the migration
// code). Only the fields for that driver, plus the common fields at the
// end, are used when building the new cluster-config volume.
59 type oldKeepstoreVolume struct {
61 Type string `json:",omitempty"`
63 // Azure driver configs
// StorageAccountKeyFile is a path. The whitespace-trimmed file contents
// become the account key; an empty path yields an empty key.
64 StorageAccountName string `json:",omitempty"`
65 StorageAccountKeyFile string `json:",omitempty"`
66 StorageBaseURL string `json:",omitempty"`
67 ContainerName string `json:",omitempty"`
68 AzureReplication int `json:",omitempty"`
69 RequestTimeout arvados.Duration `json:",omitempty"`
70 ListBlobsRetryDelay arvados.Duration `json:",omitempty"`
71 ListBlobsMaxAttempts int `json:",omitempty"`
// S3 driver configs. AccessKeyFile and SecretKeyFile are paths; the
// whitespace-trimmed file contents become the keys, and an empty path
// yields an empty key.
74 AccessKeyFile string `json:",omitempty"`
75 SecretKeyFile string `json:",omitempty"`
76 Endpoint string `json:",omitempty"`
77 Region string `json:",omitempty"`
78 Bucket string `json:",omitempty"`
79 LocationConstraint bool `json:",omitempty"`
80 IndexPageSize int `json:",omitempty"`
81 S3Replication int `json:",omitempty"`
82 ConnectTimeout arvados.Duration `json:",omitempty"`
83 ReadTimeout arvados.Duration `json:",omitempty"`
84 RaceWindow arvados.Duration `json:",omitempty"`
85 UnsafeDelete bool `json:",omitempty"`
87 // Directory driver configs
89 DirectoryReplication int
// Settings shared by all driver types. ReadOnly also sets this host's
// AccessViaHosts entry; StorageClasses is converted with array2boolmap.
93 ReadOnly bool `json:",omitempty"`
94 StorageClasses []string `json:",omitempty"`
97 // update config using values from an old-style keepstore config file.
// loadOldKeepstoreConfig merges settings from the legacy keepstore config
// file at ldr.KeepstorePath into the cluster returned by
// cfg.GetCluster(""), then stores the updated cluster back in
// cfg.Clusters.
//
// A legacy key that is present (non-nil) overrides the corresponding
// cluster config value; an absent key leaves it unchanged. A missing
// file at defaultKeepstoreConfigPath is handled separately from other
// load errors.
//
// Among other cases, an error is returned when:
//   - the hostname cannot be determined;
//   - a referenced key/token file cannot be read;
//   - this host's InternalURL cannot be derived from Listen or the hostname;
//   - local volume discovery fails;
//   - a volume has an unsupported type.
98 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
99 if ldr.KeepstorePath == "" {
102 hostname, err := os.Hostname()
104 return fmt.Errorf("getting hostname: %s", err)
107 var oc oldKeepstoreConfig
108 err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
109 if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
111 } else if err != nil {
115 cluster, err := cfg.GetCluster("")
// This host's InternalURL uses https only when the legacy file
// configures both a TLS certificate and a TLS key.
120 myURL := arvados.URL{Scheme: "http"}
121 if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
122 myURL.Scheme = "https"
// Debug=true maps to LogLevel "debug" and Debug=false to "info". An
// absent Debug key leaves LogLevel unchanged.
125 if v := oc.Debug; v == nil {
126 } else if *v && cluster.SystemLogs.LogLevel != "debug" {
127 cluster.SystemLogs.LogLevel = "debug"
128 } else if !*v && cluster.SystemLogs.LogLevel != "info" {
129 cluster.SystemLogs.LogLevel = "info"
// TLS file paths are stored as file:// URLs in the cluster config.
132 if v := oc.TLSCertificateFile; v != nil && "file://"+*v != cluster.TLS.Certificate {
133 cluster.TLS.Certificate = "file://" + *v
135 if v := oc.TLSKeyFile; v != nil && "file://"+*v != cluster.TLS.Key {
136 cluster.TLS.Key = "file://" + *v
// Determine myURL.Host.
//   - With Listen: an exact InternalURLs match is checked first. A ":port"
//     value is expanded with this host's name and registered as a new
//     InternalURL. Anything else is an error.
//   - Without Listen: InternalURLs is searched for an entry whose host
//     equals the hostname.
138 if v := oc.Listen; v != nil {
139 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
142 } else if len(*v) > 1 && (*v)[0] == ':' {
143 myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
144 cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
146 return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
149 for url := range cluster.Services.Keepstore.InternalURLs {
150 if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
155 if myURL.Host == "" {
156 return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
160 if v := oc.LogFormat; v != nil && *v != cluster.SystemLogs.Format {
161 cluster.SystemLogs.Format = *v
163 if v := oc.MaxBuffers; v != nil && *v != cluster.API.MaxKeepBlockBuffers {
164 cluster.API.MaxKeepBlockBuffers = *v
166 if v := oc.MaxRequests; v != nil && *v != cluster.API.MaxConcurrentRequests {
167 cluster.API.MaxConcurrentRequests = *v
169 if v := oc.BlobSignatureTTL; v != nil && *v != cluster.Collections.BlobSigningTTL {
170 cluster.Collections.BlobSigningTTL = *v
// The key file's contents, with surrounding whitespace trimmed, become
// the blob signing key.
172 if v := oc.BlobSigningKeyFile; v != nil {
173 buf, err := ioutil.ReadFile(*v)
175 return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
177 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
178 cluster.Collections.BlobSigningKey = key
181 if v := oc.RequireSignatures; v != nil && *v != cluster.Collections.BlobSigning {
182 cluster.Collections.BlobSigning = *v
// The token file's contents, with surrounding whitespace trimmed,
// become the system root token.
184 if v := oc.SystemAuthTokenFile; v != nil {
185 f, err := os.Open(*v)
187 return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
190 buf, err := ioutil.ReadAll(f)
192 return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
194 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
195 cluster.SystemRootToken = key
198 if v := oc.EnableDelete; v != nil && *v != cluster.Collections.BlobTrash {
199 cluster.Collections.BlobTrash = *v
201 if v := oc.TrashLifetime; v != nil && *v != cluster.Collections.BlobTrashLifetime {
202 cluster.Collections.BlobTrashLifetime = *v
204 if v := oc.TrashCheckInterval; v != nil && *v != cluster.Collections.BlobTrashCheckInterval {
205 cluster.Collections.BlobTrashCheckInterval = *v
// Each worker-count guard compares against the same field it assigns.
// Comparing TrashWorkers/EmptyTrashWorkers against
// BlobReplicateConcurrency would silently drop a legacy value that
// happens to equal the replicate concurrency.
207 if v := oc.TrashWorkers; v != nil && *v != cluster.Collections.BlobTrashConcurrency {
208 cluster.Collections.BlobTrashConcurrency = *v
210 if v := oc.EmptyTrashWorkers; v != nil && *v != cluster.Collections.BlobDeleteConcurrency {
211 cluster.Collections.BlobDeleteConcurrency = *v
213 if v := oc.PullWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
214 cluster.Collections.BlobReplicateConcurrency = *v
// No Volumes key: discover local directory volumes instead.
216 if v := oc.Volumes; v == nil {
217 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
218 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
220 return fmt.Errorf("error discovering local directory volumes: %s", err)
// Migrate each legacy volume. When alreadyMigrated finds a matching
// cluster-config volume, that entry's parameters take precedence over
// the legacy ones (see the "ignoring volume" log messages). Otherwise a
// new Volume is built from the driver-specific legacy fields.
223 for i, oldvol := range *v {
224 var accessViaHosts map[arvados.URL]arvados.VolumeAccess
225 oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
227 accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
229 for _, va := range accessViaHosts {
234 if writers || len(accessViaHosts) == 0 {
235 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
236 if len(accessViaHosts) > 0 {
237 cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
242 var newvol arvados.Volume
244 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
245 newvol = cluster.Volumes[oldUUID]
246 // Remove the old entry. It will be
247 // added back below, possibly with a
249 delete(cluster.Volumes, oldUUID)
251 var params interface{}
// S3 key files are optional: an empty path yields an empty key, but a
// read error on a non-empty path is fatal.
254 accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
255 if err != nil && oldvol.AccessKeyFile != "" {
256 return fmt.Errorf("error reading AccessKeyFile: %s", err)
258 secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
259 if err != nil && oldvol.SecretKeyFile != "" {
260 return fmt.Errorf("error reading SecretKeyFile: %s", err)
262 newvol = arvados.Volume{
264 ReadOnly: oldvol.ReadOnly,
265 Replication: oldvol.S3Replication,
266 StorageClasses: array2boolmap(oldvol.StorageClasses),
268 params = arvados.S3VolumeDriverParameters{
269 AccessKey: string(bytes.TrimSpace(accesskeydata)),
270 SecretKey: string(bytes.TrimSpace(secretkeydata)),
271 Endpoint: oldvol.Endpoint,
272 Region: oldvol.Region,
273 Bucket: oldvol.Bucket,
274 LocationConstraint: oldvol.LocationConstraint,
275 IndexPageSize: oldvol.IndexPageSize,
276 ConnectTimeout: oldvol.ConnectTimeout,
277 ReadTimeout: oldvol.ReadTimeout,
278 RaceWindow: oldvol.RaceWindow,
279 UnsafeDelete: oldvol.UnsafeDelete,
// The Azure key file follows the same rule: an empty path yields an empty
// key, while a read error on a non-empty path is fatal.
282 keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
283 if err != nil && oldvol.StorageAccountKeyFile != "" {
284 return fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
286 newvol = arvados.Volume{
288 ReadOnly: oldvol.ReadOnly,
289 Replication: oldvol.AzureReplication,
290 StorageClasses: array2boolmap(oldvol.StorageClasses),
292 params = arvados.AzureVolumeDriverParameters{
293 StorageAccountName: oldvol.StorageAccountName,
294 StorageAccountKey: string(bytes.TrimSpace(keydata)),
295 StorageBaseURL: oldvol.StorageBaseURL,
296 ContainerName: oldvol.ContainerName,
297 RequestTimeout: oldvol.RequestTimeout,
298 ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay,
299 ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
302 newvol = arvados.Volume{
304 ReadOnly: oldvol.ReadOnly,
305 Replication: oldvol.DirectoryReplication,
306 StorageClasses: array2boolmap(oldvol.StorageClasses),
308 params = arvados.DirectoryVolumeDriverParameters{
310 Serialize: oldvol.Serialize,
313 return fmt.Errorf("unsupported volume type %q", oldvol.Type)
// Driver-specific settings are stored as raw JSON in DriverParameters.
315 dp, err := json.Marshal(params)
319 newvol.DriverParameters = json.RawMessage(dp)
// A replication factor below 1 is raised to 1.
320 if newvol.Replication < 1 {
321 newvol.Replication = 1
// Give this host access to the volume, honoring the legacy ReadOnly flag.
324 if accessViaHosts == nil {
325 accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
327 accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
328 newvol.AccessViaHosts = accessViaHosts
// Choose the volume UUID. When possible, derive it from this server's
// keep_service UUID (the suffix uuid[12:]) and record the same suffix as
// this InternalURL's Rendezvous value. Otherwise fall back to a random
// UUID. That happens with no Listen, no matching keep_service, an
// unexpected UUID format, or when the derived UUID is already taken.
332 } else if oc.Listen == nil {
333 ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
334 } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
335 ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
336 } else if len(uuid) != 27 {
337 ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
339 rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
340 if _, ok := cluster.Volumes[rendezvousUUID]; ok {
341 ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
343 volUUID = rendezvousUUID
345 si := cluster.Services.Keepstore.InternalURLs[myURL]
346 si.Rendezvous = uuid[12:]
347 cluster.Services.Keepstore.InternalURLs[myURL] = si
350 volUUID = newUUID(cluster.ClusterID, "nyw5e")
351 ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
353 cluster.Volumes[volUUID] = newvol
// Write the updated cluster back into cfg.
357 cfg.Clusters[cluster.ClusterID] = *cluster
// alreadyMigrated searches newvols for a volume describing the same
// storage as the legacy volume oldvol. If one is found, it reports that
// volume's UUID. A candidate must use the same driver
// (oldvol.Type == newvol.Driver) and have matching identifying driver
// parameters:
//   - S3: Endpoint, Region, Bucket, LocationConstraint
//   - Azure: StorageAccountName, StorageBaseURL, ContainerName
//   - Directory: Root. AccessViaHosts is additionally checked for myURL,
//     presumably because a local path only identifies storage on one host.
//
// A candidate whose stored DriverParameters fail to decode never matches.
361 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
362 for uuid, newvol := range newvols {
363 if oldvol.Type != newvol.Driver {
368 var params arvados.S3VolumeDriverParameters
369 if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
370 oldvol.Endpoint == params.Endpoint &&
371 oldvol.Region == params.Region &&
372 oldvol.Bucket == params.Bucket &&
373 oldvol.LocationConstraint == params.LocationConstraint {
377 var params arvados.AzureVolumeDriverParameters
378 if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
379 oldvol.StorageAccountName == params.StorageAccountName &&
380 oldvol.StorageBaseURL == params.StorageBaseURL &&
381 oldvol.ContainerName == params.ContainerName {
385 var params arvados.DirectoryVolumeDriverParameters
386 if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
387 oldvol.Root == params.Root {
388 if _, ok := newvol.AccessViaHosts[myURL]; ok {
// discoverLocalVolumes scans mountsFile (default "/proc/mounts") for mount
// points and, for suitable ones, adds a Directory volume rooted at
// "<mountpoint>/keep" to cluster.Volumes.
//
// A mount appears to qualify when its device is tmpfs or a /dev/ device
// and the "keep" subdirectory exists. The skip statements are not
// visible in this excerpt -- confirm.
//
// Each added volume gets a random UUID and is accessible only via myURL.
// Its ReadOnly flag is ro, presumably derived from the mount options
// scanned below -- TODO confirm.
397 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
398 if mountsFile == "" {
399 mountsFile = "/proc/mounts"
401 f, err := os.Open(mountsFile)
403 return fmt.Errorf("error opening %s: %s", mountsFile, err)
406 scanner := bufio.NewScanner(f)
// Mounts-file fields (proc(5)): device, mount point, fs type, options, ...
408 args := strings.Fields(scanner.Text())
// NOTE(review): args[0], args[1] (and args[3] below) are indexed with no
// length check visible here. A blank or short line in a custom mountsFile
// would panic -- confirm a guard exists in the lines not shown.
409 dev, mount := args[0], args[1]
413 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
416 keepdir := mount + "/keep"
417 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
// args[3] is the comma-separated mount option list.
422 for _, fsopt := range strings.Split(args[3], ",") {
428 uuid := newUUID(cluster.ClusterID, "nyw5e")
429 ldr.Logger.WithFields(logrus.Fields{
431 "Driver": "Directory",
432 "DriverParameters.Root": keepdir,
433 "DriverParameters.Serialize": false,
436 }).Warn("adding local directory volume")
438 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
445 cluster.Volumes[uuid] = arvados.Volume{
450 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
451 myURL: {ReadOnly: ro},
// Report any read error that ended the scan loop early.
455 if err := scanner.Err(); err != nil {
456 return fmt.Errorf("reading %s: %s", mountsFile, err)
// array2boolmap converts a list of keys (used here for storage class
// names) into a map keyed by those names. The result starts as a non-nil
// empty map.
// NOTE(review): the loop body is not visible in this excerpt; presumably
// it sets m[k] = true for each key and returns m.
461 func array2boolmap(keys []string) map[string]bool {
462 m := map[string]bool{}
463 for _, k := range keys {
// newUUID returns a random Arvados-style UUID of the form
// "<clusterID>-<infix>-<15 chars>". The last component is a random
// integer in [0, 36^15) from crypto/rand, written in base 36 and
// left-padded with zeros to exactly 15 characters.
469 func newUUID(clusterID, infix string) string {
// Exp with a zero modulus is plain exponentiation: the bound is 36^15.
470 randint, err := rand.Int(rand.Reader, big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0)))
474 randstr := randint.Text(36)
475 for len(randstr) < 15 {
476 randstr = "0" + randstr
478 return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr)
481 // Return the UUID and URL for the controller's keep_services listing
482 // corresponding to this host/process.
//
// findKeepServicesItem fetches arvados/v1/keep_services using the cluster's
// SystemRootToken. It skips "proxy" entries and returns the first entry for
// which keepServiceIsMe(ks, hostname, listen) holds. The returned URL's
// host is ServiceHost:ServicePort; the scheme presumably depends on
// ServiceSSLFlag (assignment not visible here). If nothing matches, the
// error lists the non-proxy host:port pairs that were tried.
483 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
484 client, err := arvados.NewClientFromConfig(cluster)
488 client.AuthToken = cluster.SystemRootToken
489 var svcList arvados.KeepServiceList
490 err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
494 hostname, err := os.Hostname()
496 err = fmt.Errorf("error getting hostname: %s", err)
500 for _, ks := range svcList.Items {
501 if ks.ServiceType == "proxy" {
503 } else if keepServiceIsMe(ks, hostname, listen) {
506 Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
508 if ks.ServiceSSLFlag {
511 return ks.UUID, url, nil
// Remember non-matching, non-proxy entries for the error message.
513 tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
516 err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
// localhostOrAllInterfaces is the set of KeepService host names that
// keepServiceIsMe treats as matching any hostname. Keys must be lowercase,
// because lookups use strings.ToLower(ks.ServiceHost).
// NOTE(review): entries are not visible in this excerpt; per the name,
// presumably localhost variants and wildcard/all-interfaces addresses.
520 var localhostOrAllInterfaces = map[string]bool{
528 // Return true if the given KeepService entry matches the given
529 // hostname and (keepstore config file) listen address.
531 // If the KeepService host is some variant of "localhost", we assume
532 // this is a testing or single-node environment, ignore the given
533 // hostname, and return true if the port numbers match.
535 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
536 // match a KeepService host "foo.bar", but also "foo.bar.example",
537 // "foo.bar.example.org", etc.
538 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
539 // Extract the port name/number from listen, and resolve it to
540 // a port number to compare with ks.ServicePort.
541 _, listenport, err := net.SplitHostPort(listen)
// Fallback for ":..." values SplitHostPort rejects: treat everything
// after the leading colon as the port.
542 if err != nil && strings.HasPrefix(listen, ":") {
543 listenport = listen[1:]
// LookupPort accepts numeric ports and service names (e.g. "http").
545 if lp, err := net.LookupPort("tcp", listenport); err != nil {
// A resolved port of 0 is accepted as matching ServicePort 80.
547 } else if !(lp == ks.ServicePort ||
548 (lp == 0 && ks.ServicePort == 80)) {
552 kshost := strings.ToLower(ks.ServiceHost)
// Appending "." to both sides makes the prefix test match whole DNS
// labels only: "foo.bar" matches "foo.bar.example" but not "foo.barbaz".
553 return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")