2960: Rename parseLocator to getLocatorInfo, add comments.
[arvados.git] / services / keepstore / pull_worker.go
index 3c6278d478d3d897982b2b6f8c9a166c505e6433..dc5eabaa15bbc0b4c5e94add8b5bc461aad998ed 100644 (file)
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 import (
+       "bytes"
        "context"
-       "crypto/rand"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "io"
-       "io/ioutil"
-       "time"
+       "sync"
+       "sync/atomic"
 
-       log "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
-// RunPullWorker is used by Keepstore to initiate pull worker channel goroutine.
-//     The channel will process pull list.
-//             For each (next) pull request:
-//                     For each locator listed, execute Pull on the server(s) listed
-//                     Skip the rest of the servers if no errors
-//             Repeat
-//
-func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
-       nextItem := pullq.NextItem
-       for item := range nextItem {
-               pullRequest := item.(PullRequest)
-               err := PullItemAndProcess(item.(PullRequest), GenerateRandomAPIToken(), keepClient)
-               pullq.DoneItem <- struct{}{}
-               if err == nil {
-                       log.Printf("Pull %s success", pullRequest)
-               } else {
-                       log.Printf("Pull %s error: %s", pullRequest, err)
-               }
-       }
+// PullListItem is one entry of a pull list: a block (Locator), the
+// servers it may be fetched from (Servers), and the mount the fetched
+// copy should be written to (MountUUID).
+type PullListItem struct {
+       Locator   string   `json:"locator"`
+       Servers   []string `json:"servers"`
+       MountUUID string   `json:"mount_uuid"` // Destination mount, or "" for "anywhere"
 }
 
-// PullItemAndProcess pulls items from PullQueue and processes them.
-//     For each Pull request:
-//             Generate a random API token.
-//             Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
-//             Using this token & signature, retrieve the given block.
-//             Write to storage
-//
-func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
-       keepClient.Arvados.ApiToken = token
+// puller holds the pending pull list and the state shared by the
+// worker goroutines that process it.
+type puller struct {
+       keepstore  *keepstore
+       todo       []PullListItem
+       cond       *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
+       // inprogress counts items that a worker has taken off todo and
+       // has not finished processing yet.
+       inprogress atomic.Int64
+}
 
-       serviceRoots := make(map[string]string)
-       for _, addr := range pullRequest.Servers {
-               serviceRoots[addr] = addr
+// newPuller returns a puller for the given keepstore, registers the
+// pull queue gauges ("pending" and "in progress") with reg, and starts
+// the worker goroutines, which receive ctx.
+//
+// If the keepstore has no writable mounts, the gauges are still
+// registered but no workers are started.
+func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
+       p := &puller{
+               keepstore: keepstore,
+               cond:      sync.NewCond(&sync.Mutex{}),
        }
-       keepClient.SetServiceRoots(serviceRoots, nil, nil)
+       reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "pull_queue_pending_entries",
+                       Help:      "Number of queued pull requests",
+               },
+               func() float64 {
+                       // todo is guarded by the cond's lock.
+                       p.cond.L.Lock()
+                       defer p.cond.L.Unlock()
+                       return float64(len(p.todo))
+               },
+       ))
+       reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "pull_queue_inprogress_entries",
+                       Help:      "Number of pull requests in progress",
+               },
+               func() float64 {
+                       return float64(p.inprogress.Load())
+               },
+       ))
+       if len(p.keepstore.mountsW) == 0 {
+               keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+               return p
+       }
+       // Always start at least one worker, even if
+       // BlobReplicateConcurrency is zero or negative.
+       for i := 0; i < 1 || i < keepstore.cluster.Collections.BlobReplicateConcurrency; i++ {
+               go p.runWorker(ctx)
+       }
+       return p
+}
 
-       // Generate signature with a random token
-       expiresAt := time.Now().Add(60 * time.Second)
-       signedLocator := SignLocator(pullRequest.Locator, token, expiresAt)
+// SetPullList replaces the pending pull list with newlist and wakes
+// up all waiting workers. Entries of the previous list that no worker
+// has taken yet are dropped. newlist is stored as-is, not copied.
+func (p *puller) SetPullList(newlist []PullListItem) {
+       p.cond.L.Lock()
+       p.todo = newlist
+       p.cond.L.Unlock()
+       p.cond.Broadcast()
+}
 
