2960: Rename parseLocator to getLocatorInfo, add comments.
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 42b5d5889d30984685d43655f66fb3857f98de92..dc5eabaa15bbc0b4c5e94add8b5bc461aad998ed 100644
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
+       "bytes"
        "context"
-       "crypto/rand"
-       "fmt"
-       "io"
-       "io/ioutil"
-       "time"
+       "sync"
+       "sync/atomic"
 
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
-// RunPullWorker receives PullRequests from pullq, invokes
-// PullItemAndProcess on each one. After each PR, it logs a message
-// indicating whether the pull was successful.
-func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
-       for item := range pullq.NextItem {
-               pr := item.(PullRequest)
-               err := PullItemAndProcess(pr, keepClient)
-               pullq.DoneItem <- struct{}{}
-               if err == nil {
-                       log.Printf("Pull %s success", pr)
-               } else {
-                       log.Printf("Pull %s error: %s", pr, err)
-               }
-       }
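+// PullListItem is a request to copy a data block from one of the
+// listed remote servers to a local volume.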
+type PullListItem struct {
+       Locator   string   `json:"locator"`
+       Servers   []string `json:"servers"`
+       MountUUID string   `json:"mount_uuid"` // Destination mount, or "" for "anywhere"
 }
 
-// PullItemAndProcess executes a pull request by retrieving the
-// specified block from one of the specified servers, and storing it
-// on a local volume.
-//
-// If the PR specifies a non-blank mount UUID, PullItemAndProcess will
-// only attempt to write the data to the corresponding
-// volume. Otherwise it writes to any local volume, as a PUT request
-// would.
-func PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClient) error {
-       var vol Volume
-       if uuid := pullRequest.MountUUID; uuid != "" {
-               vol = KeepVM.Lookup(pullRequest.MountUUID, true)
-               if vol == nil {
-                       return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
-               }
-       }
-
-       keepClient.Arvados.ApiToken = randomToken
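+// puller holds the pending pull list and processes its entries in
+// background worker goroutines.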
+type puller struct {
+       keepstore  *keepstore
+       todo       []PullListItem
+       cond       *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
+       inprogress atomic.Int64
+}
 
-       serviceRoots := make(map[string]string)
-       for _, addr := range pullRequest.Servers {
-               serviceRoots[addr] = addr
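+// newPuller returns a new puller, registers pull-queue metrics with
+// reg, and, if any writable volumes are configured, starts
+// max(1, Collections.BlobReplicateConcurrency) worker goroutines.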
+func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
+       p := &puller{
+               keepstore: keepstore,
+               cond:      sync.NewCond(&sync.Mutex{}),
        }
-       keepClient.SetServiceRoots(serviceRoots, nil, nil)
+       reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "pull_queue_pending_entries",
+                       Help:      "Number of queued pull requests",
+               },
+               func() float64 {
+                       p.cond.L.Lock()
+                       defer p.cond.L.Unlock()
+                       return float64(len(p.todo))
+               },
+       ))
+       reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "pull_queue_inprogress_entries",
+                       Help:      "Number of pull requests in progress",
+               },
+               func() float64 {
+                       return float64(p.inprogress.Load())
+               },
+       ))
+       if len(p.keepstore.mountsW) == 0 {
+               keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+               return p
+       }
+       for i := 0; i < 1 || i < keepstore.cluster.Collections.BlobReplicateConcurrency; i++ {
+               go p.runWorker(ctx)
+       }
+       return p
+}
 
-       // Generate signature with a random token
-       expiresAt := time.Now().Add(60 * time.Second)
-       signedLocator := SignLocator(pullRequest.Locator, randomToken, expiresAt)
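+// SetPullList replaces the pending pull list with newlist and wakes
+// up the worker goroutines.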
+func (p *puller) SetPullList(newlist []PullListItem) {
+       p.cond.L.Lock()
+       p.todo = newlist
+       p.cond.L.Unlock()
+       p.cond.Broadcast()
+}
 
-       reader, contentLen, _, err := GetContent(signedLocator, keepClient)
-       if err != nil {
-               return err
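+// runWorker processes pull list entries one at a time until ctx is
+// canceled.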
+func (p *puller) runWorker(ctx context.Context) {
+       if len(p.keepstore.mountsW) == 0 {
+               p.keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+               return
        }
-       if reader == nil {
-               return fmt.Errorf("No reader found for : %s", signedLocator)
+       c, err := arvados.NewClientFromConfig(p.keepstore.cluster)
+       if err != nil {
+               p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+               return
        }
-       defer reader.Close()
-
-       readContent, err := ioutil.ReadAll(reader)
+       c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
+       ac, err := arvadosclient.New(c)
        if err != nil {
-               return err
+               p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+               return
        }
-
-       if (readContent == nil) || (int64(len(readContent)) != contentLen) {
-               return fmt.Errorf("Content not found for: %s", signedLocator)
+       keepClient := &keepclient.KeepClient{
+               Arvados:       ac,
+               Want_replicas: 1,
+               DiskCacheSize: keepclient.DiskCacheDisabled,
        }
+       // Ensure the loop below wakes up and returns when ctx
+       // cancels, even if pull list is empty.
+       go func() {
+               <-ctx.Done()
+               p.cond.Broadcast()
+       }()
+       for {
+               p.cond.L.Lock()
+               for len(p.todo) == 0 && ctx.Err() == nil {
+                       p.cond.Wait()
+               }
+               if ctx.Err() != nil {
+                       return
+               }
+               item := p.todo[0]
+               p.todo = p.todo[1:]
+               p.inprogress.Add(1)
+               p.cond.L.Unlock()
 
-       writePulledBlock(vol, readContent, pullRequest.Locator)
-       return nil
-}
+               func() {
+                       defer p.inprogress.Add(-1)
 
-// Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
-       return keepClient.Get(signedLocator)
-}
+                       logger := p.keepstore.logger.WithField("locator", item.Locator)
 
-var writePulledBlock = func(volume Volume, data []byte, locator string) {
-       var err error
-       if volume != nil {
-               err = volume.Put(context.Background(), locator, data)
-       } else {
-               _, err = PutBlock(context.Background(), data, locator)
-       }
-       if err != nil {
-               log.Printf("error writing pulled block %q: %s", locator, err)
-       }
-}
+                       li, err := getLocatorInfo(item.Locator)
+                       if err != nil {
+                               logger.Warn("ignoring pull request for invalid locator")
+                               return
+                       }
+
+                       var dst *mount
+                       if item.MountUUID != "" {
+                               dst = p.keepstore.mounts[item.MountUUID]
+                               if dst == nil {
+                                       logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
+                                       return
+                               } else if !dst.AllowWrite {
+                                       logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
+                                       return
+                               }
+                       } else {
+                               dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
+                       }
 
-var randomToken = func() string {
-       const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
-       var bytes = make([]byte, 36)
-       rand.Read(bytes)
-       for i, b := range bytes {
-               bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
+                       serviceRoots := make(map[string]string)
+                       for _, addr := range item.Servers {
+                               serviceRoots[addr] = addr
+                       }
+                       keepClient.SetServiceRoots(serviceRoots, nil, nil)
+
+                       signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
+
+                       buf := bytes.NewBuffer(nil)
+                       _, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
+                               Locator: signedLocator,
+                               WriteTo: buf,
+                       })
+                       if err != nil {
+                               logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
+                               return
+                       }
+                       err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
+                       if err != nil {
+                               logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
+                               return
+                       }
+                       logger.Info("block pulled")
+               }()
        }
-       return (string(bytes))
-}()
+}
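
For reference, each pull list entry unmarshals into the PullListItem struct added above; the JSON field names come from its struct tags. The following standalone sketch is not part of the commit, and the locator and server address are made-up example values; it only illustrates the shape of the data that SetPullList eventually receives.

package main

import (
	"encoding/json"
	"fmt"
)

// PullListItem mirrors the struct introduced in this commit.
type PullListItem struct {
	Locator   string   `json:"locator"`
	Servers   []string `json:"servers"`
	MountUUID string   `json:"mount_uuid"` // Destination mount, or "" for "anywhere"
}

func main() {
	// One example entry: the locator is the MD5 of "foo" plus its
	// length; the server address is a placeholder, not a real host.
	body := `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8+3","servers":["http://keep0.example:25107"],"mount_uuid":""}]`
	var list []PullListItem
	if err := json.Unmarshal([]byte(body), &list); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", list[0])
}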