+ // Use a fixed, cluster-internal token when fetching blocks from
+ // peer keepstore services on the same cluster.
+ c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
+ ac, err := arvadosclient.New(c)
+ if err != nil {
+ p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+ return
+ }
+ // Single-replica reads; the disk cache is disabled because each
+ // pulled block is written straight to a local mount below.
+ keepClient := &keepclient.KeepClient{
+ Arvados: ac,
+ Want_replicas: 1,
+ DiskCacheSize: keepclient.DiskCacheDisabled,
+ }
+ // Ensure the loop below wakes up and returns when ctx
+ // cancels, even if pull list is empty.
+ go func() {
+ <-ctx.Done()
+ p.cond.Broadcast()
+ }()
+ // Worker loop: take one entry at a time from p.todo and pull it.
+ for {
+ p.cond.L.Lock()
+ for len(p.todo) == 0 && ctx.Err() == nil {
+ p.cond.Wait()
+ }
+ if ctx.Err() != nil {
+ // NOTE(review): returns while still holding p.cond.L —
+ // presumably harmless at shutdown, but confirm no other
+ // goroutine blocks on this lock afterwards.
+ return
+ }
+ // Pop the head of the queue and mark it in progress before
+ // releasing the lock.
+ item := p.todo[0]
+ p.todo = p.todo[1:]
+ p.inprogress.Add(1)
+ p.cond.L.Unlock()
+
+ // Process one pull-list entry; the closure scopes the deferred
+ // in-progress decrement to this single item.
+ func() {
+ defer p.inprogress.Add(-1)
+
+ logger := p.keepstore.logger.WithField("locator", item.Locator)
+
+ li, err := getLocatorInfo(item.Locator)
+ if err != nil {
+ logger.Warn("ignoring pull request for invalid locator")
+ return
+ }
+
+ // Choose the destination mount: the one named in the request,
+ // if any (it must exist and allow writes), otherwise the first
+ // of p.keepstore.mountsW in rendezvous order for this locator.
+ var dst *mount
+ if item.MountUUID != "" {
+ dst = p.keepstore.mounts[item.MountUUID]
+ if dst == nil {
+ logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
+ return
+ } else if !dst.AllowWrite {
+ logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
+ return
+ }
+ } else {
+ dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
+ }
+
+ // Restrict the keep client to the source servers listed in
+ // this pull request. (keepClient is only mutated from this
+ // worker loop, so reconfiguring it per item is safe here.)
+ serviceRoots := make(map[string]string)
+ for _, addr := range item.Servers {
+ serviceRoots[addr] = addr
+ }
+ keepClient.SetServiceRoots(serviceRoots, nil, nil)
+
+ // Sign the locator with the internal token so the source
+ // server will authorize the read.
+ signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
+
+ // Fetch the whole block into memory, then write it to the
+ // chosen local mount.
+ buf := bytes.NewBuffer(nil)
+ _, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: signedLocator,
+ WriteTo: buf,
+ })
+ if err != nil {
+ logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
+ return
+ }
+ err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
+ if err != nil {
+ logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
+ return
+ }
+ logger.Info("block pulled")
+ }()