X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/54377a7bacc182ace0bb8b55a812e0a9fee5ced8..2c6557f613fcf6cdcebb08c321a5d061aeb780c6:/services/keepstore/pull_worker.go

diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 348bfb4df0..c131de02cb 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -5,90 +5,164 @@
 package keepstore
 
 import (
+	"bytes"
 	"context"
-	"fmt"
-	"io"
-	"io/ioutil"
-	"time"
+	"sync"
+	"sync/atomic"
 
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
-// RunPullWorker receives PullRequests from pullq, invokes
-// PullItemAndProcess on each one. After each PR, it logs a message
-// indicating whether the pull was successful.
-func (h *handler) runPullWorker(pullq *WorkQueue) {
-	for item := range pullq.NextItem {
-		pr := item.(PullRequest)
-		err := h.pullItemAndProcess(pr)
-		pullq.DoneItem <- struct{}{}
-		if err == nil {
-			h.Logger.Printf("Pull %s success", pr)
-		} else {
-			h.Logger.Printf("Pull %s error: %s", pr, err)
-		}
-	}
+type PullListItem struct {
+	Locator   string   `json:"locator"`
+	Servers   []string `json:"servers"`
+	MountUUID string   `json:"mount_uuid"` // Destination mount, or "" for "anywhere"
 }
 
-// PullItemAndProcess executes a pull request by retrieving the
-// specified block from one of the specified servers, and storing it
-// on a local volume.
-//
-// If the PR specifies a non-blank mount UUID, PullItemAndProcess will
-// only attempt to write the data to the corresponding
-// volume. Otherwise it writes to any local volume, as a PUT request
-// would.
-func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
-	var vol *VolumeMount
-	if uuid := pullRequest.MountUUID; uuid != "" {
-		vol = h.volmgr.Lookup(pullRequest.MountUUID, true)
-		if vol == nil {
-			return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
-		}
-	}
+type puller struct {
+	keepstore  *keepstore
+	todo       []PullListItem
+	cond       *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
+	inprogress atomic.Int64
+}
 
-	// Make a private copy of keepClient so we can set
-	// ServiceRoots to the source servers specified in the pull
-	// request.
-	keepClient := h.keepClient.Clone()
-	serviceRoots := make(map[string]string)
-	for _, addr := range pullRequest.Servers {
-		serviceRoots[addr] = addr
+func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
+	p := &puller{
+		keepstore: keepstore,
+		cond:      sync.NewCond(&sync.Mutex{}),
 	}
-	keepClient.SetServiceRoots(serviceRoots, nil, nil)
+	reg.MustRegister(prometheus.NewGaugeFunc(
+		prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "pull_queue_pending_entries",
+			Help:      "Number of queued pull requests",
+		},
+		func() float64 {
+			p.cond.L.Lock()
+			defer p.cond.L.Unlock()
+			return float64(len(p.todo))
+		},
+	))
+	reg.MustRegister(prometheus.NewGaugeFunc(
+		prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "pull_queue_inprogress_entries",
+			Help:      "Number of pull requests in progress",
+		},
+		func() float64 {
+			return float64(p.inprogress.Load())
+		},
+	))
+	if len(p.keepstore.mountsW) == 0 {
+		keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+		return p
+	}
+	for i := 0; i < 1 || i < keepstore.cluster.Collections.BlobReplicateConcurrency; i++ {
+		go p.runWorker(ctx)
+	}
+	return p
+}
 
-	signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
+func (p *puller) SetPullList(newlist []PullListItem) {
+	p.cond.L.Lock()
+	p.todo = newlist
+	p.cond.L.Unlock()
+	p.cond.Broadcast()
+}
 
-	reader, _, _, err := GetContent(signedLocator, keepClient)
-	if err != nil {
-		return err
+func (p *puller) runWorker(ctx context.Context) {
+	if len(p.keepstore.mountsW) == 0 {
+		p.keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+		return
 	}
-	if reader == nil {
-		return fmt.Errorf("No reader found for : %s", signedLocator)
+	c, err := arvados.NewClientFromConfig(p.keepstore.cluster)
+	if err != nil {
+		p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+		return
 	}
-	defer reader.Close()
-
-	readContent, err := ioutil.ReadAll(reader)
+	c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
+	ac, err := arvadosclient.New(c)
 	if err != nil {
-		return err
+		p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+		return
 	}
-
-	if readContent == nil {
-		return fmt.Errorf("Content not found for: %s", signedLocator)
+	keepClient := &keepclient.KeepClient{
+		Arvados:       ac,
+		Want_replicas: 1,
+		DiskCacheSize: keepclient.DiskCacheDisabled,
 	}
+	// Ensure the loop below wakes up and returns when ctx
+	// cancels, even if pull list is empty.
+	go func() {
+		<-ctx.Done()
+		p.cond.Broadcast()
+	}()
+	for {
+		p.cond.L.Lock()
+		for len(p.todo) == 0 && ctx.Err() == nil {
+			p.cond.Wait()
+		}
+		if ctx.Err() != nil {
+			return
+		}
+		item := p.todo[0]
+		p.todo = p.todo[1:]
+		p.inprogress.Add(1)
+		p.cond.L.Unlock()
 
-	return writePulledBlock(h.volmgr, vol, readContent, pullRequest.Locator)
-}
+		func() {
+			defer p.inprogress.Add(-1)
 
-// GetContent fetches the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
-	return keepClient.Get(signedLocator)
-}
+			logger := p.keepstore.logger.WithField("locator", item.Locator)
+
+			li, err := parseLocator(item.Locator)
+			if err != nil {
+				logger.Warn("ignoring pull request for invalid locator")
+				return
+			}
+
+			var dst *mount
+			if item.MountUUID != "" {
+				dst = p.keepstore.mounts[item.MountUUID]
+				if dst == nil {
+					logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
+					return
+				} else if !dst.AllowWrite {
+					logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
+					return
+				}
+			} else {
+				dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
+			}
+
+			serviceRoots := make(map[string]string)
+			for _, addr := range item.Servers {
+				serviceRoots[addr] = addr
+			}
+			keepClient.SetServiceRoots(serviceRoots, nil, nil)
+
+			signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
 
-var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte, locator string) error {
-	if volume != nil {
-		return volume.Put(context.Background(), locator, data)
+			buf := bytes.NewBuffer(nil)
+			_, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
+				Locator: signedLocator,
+				WriteTo: buf,
+			})
+			if err != nil {
+				logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
+				return
+			}
+			err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
+			if err != nil {
+				logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
+				return
+			}
+			logger.Info("block pulled")
+		}()
 	}
-	_, err := PutBlock(context.Background(), volmgr, data, locator, nil)
-	return err
 }
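
Reviewer note (commentary, not part of the patch): this commit replaces the old channel-based WorkQueue with a condition-variable queue whose contents are swapped wholesale each time keep-balance posts a new pull list. The sketch below is a minimal, self-contained illustration of that pattern, assuming nothing from the keepstore package: demoItem and demoQueue are hypothetical stand-ins for PullListItem and puller, and the sample locator and server URL are invented. Unlike the patch's runWorker, the sketch releases the lock before returning on cancellation, so its workers could be restarted or joined cleanly.

// demo_pull_queue.go: illustrative sketch only, NOT part of this patch.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// demoItem mirrors the JSON shape of PullListItem.
type demoItem struct {
	Locator   string   `json:"locator"`
	Servers   []string `json:"servers"`
	MountUUID string   `json:"mount_uuid"`
}

type demoQueue struct {
	todo       []demoItem
	cond       *sync.Cond // guards todo; broadcast when todo changes
	inprogress atomic.Int64
}

// SetList replaces the whole queue and wakes idle workers, like
// puller.SetPullList: each new list supersedes the previous one.
func (q *demoQueue) SetList(items []demoItem) {
	q.cond.L.Lock()
	q.todo = items
	q.cond.L.Unlock()
	q.cond.Broadcast()
}

func (q *demoQueue) runWorker(ctx context.Context, id int) {
	for {
		q.cond.L.Lock()
		for len(q.todo) == 0 && ctx.Err() == nil {
			q.cond.Wait()
		}
		if ctx.Err() != nil {
			// The patch returns while still holding the lock here,
			// which is moot at process shutdown; unlocking keeps
			// this sketch reusable.
			q.cond.L.Unlock()
			return
		}
		item := q.todo[0]
		q.todo = q.todo[1:]
		q.inprogress.Add(1)
		q.cond.L.Unlock()

		// The real worker signs item.Locator, reads the block from
		// item.Servers, and writes it to the destination mount.
		fmt.Printf("worker %d: pulling %s from %v\n", id, item.Locator, item.Servers)
		q.inprogress.Add(-1)
	}
}

func main() {
	// A pull list as keep-balance might post it (invented values).
	payload := `[{"locator": "acbd18db4cc2f85cedef654fccc4a4d8+3",
	             "servers": ["http://keep0.zzzzz.example:25107"],
	             "mount_uuid": ""}]`
	var items []demoItem
	if err := json.Unmarshal([]byte(payload), &items); err != nil {
		panic(err)
	}

	q := &demoQueue{cond: sync.NewCond(&sync.Mutex{})}
	ctx, cancel := context.WithCancel(context.Background())
	// Wake any Wait()ing workers on cancellation, as the patch does.
	go func() {
		<-ctx.Done()
		q.cond.Broadcast()
	}()
	for i := 0; i < 2; i++ {
		go q.runWorker(ctx, i)
	}
	q.SetList(items)
	time.Sleep(100 * time.Millisecond) // crude: let workers drain the list
	cancel()
}

Replacing the entire list in SetList, rather than appending to it, matches the keep-balance semantics in the patch: every posted pull list supersedes the previous one, so entries that are no longer wanted disappear automatically, and the pending-entries gauge can simply report len(todo) under the same lock.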