1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "github.com/sirupsen/logrus"
// defaultKeepstoreConfigPath is the standard location of a legacy
// keepstore config file. When ldr.KeepstorePath equals this path and no
// file exists there, loadOldKeepstoreConfig treats that as "no legacy
// config" instead of an error.
const (
	defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
)
// oldKeepstoreConfig holds the settings read from a legacy keepstore
// config file. Settings are pointers so that a nil value means "not
// present in the legacy file", in which case the cluster config value
// is left alone.
//
// NOTE(review): ManagementToken (and PIDFile) are decoded here but are
// not copied into the cluster config by loadOldKeepstoreConfig --
// confirm that is intended.
type oldKeepstoreConfig struct {
	Debug  *bool
	Listen *string

	LogFormat *string

	PIDFile *string

	MaxBuffers  *int
	MaxRequests *int

	BlobSignatureTTL    *arvados.Duration
	BlobSigningKeyFile  *string
	RequireSignatures   *bool
	SystemAuthTokenFile *string
	EnableDelete        *bool
	TrashLifetime       *arvados.Duration
	TrashCheckInterval  *arvados.Duration
	PullWorkers         *int
	TrashWorkers        *int
	EmptyTrashWorkers   *int
	TLSCertificateFile  *string
	TLSKeyFile          *string

	// nil means "no Volumes key": local directory volumes are
	// discovered from the mounts table instead.
	Volumes *oldKeepstoreVolumeList

	ManagementToken *string

	DiscoverVolumesFromMountsFile string // not a real legacy config -- just useful for tests
}
// oldKeepstoreVolumeList is the Volumes section of a legacy keepstore
// config file. Each entry is migrated to a cluster config volume
// individually by loadOldKeepstoreConfig.
type oldKeepstoreVolumeList []oldKeepstoreVolume
// oldKeepstoreVolume is one entry of the Volumes list in a legacy
// keepstore config file. It carries the union of all drivers' settings;
// Type selects which group applies.
type oldKeepstoreVolume struct {
	arvados.Volume
	// Type names the legacy driver. It is compared with
	// arvados.Volume.Driver when looking for an already-migrated entry.
	Type string `json:",omitempty"`

	// Azure driver configs
	StorageAccountName string `json:",omitempty"`
	// StorageAccountKeyFile is a path; the file's whitespace-trimmed
	// contents become the migrated StorageAccountKey.
	StorageAccountKeyFile string           `json:",omitempty"`
	StorageBaseURL        string           `json:",omitempty"`
	ContainerName         string           `json:",omitempty"`
	AzureReplication      int              `json:",omitempty"`
	RequestTimeout        arvados.Duration `json:",omitempty"`
	ListBlobsRetryDelay   arvados.Duration `json:",omitempty"`
	ListBlobsMaxAttempts  int              `json:",omitempty"`

	// S3 driver configs. AccessKeyFile and SecretKeyFile are paths;
	// the files' whitespace-trimmed contents become the migrated
	// AccessKey and SecretKey.
	AccessKeyFile      string           `json:",omitempty"`
	SecretKeyFile      string           `json:",omitempty"`
	Endpoint           string           `json:",omitempty"`
	Region             string           `json:",omitempty"`
	Bucket             string           `json:",omitempty"`
	LocationConstraint bool             `json:",omitempty"`
	IndexPageSize      int              `json:",omitempty"`
	S3Replication      int              `json:",omitempty"`
	ConnectTimeout     arvados.Duration `json:",omitempty"`
	ReadTimeout        arvados.Duration `json:",omitempty"`
	RaceWindow         arvados.Duration `json:",omitempty"`
	UnsafeDelete       bool             `json:",omitempty"`

	// Directory driver configs
	Root                 string
	DirectoryReplication int
	Serialize            bool

	// Common configs
	ReadOnly       bool     `json:",omitempty"`
	StorageClasses []string `json:",omitempty"`
}
97 // update config using values from an old-style keepstore config file.
98 func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
99 if ldr.KeepstorePath == "" {
102 hostname, err := os.Hostname()
104 return fmt.Errorf("getting hostname: %s", err)
107 var oc oldKeepstoreConfig
108 err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
109 if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
111 } else if err != nil {
115 cluster, err := cfg.GetCluster("")
120 myURL := arvados.URL{Scheme: "http"}
121 if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
122 myURL.Scheme = "https"
125 if v := oc.Debug; v == nil {
126 } else if *v && cluster.SystemLogs.LogLevel != "debug" {
127 cluster.SystemLogs.LogLevel = "debug"
128 } else if !*v && cluster.SystemLogs.LogLevel != "info" {
129 cluster.SystemLogs.LogLevel = "info"
132 if v := oc.TLSCertificateFile; v != nil && "file://"+*v != cluster.TLS.Certificate {
133 cluster.TLS.Certificate = "file://" + *v
135 if v := oc.TLSKeyFile; v != nil && "file://"+*v != cluster.TLS.Key {
136 cluster.TLS.Key = "file://" + *v
138 if v := oc.Listen; v != nil {
139 if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
142 } else if len(*v) > 1 && (*v)[0] == ':' {
143 myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
144 cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
146 return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
149 for url := range cluster.Services.Keepstore.InternalURLs {
150 if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
155 if myURL.Host == "" {
156 return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
160 if v := oc.LogFormat; v != nil && *v != cluster.SystemLogs.Format {
161 cluster.SystemLogs.Format = *v
163 if v := oc.MaxBuffers; v != nil && *v != cluster.API.MaxKeepBlockBuffers {
164 cluster.API.MaxKeepBlockBuffers = *v
166 if v := oc.MaxRequests; v != nil && *v != cluster.API.MaxConcurrentRequests {
167 cluster.API.MaxConcurrentRequests = *v
169 if v := oc.BlobSignatureTTL; v != nil && *v != cluster.Collections.BlobSigningTTL {
170 cluster.Collections.BlobSigningTTL = *v
172 if v := oc.BlobSigningKeyFile; v != nil {
173 buf, err := ioutil.ReadFile(*v)
175 return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
177 if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
178 cluster.Collections.BlobSigningKey = key
181 if v := oc.RequireSignatures; v != nil && *v != cluster.Collections.BlobSigning {
182 cluster.Collections.BlobSigning = *v
184 if v := oc.SystemAuthTokenFile; v != nil {
185 f, err := os.Open(*v)
187 return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
190 buf, err := ioutil.ReadAll(f)
192 return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
194 if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
195 cluster.SystemRootToken = key
198 if v := oc.EnableDelete; v != nil && *v != cluster.Collections.BlobTrash {
199 cluster.Collections.BlobTrash = *v
201 if v := oc.TrashLifetime; v != nil && *v != cluster.Collections.BlobTrashLifetime {
202 cluster.Collections.BlobTrashLifetime = *v
204 if v := oc.TrashCheckInterval; v != nil && *v != cluster.Collections.BlobTrashCheckInterval {
205 cluster.Collections.BlobTrashCheckInterval = *v
207 if v := oc.TrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
208 cluster.Collections.BlobTrashConcurrency = *v
210 if v := oc.EmptyTrashWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
211 cluster.Collections.BlobDeleteConcurrency = *v
213 if v := oc.PullWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
214 cluster.Collections.BlobReplicateConcurrency = *v
216 if v := oc.Volumes; v == nil {
217 ldr.Logger.Warn("no volumes in legacy config; discovering local directory volumes")
218 err := ldr.discoverLocalVolumes(cluster, oc.DiscoverVolumesFromMountsFile, myURL)
220 return fmt.Errorf("error discovering local directory volumes: %s", err)
223 for i, oldvol := range *v {
224 var accessViaHosts map[arvados.URL]arvados.VolumeAccess
225 oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
227 accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
229 for _, va := range accessViaHosts {
234 if writers || len(accessViaHosts) == 0 {
235 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
236 if len(accessViaHosts) > 0 {
237 cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
242 var newvol arvados.Volume
244 ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
245 newvol = cluster.Volumes[oldUUID]
246 // Remove the old entry. It will be
247 // added back below, possibly with a
249 delete(cluster.Volumes, oldUUID)
251 var params interface{}
254 accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
255 if err != nil && oldvol.AccessKeyFile != "" {
256 return fmt.Errorf("error reading AccessKeyFile: %s", err)
258 secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
259 if err != nil && oldvol.SecretKeyFile != "" {
260 return fmt.Errorf("error reading SecretKeyFile: %s", err)
262 newvol = arvados.Volume{
264 ReadOnly: oldvol.ReadOnly,
265 Replication: oldvol.S3Replication,
266 StorageClasses: array2boolmap(oldvol.StorageClasses),
268 params = arvados.S3VolumeDriverParameters{
269 AccessKey: string(bytes.TrimSpace(accesskeydata)),
270 SecretKey: string(bytes.TrimSpace(secretkeydata)),
271 Endpoint: oldvol.Endpoint,
272 Region: oldvol.Region,
273 Bucket: oldvol.Bucket,
274 LocationConstraint: oldvol.LocationConstraint,
275 IndexPageSize: oldvol.IndexPageSize,
276 ConnectTimeout: oldvol.ConnectTimeout,
277 ReadTimeout: oldvol.ReadTimeout,
278 RaceWindow: oldvol.RaceWindow,
279 UnsafeDelete: oldvol.UnsafeDelete,
282 keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
283 if err != nil && oldvol.StorageAccountKeyFile != "" {
284 return fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
286 newvol = arvados.Volume{
288 ReadOnly: oldvol.ReadOnly,
289 Replication: oldvol.AzureReplication,
290 StorageClasses: array2boolmap(oldvol.StorageClasses),
292 params = arvados.AzureVolumeDriverParameters{
293 StorageAccountName: oldvol.StorageAccountName,
294 StorageAccountKey: string(bytes.TrimSpace(keydata)),
295 StorageBaseURL: oldvol.StorageBaseURL,
296 ContainerName: oldvol.ContainerName,
297 RequestTimeout: oldvol.RequestTimeout,
298 ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay,
299 ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
302 newvol = arvados.Volume{
304 ReadOnly: oldvol.ReadOnly,
305 Replication: oldvol.DirectoryReplication,
306 StorageClasses: array2boolmap(oldvol.StorageClasses),
308 params = arvados.DirectoryVolumeDriverParameters{
310 Serialize: oldvol.Serialize,
313 return fmt.Errorf("unsupported volume type %q", oldvol.Type)
315 dp, err := json.Marshal(params)
319 newvol.DriverParameters = json.RawMessage(dp)
320 if newvol.Replication < 1 {
321 newvol.Replication = 1
324 if accessViaHosts == nil {
325 accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
327 accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
328 newvol.AccessViaHosts = accessViaHosts
332 } else if oc.Listen == nil {
333 ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
334 } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
335 ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
336 } else if len(uuid) != 27 {
337 ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
339 rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
340 if _, ok := cluster.Volumes[rendezvousUUID]; ok {
341 ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
343 volUUID = rendezvousUUID
347 volUUID = newUUID(cluster.ClusterID, "nyw5e")
348 ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
350 cluster.Volumes[volUUID] = newvol
354 cfg.Clusters[cluster.ClusterID] = *cluster
358 func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
359 for uuid, newvol := range newvols {
360 if oldvol.Type != newvol.Driver {
365 var params arvados.S3VolumeDriverParameters
366 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
367 oldvol.Endpoint == params.Endpoint &&
368 oldvol.Region == params.Region &&
369 oldvol.Bucket == params.Bucket &&
370 oldvol.LocationConstraint == params.LocationConstraint {
374 var params arvados.AzureVolumeDriverParameters
375 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
376 oldvol.StorageAccountName == params.StorageAccountName &&
377 oldvol.StorageBaseURL == params.StorageBaseURL &&
378 oldvol.ContainerName == params.ContainerName {
382 var params arvados.DirectoryVolumeDriverParameters
383 if err := json.Unmarshal(newvol.DriverParameters, ¶ms); err == nil &&
384 oldvol.Root == params.Root {
385 if _, ok := newvol.AccessViaHosts[myURL]; ok {
394 func (ldr *Loader) discoverLocalVolumes(cluster *arvados.Cluster, mountsFile string, myURL arvados.URL) error {
395 if mountsFile == "" {
396 mountsFile = "/proc/mounts"
398 f, err := os.Open(mountsFile)
400 return fmt.Errorf("error opening %s: %s", mountsFile, err)
403 scanner := bufio.NewScanner(f)
405 args := strings.Fields(scanner.Text())
406 dev, mount := args[0], args[1]
410 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
413 keepdir := mount + "/keep"
414 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
419 for _, fsopt := range strings.Split(args[3], ",") {
425 uuid := newUUID(cluster.ClusterID, "nyw5e")
426 ldr.Logger.WithFields(logrus.Fields{
428 "Driver": "Directory",
429 "DriverParameters.Root": keepdir,
430 "DriverParameters.Serialize": false,
433 }).Warn("adding local directory volume")
435 p, err := json.Marshal(arvados.DirectoryVolumeDriverParameters{
442 cluster.Volumes[uuid] = arvados.Volume{
447 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
448 myURL: {ReadOnly: ro},
452 if err := scanner.Err(); err != nil {
453 return fmt.Errorf("reading %s: %s", mountsFile, err)
// array2boolmap converts a list of names (e.g. storage classes) into a
// set-style map in which each listed name maps to true. Duplicates
// collapse into one key. A nil or empty list yields an empty, non-nil
// map.
func array2boolmap(keys []string) map[string]bool {
	set := make(map[string]bool, len(keys))
	for i := range keys {
		set[keys[i]] = true
	}
	return set
}
// newUUID returns a random identifier of the form
// "<clusterID>-<infix>-<15 base-36 digits>". The random part is drawn
// uniformly from [0, 36^15) using crypto/rand and left-padded with '0'
// to 15 digits. It panics if the system random source fails.
func newUUID(clusterID, infix string) string {
	const width = 15
	limit := new(big.Int).Exp(big.NewInt(36), big.NewInt(width), nil)
	n, err := rand.Int(rand.Reader, limit)
	if err != nil {
		panic(err)
	}
	suffix := n.Text(36)
	if missing := width - len(suffix); missing > 0 {
		suffix = "000000000000000"[:missing] + suffix
	}
	return fmt.Sprintf("%s-%s-%s", clusterID, infix, suffix)
}
478 // Return the UUID and URL for the controller's keep_services listing
479 // corresponding to this host/process.
480 func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
481 client, err := arvados.NewClientFromConfig(cluster)
485 client.AuthToken = cluster.SystemRootToken
486 var svcList arvados.KeepServiceList
487 err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
491 hostname, err := os.Hostname()
493 err = fmt.Errorf("error getting hostname: %s", err)
497 for _, ks := range svcList.Items {
498 if ks.ServiceType == "proxy" {
500 } else if keepServiceIsMe(ks, hostname, listen) {
503 Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
505 if ks.ServiceSSLFlag {
508 return ks.UUID, url, nil
510 tried = append(tried, fmt.Sprintf("%s:%d", ks.ServiceHost, ks.ServicePort))
513 err = fmt.Errorf("listen address %q does not match any of the non-proxy keep_services entries %q", listen, tried)
// localhostOrAllInterfaces lists (lower-case) keep_services host values
// that keepServiceIsMe accepts regardless of the local hostname; for
// these, only the port has to match.
var localhostOrAllInterfaces = map[string]bool{
	"localhost": true,
	"127.0.0.1": true,
	"::1":       true,
	"::":        true,
	"":          true,
}
525 // Return true if the given KeepService entry matches the given
526 // hostname and (keepstore config file) listen address.
528 // If the KeepService host is some variant of "localhost", we assume
529 // this is a testing or single-node environment, ignore the given
530 // hostname, and return true if the port numbers match.
532 // The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
533 // match a KeepService host "foo.bar", but also "foo.bar.example",
534 // "foo.bar.example.org", etc.
535 func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
536 // Extract the port name/number from listen, and resolve it to
537 // a port number to compare with ks.ServicePort.
538 _, listenport, err := net.SplitHostPort(listen)
539 if err != nil && strings.HasPrefix(listen, ":") {
540 listenport = listen[1:]
542 if lp, err := net.LookupPort("tcp", listenport); err != nil {
544 } else if !(lp == ks.ServicePort ||
545 (lp == 0 && ks.ServicePort == 80)) {
549 kshost := strings.ToLower(ks.ServiceHost)
550 return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")