2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / pull_worker.go
index 348bfb4df00087a1726ef36cbd186fe0eb5ea4c7..c131de02cbbfc01230015b59aa1350420da830ae 100644 (file)
 package keepstore
 
 import (
+       "bytes"
        "context"
-       "fmt"
-       "io"
-       "io/ioutil"
-       "time"
+       "sync"
+       "sync/atomic"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
-// RunPullWorker receives PullRequests from pullq, invokes
-// PullItemAndProcess on each one. After each PR, it logs a message
-// indicating whether the pull was successful.
-func (h *handler) runPullWorker(pullq *WorkQueue) {
-       for item := range pullq.NextItem {
-               pr := item.(PullRequest)
-               err := h.pullItemAndProcess(pr)
-               pullq.DoneItem <- struct{}{}
-               if err == nil {
-                       h.Logger.Printf("Pull %s success", pr)
-               } else {
-                       h.Logger.Printf("Pull %s error: %s", pr, err)
-               }
-       }
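+// PullListItem is a single entry in a pull list: a block to fetch
+// from the given remote servers and store on a local volume.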
+type PullListItem struct {
+       Locator   string   `json:"locator"`
+       Servers   []string `json:"servers"`
+       MountUUID string   `json:"mount_uuid"` // Destination mount, or "" for "anywhere"
 }
 
-// PullItemAndProcess executes a pull request by retrieving the
-// specified block from one of the specified servers, and storing it
-// on a local volume.
-//
-// If the PR specifies a non-blank mount UUID, PullItemAndProcess will
-// only attempt to write the data to the corresponding
-// volume. Otherwise it writes to any local volume, as a PUT request
-// would.
-func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
-       var vol *VolumeMount
-       if uuid := pullRequest.MountUUID; uuid != "" {
-               vol = h.volmgr.Lookup(pullRequest.MountUUID, true)
-               if vol == nil {
-                       return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
-               }
-       }
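+// puller holds the pending pull list and coordinates the worker
+// goroutines that process it.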
+type puller struct {
+       keepstore  *keepstore
+       todo       []PullListItem
+       cond       *sync.Cond // cond.L guards access to todo; cond is broadcast when todo is replaced or the worker context is cancelled
+       inprogress atomic.Int64
+}
 
-       // Make a private copy of keepClient so we can set
-       // ServiceRoots to the source servers specified in the pull
-       // request.
-       keepClient := h.keepClient.Clone()
-       serviceRoots := make(map[string]string)
-       for _, addr := range pullRequest.Servers {
-               serviceRoots[addr] = addr
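+// newPuller returns a new puller, registers pull-queue metrics with
+// reg, and starts the worker goroutines (unless there are no
+// writable volumes).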
+func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
+       p := &puller{
+               keepstore: keepstore,
+               cond:      sync.NewCond(&sync.Mutex{}),
        }
-       keepClient.SetServiceRoots(serviceRoots, nil, nil)
+       reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "pull_queue_pending_entries",
+                       Help:      "Number of queued pull requests",
+               },
+               func() float64 {
+                       p.cond.L.Lock()
+                       defer p.cond.L.Unlock()
+                       return float64(len(p.todo))
+               },
+       ))
+       reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "pull_queue_inprogress_entries",
+                       Help:      "Number of pull requests in progress",
+               },
+               func() float64 {
+                       return float64(p.inprogress.Load())
+               },
+       ))
+       if len(p.keepstore.mountsW) == 0 {
+               keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+               return p
+       }
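+       // Start BlobReplicateConcurrency workers, but at least one,
+       // even if the configured concurrency is zero or negative.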
+       for i := 0; i < 1 || i < keepstore.cluster.Collections.BlobReplicateConcurrency; i++ {
+               go p.runWorker(ctx)
+       }
+       return p
+}
 
-       signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
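+// SetPullList replaces the pending pull list with newlist and wakes
+// up the worker goroutines.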
+func (p *puller) SetPullList(newlist []PullListItem) {
+       p.cond.L.Lock()
+       p.todo = newlist
+       p.cond.L.Unlock()
+       p.cond.Broadcast()
+}
 
-       reader, _, _, err := GetContent(signedLocator, keepClient)
-       if err != nil {
-               return err
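+// runWorker takes entries off the pull list one at a time, fetching
+// each block from the listed remote servers and writing it to a
+// local volume, until ctx is cancelled.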
+func (p *puller) runWorker(ctx context.Context) {
+       if len(p.keepstore.mountsW) == 0 {
+               p.keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+               return
        }
-       if reader == nil {
-               return fmt.Errorf("No reader found for : %s", signedLocator)
+       c, err := arvados.NewClientFromConfig(p.keepstore.cluster)
+       if err != nil {
+               p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+               return
        }
-       defer reader.Close()
-
-       readContent, err := ioutil.ReadAll(reader)
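+       // The token is a fixed placeholder; it is used both to sign
+       // the locator below and as the request token sent to the
+       // source servers.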
+       c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
+       ac, err := arvadosclient.New(c)
        if err != nil {
-               return err
+               p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+               return
        }
-
-       if readContent == nil {
-               return fmt.Errorf("Content not found for: %s", signedLocator)
+       keepClient := &keepclient.KeepClient{
+               Arvados:       ac,
+               Want_replicas: 1,
+               DiskCacheSize: keepclient.DiskCacheDisabled,
        }
+       // Ensure the loop below wakes up and returns when ctx is
+       // cancelled, even if the pull list is empty.
+       go func() {
+               <-ctx.Done()
+               p.cond.Broadcast()
+       }()
+       for {
+               p.cond.L.Lock()
+               for len(p.todo) == 0 && ctx.Err() == nil {
+                       p.cond.Wait()
+               }
+               if ctx.Err() != nil {
+                       return
+               }
+               item := p.todo[0]
+               p.todo = p.todo[1:]
+               p.inprogress.Add(1)
+               p.cond.L.Unlock()
 
-       return writePulledBlock(h.volmgr, vol, readContent, pullRequest.Locator)
-}
+               func() {
+                       defer p.inprogress.Add(-1)
 
-// GetContent fetches the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
-       return keepClient.Get(signedLocator)
-}
+                       logger := p.keepstore.logger.WithField("locator", item.Locator)
+
+                       li, err := parseLocator(item.Locator)
+                       if err != nil {
+                               logger.Warn("ignoring pull request for invalid locator")
+                               return
+                       }
+
+                       var dst *mount
+                       if item.MountUUID != "" {
+                               dst = p.keepstore.mounts[item.MountUUID]
+                               if dst == nil {
+                                       logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
+                                       return
+                               } else if !dst.AllowWrite {
+                                       logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
+                                       return
+                               }
+                       } else {
+                               dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
+                       }
+
+                       serviceRoots := make(map[string]string)
+                       for _, addr := range item.Servers {
+                               serviceRoots[addr] = addr
+                       }
+                       keepClient.SetServiceRoots(serviceRoots, nil, nil)
+
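+                       // Sign the locator with the placeholder
+                       // token before requesting the block from the
+                       // source servers.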
+                       signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
 
-var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte, locator string) error {
-       if volume != nil {
-               return volume.Put(context.Background(), locator, data)
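+                       // Fetch the block into memory, then write it
+                       // to the destination mount.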
+                       buf := bytes.NewBuffer(nil)
+                       _, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
+                               Locator: signedLocator,
+                               WriteTo: buf,
+                       })
+                       if err != nil {
+                               logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
+                               return
+                       }
+                       err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
+                       if err != nil {
+                               logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
+                               return
+                       }
+                       logger.Info("block pulled")
+               }()
        }
-       _, err := PutBlock(context.Background(), volmgr, data, locator, nil)
-       return err
 }