-       reader, contentLen, _, err := GetContent(signedLocator, keepClient)
-       if err != nil {
+// runWorker takes items off the pull list, one at a time, until ctx
+// is cancelled.
+//
+// For each item it reads the block from the servers listed in the
+// item and writes it to the mount named by MountUUID or, if MountUUID
+// is empty, to the first mount returned by rendezvous() over the
+// writable mounts. Failures are logged and the item is dropped; this
+// function does not retry it.
+//
+// It returns early (after logging) if there are no writable mounts or
+// if the Arvados client cannot be set up.
+func (p *puller) runWorker(ctx context.Context) {
+       if len(p.keepstore.mountsW) == 0 {
+               p.keepstore.logger.Infof("not running pull worker because there are no writable volumes")
                return
        }
-       if reader == nil {
-               return fmt.Errorf("No reader found for : %s", signedLocator)
+       c, err := arvados.NewClientFromConfig(p.keepstore.cluster)
+       if err != nil {
+               p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+               return
        }
-       defer reader.Close()
-
-       readContent, err := ioutil.ReadAll(reader)
+       c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
+       ac, err := arvadosclient.New(c)
        if err != nil {
-               return err
+               p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+               return
        }
-
-       if (readContent == nil) || (int64(len(readContent)) != contentLen) {
-               return fmt.Errorf("Content not found for: %s", signedLocator)
+       keepClient := &keepclient.KeepClient{
+               Arvados:       ac,
+               Want_replicas: 1,
+               DiskCacheSize: keepclient.DiskCacheDisabled,
        }
+       // Ensure the loop below wakes up and returns when ctx
+       // cancels, even if pull list is empty.
+       go func() {
+               <-ctx.Done()
+               // Broadcast while holding the lock, so that a worker
+               // that has already checked ctx.Err() but has not yet
+               // called Wait() cannot miss this wakeup.
+               p.cond.L.Lock()
+               p.cond.Broadcast()
+               p.cond.L.Unlock()
+       }()
+       for {
+               p.cond.L.Lock()
+               for len(p.todo) == 0 && ctx.Err() == nil {
+                       p.cond.Wait()
+               }
+               if ctx.Err() != nil {
+                       // Release the lock before exiting: otherwise
+                       // the other workers, SetPullList, and the
+                       // pending-entries gauge would block forever.
+                       p.cond.L.Unlock()
+                       return
+               }
+               item := p.todo[0]
+               p.todo = p.todo[1:]
+               p.inprogress.Add(1)
+               p.cond.L.Unlock()
 
-       err = PutContent(readContent, pullRequest.Locator)
-       return
-}
+               // Process the item in a func literal so the deferred
+               // inprogress decrement runs once per item.
+               func() {
+                       defer p.inprogress.Add(-1)
 
-// Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
-       reader io.ReadCloser, contentLength int64, url string, err error) {
-       reader, blocklen, url, err := keepClient.Get(signedLocator)
-       return reader, blocklen, url, err
-}
+                       logger := p.keepstore.logger.WithField("locator", item.Locator)
 
-const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
+                       li, err := getLocatorInfo(item.Locator)
+                       if err != nil {
+                               logger.Warn("ignoring pull request for invalid locator")
+                               return
+                       }
 
-// GenerateRandomAPIToken generates a random api token
-func GenerateRandomAPIToken() string {
-       var bytes = make([]byte, 36)
-       rand.Read(bytes)
-       for i, b := range bytes {
-               bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
-       }
-       return (string(bytes))
-}
+                       var dst *mount
+                       if item.MountUUID != "" {
+                               dst = p.keepstore.mounts[item.MountUUID]
+                               if dst == nil {
+                                       logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
+                                       return
+                               } else if !dst.AllowWrite {
+                                       logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
+                                       return
+                               }
+                       } else {
+                               // mountsW is non-empty here (checked at
+                               // the top of runWorker).
+                               dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
+                       }
 
-// Put block
-var PutContent = func(content []byte, locator string) (err error) {
-       _, err = PutBlock(context.Background(), content, locator)
-       return
+                       // Restrict the client to the servers named in
+                       // this pull list entry.
+                       serviceRoots := make(map[string]string)
+                       for _, addr := range item.Servers {
+                               serviceRoots[addr] = addr
+                       }
+                       keepClient.SetServiceRoots(serviceRoots, nil, nil)
+
+                       signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
+
+                       // The whole block is buffered in memory before
+                       // it is written to the destination mount.
+                       buf := bytes.NewBuffer(nil)
+                       _, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
+                               Locator: signedLocator,
+                               WriteTo: buf,
+                       })
+                       if err != nil {
+                               logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
+                               return
+                       }
+                       err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
+                       if err != nil {
+                               logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
+                               return
+                       }
+                       logger.Info("block pulled")
+               }()
+       }
 }