1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
15 "git.arvados.org/arvados.git/sdk/go/keepclient"
16 "github.com/prometheus/client_golang/prometheus"
19 type PullListItem struct {
20 Locator string `json:"locator"`
21 Servers []string `json:"servers"`
22 MountUUID string `json:"mount_uuid"` // Destination mount, or "" for "anywhere"
28 cond *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
29 inprogress atomic.Int64
32 func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
35 cond: sync.NewCond(&sync.Mutex{}),
37 reg.MustRegister(prometheus.NewGaugeFunc(
40 Subsystem: "keepstore",
41 Name: "pull_queue_pending_entries",
42 Help: "Number of queued pull requests",
46 defer p.cond.L.Unlock()
47 return float64(len(p.todo))
50 reg.MustRegister(prometheus.NewGaugeFunc(
53 Subsystem: "keepstore",
54 Name: "pull_queue_inprogress_entries",
55 Help: "Number of pull requests in progress",
58 return float64(p.inprogress.Load())
61 if len(p.keepstore.mountsW) == 0 {
62 keepstore.logger.Infof("not running pull worker because there are no writable volumes")
65 for i := 0; i < 1 || i < keepstore.cluster.Collections.BlobReplicateConcurrency; i++ {
71 func (p *puller) SetPullList(newlist []PullListItem) {
78 func (p *puller) runWorker(ctx context.Context) {
79 if len(p.keepstore.mountsW) == 0 {
80 p.keepstore.logger.Infof("not running pull worker because there are no writable volumes")
83 c, err := arvados.NewClientFromConfig(p.keepstore.cluster)
85 p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
88 c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
89 ac, err := arvadosclient.New(c)
91 p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
94 keepClient := &keepclient.KeepClient{
97 DiskCacheSize: keepclient.DiskCacheDisabled,
99 // Ensure the loop below wakes up and returns when ctx
100 // cancels, even if pull list is empty.
107 for len(p.todo) == 0 && ctx.Err() == nil {
110 if ctx.Err() != nil {
119 defer p.inprogress.Add(-1)
121 logger := p.keepstore.logger.WithField("locator", item.Locator)
123 li, err := getLocatorInfo(item.Locator)
125 logger.Warn("ignoring pull request for invalid locator")
130 if item.MountUUID != "" {
131 dst = p.keepstore.mounts[item.MountUUID]
133 logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
135 } else if !dst.AllowWrite {
136 logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
140 dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
143 serviceRoots := make(map[string]string)
144 for _, addr := range item.Servers {
145 serviceRoots[addr] = addr
147 keepClient.SetServiceRoots(serviceRoots, nil, nil)
149 signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
151 buf := bytes.NewBuffer(nil)
152 _, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
153 Locator: signedLocator,
157 logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
160 err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
162 logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
165 logger.Info("block pulled")