21720: replaced theme.spacing.unit * x with theme.spacing(x) everywhere
[arvados.git] / services / keepstore / pull_worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "sync"
11         "sync/atomic"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
15         "git.arvados.org/arvados.git/sdk/go/keepclient"
16         "github.com/prometheus/client_golang/prometheus"
17 )
18
// PullListItem is one entry of a pull list: a block to fetch, the
// servers to fetch it from, and the mount it should be written to.
type PullListItem struct {
	// Locator identifies the block to pull.
	Locator string `json:"locator"`
	// Servers lists the addresses of the services the block may be
	// read from.
	Servers []string `json:"servers"`
	// MountUUID is the destination mount, or "" for "anywhere".
	MountUUID string `json:"mount_uuid"`
}
24
25 type puller struct {
26         keepstore  *keepstore
27         todo       []PullListItem
28         cond       *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
29         inprogress atomic.Int64
30 }
31
32 func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
33         p := &puller{
34                 keepstore: keepstore,
35                 cond:      sync.NewCond(&sync.Mutex{}),
36         }
37         reg.MustRegister(prometheus.NewGaugeFunc(
38                 prometheus.GaugeOpts{
39                         Namespace: "arvados",
40                         Subsystem: "keepstore",
41                         Name:      "pull_queue_pending_entries",
42                         Help:      "Number of queued pull requests",
43                 },
44                 func() float64 {
45                         p.cond.L.Lock()
46                         defer p.cond.L.Unlock()
47                         return float64(len(p.todo))
48                 },
49         ))
50         reg.MustRegister(prometheus.NewGaugeFunc(
51                 prometheus.GaugeOpts{
52                         Namespace: "arvados",
53                         Subsystem: "keepstore",
54                         Name:      "pull_queue_inprogress_entries",
55                         Help:      "Number of pull requests in progress",
56                 },
57                 func() float64 {
58                         return float64(p.inprogress.Load())
59                 },
60         ))
61         if len(p.keepstore.mountsW) == 0 {
62                 keepstore.logger.Infof("not running pull worker because there are no writable volumes")
63                 return p
64         }
65         for i := 0; i < 1 || i < keepstore.cluster.Collections.BlobReplicateConcurrency; i++ {
66                 go p.runWorker(ctx)
67         }
68         return p
69 }
70
71 func (p *puller) SetPullList(newlist []PullListItem) {
72         p.cond.L.Lock()
73         p.todo = newlist
74         p.cond.L.Unlock()
75         p.cond.Broadcast()
76 }
77
78 func (p *puller) runWorker(ctx context.Context) {
79         if len(p.keepstore.mountsW) == 0 {
80                 p.keepstore.logger.Infof("not running pull worker because there are no writable volumes")
81                 return
82         }
83         c, err := arvados.NewClientFromConfig(p.keepstore.cluster)
84         if err != nil {
85                 p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
86                 return
87         }
88         c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
89         ac, err := arvadosclient.New(c)
90         if err != nil {
91                 p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
92                 return
93         }
94         keepClient := &keepclient.KeepClient{
95                 Arvados:       ac,
96                 Want_replicas: 1,
97                 DiskCacheSize: keepclient.DiskCacheDisabled,
98         }
99         // Ensure the loop below wakes up and returns when ctx
100         // cancels, even if pull list is empty.
101         go func() {
102                 <-ctx.Done()
103                 p.cond.Broadcast()
104         }()
105         for {
106                 p.cond.L.Lock()
107                 for len(p.todo) == 0 && ctx.Err() == nil {
108                         p.cond.Wait()
109                 }
110                 if ctx.Err() != nil {
111                         return
112                 }
113                 item := p.todo[0]
114                 p.todo = p.todo[1:]
115                 p.inprogress.Add(1)
116                 p.cond.L.Unlock()
117
118                 func() {
119                         defer p.inprogress.Add(-1)
120
121                         logger := p.keepstore.logger.WithField("locator", item.Locator)
122
123                         li, err := getLocatorInfo(item.Locator)
124                         if err != nil {
125                                 logger.Warn("ignoring pull request for invalid locator")
126                                 return
127                         }
128
129                         var dst *mount
130                         if item.MountUUID != "" {
131                                 dst = p.keepstore.mounts[item.MountUUID]
132                                 if dst == nil {
133                                         logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
134                                         return
135                                 } else if !dst.AllowWrite {
136                                         logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
137                                         return
138                                 }
139                         } else {
140                                 dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
141                         }
142
143                         serviceRoots := make(map[string]string)
144                         for _, addr := range item.Servers {
145                                 serviceRoots[addr] = addr
146                         }
147                         keepClient.SetServiceRoots(serviceRoots, nil, nil)
148
149                         signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
150
151                         buf := bytes.NewBuffer(nil)
152                         _, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
153                                 Locator: signedLocator,
154                                 WriteTo: buf,
155                         })
156                         if err != nil {
157                                 logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
158                                 return
159                         }
160                         err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
161                         if err != nil {
162                                 logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
163                                 return
164                         }
165                         logger.Info("block pulled")
166                 }()
167         }
168 }