-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
import (
- "crypto/rand"
- "errors"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "io"
- "io/ioutil"
- "log"
- "time"
+ "bytes"
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
)
-/*
- Keepstore initiates pull worker channel goroutine.
- The channel will process pull list.
- For each (next) pull request:
- For each locator listed, execute Pull on the server(s) listed
- Skip the rest of the servers if no errors
- Repeat
-*/
-func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
- nextItem := pullq.NextItem
- for item := range nextItem {
- pullRequest := item.(PullRequest)
- err := PullItemAndProcess(item.(PullRequest), GenerateRandomApiToken(), keepClient)
- if err == nil {
- log.Printf("Pull %s success", pullRequest)
- } else {
- log.Printf("Pull %s error: %s", pullRequest, err)
- }
- }
+// PullListItem is one pull list entry: a block to be read from the
+// listed servers and written to a local mount.
+type PullListItem struct {
+ Locator string `json:"locator"` // locator of the block to pull
+ Servers []string `json:"servers"` // sources to read from; each entry is installed as a keepclient service root
+ MountUUID string `json:"mount_uuid"` // Destination mount, or "" for "anywhere"
}
-/*
- For each Pull request:
- Generate a random API token.
- Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
- Using this token & signature, retrieve the given block.
- Write to storage
-*/
-func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
- keepClient.Arvados.ApiToken = token
+// puller holds the pending pull list and the state shared by the
+// pull worker goroutines.
+type puller struct {
+ keepstore *keepstore // supplies mounts, logger, cluster config and locator signing
+ todo []PullListItem // pending entries; workers take items from the front
+ cond *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
+ inprogress atomic.Int64 // number of entries currently being processed by workers
+}
- service_roots := make(map[string]string)
- for _, addr := range pullRequest.Servers {
- service_roots[addr] = addr
+// newPuller returns a puller for the given keepstore, registers its
+// queue gauges (pending and in-progress entries) with reg, and starts
+// the worker goroutines, which run until ctx is cancelled.
+//
+// If the keepstore has no writable mounts, the gauges are still
+// registered but no workers are started.
+func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
+	p := &puller{
+		keepstore: keepstore,
+		cond:      sync.NewCond(new(sync.Mutex)),
+	}
+
+	// The queue length is read under the lock because workers and
+	// SetPullList modify p.todo concurrently.
+	pendingGauge := prometheus.NewGaugeFunc(
+		prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "pull_queue_pending_entries",
+			Help:      "Number of queued pull requests",
+		},
+		func() float64 {
+			p.cond.L.Lock()
+			queued := len(p.todo)
+			p.cond.L.Unlock()
+			return float64(queued)
+		},
+	)
+	reg.MustRegister(pendingGauge)
+
+	inprogressGauge := prometheus.NewGaugeFunc(
+		prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "pull_queue_inprogress_entries",
+			Help:      "Number of pull requests in progress",
+		},
+		func() float64 {
+			return float64(p.inprogress.Load())
+		},
+	)
+	reg.MustRegister(inprogressGauge)
+
+	if len(p.keepstore.mountsW) == 0 {
+		keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+		return p
	}
- keepClient.SetServiceRoots(service_roots, nil, keepClient.WritableRoots())
+	// Always start at least one worker, even if the configured
+	// concurrency is zero or negative.
+	nWorkers := keepstore.cluster.Collections.BlobReplicateConcurrency
+	if nWorkers < 1 {
+		nWorkers = 1
+	}
+	for ; nWorkers > 0; nWorkers-- {
+		go p.runWorker(ctx)
+	}
+	return p
+}
- // Generate signature with a random token
- expires_at := time.Now().Add(60 * time.Second)
- signedLocator := SignLocator(pullRequest.Locator, token, expires_at)
+// SetPullList replaces the pending pull list with newlist (entries
+// from the previous list that have not been started are dropped) and
+// wakes up all waiting workers.
+func (p *puller) SetPullList(newlist []PullListItem) {
+	func() {
+		p.cond.L.Lock()
+		defer p.cond.L.Unlock()
+		p.todo = newlist
+	}()
+	// Broadcast after releasing the lock, so woken workers can
+	// acquire it right away.
+	p.cond.Broadcast()
+}
- reader, contentLen, _, err := GetContent(signedLocator, keepClient)
- if err != nil {
+// runWorker takes entries off the pull list one at a time until ctx
+// is cancelled. For each entry it reads the block from the servers
+// listed in the entry and writes it to the destination mount: the
+// mount named by MountUUID, or, if MountUUID is empty, the first
+// writable mount returned by rendezvous() for the locator.
+//
+// Failures (invalid locator, unknown or read-only mount, read error,
+// write error) are logged and the entry is dropped; nothing is
+// retried here.
+//
+// It returns immediately, after logging, if there are no writable
+// mounts or if the Arvados client cannot be set up.
+func (p *puller) runWorker(ctx context.Context) {
+	if len(p.keepstore.mountsW) == 0 {
+		p.keepstore.logger.Infof("not running pull worker because there are no writable volumes")
		return
	}
- if reader == nil {
- return errors.New(fmt.Sprintf("No reader found for : %s", signedLocator))
+	c, err := arvados.NewClientFromConfig(p.keepstore.cluster)
	if err != nil {
+		p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+		return
	}
- defer reader.Close()
-
- read_content, err := ioutil.ReadAll(reader)
+	// The same token is used below to sign each locator before
+	// requesting the block.
+	c.AuthToken = "keepstore-token-used-for-pulling-data-from-same-cluster"
+	ac, err := arvadosclient.New(c)
	if err != nil {
- return err
+		p.keepstore.logger.Errorf("error setting up pull worker: %s", err)
+		return
	}
-
- if (read_content == nil) || (int64(len(read_content)) != contentLen) {
- return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
+	keepClient := &keepclient.KeepClient{
+		Arvados:       ac,
+		Want_replicas: 1,
+		DiskCacheSize: keepclient.DiskCacheDisabled,
	}
+	// Ensure the loop below wakes up and returns when ctx
+	// cancels, even if pull list is empty.
+	go func() {
+		<-ctx.Done()
+		// Broadcast while holding the lock. A worker that has
+		// already seen ctx.Err() == nil keeps the lock until
+		// Wait() has registered it, so taking the lock here
+		// guarantees the wakeup cannot slip in between the
+		// worker's check and its Wait().
+		p.cond.L.Lock()
+		defer p.cond.L.Unlock()
+		p.cond.Broadcast()
+	}()
+	for {
+		p.cond.L.Lock()
+		for len(p.todo) == 0 && ctx.Err() == nil {
+			p.cond.Wait()
+		}
+		if ctx.Err() != nil {
+			// Release the lock before exiting: other workers
+			// woken by the same broadcast must re-acquire it
+			// inside Wait() before they can return, and the
+			// queue gauge and SetPullList need it too.
+			p.cond.L.Unlock()
+			return
+		}
+		item := p.todo[0]
+		p.todo = p.todo[1:]
+		p.inprogress.Add(1)
+		p.cond.L.Unlock()
- err = PutContent(read_content, pullRequest.Locator)
- return
-}
+		// Process the item in a closure so the in-progress
+		// counter is decremented on every exit path.
+		func() {
+			defer p.inprogress.Add(-1)
-// Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
- reader io.ReadCloser, contentLength int64, url string, err error) {
- reader, blocklen, url, err := keepClient.Get(signedLocator)
- return reader, blocklen, url, err
-}
+			logger := p.keepstore.logger.WithField("locator", item.Locator)
-const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
+			li, err := getLocatorInfo(item.Locator)
+			if err != nil {
+				logger.WithError(err).Warn("ignoring pull request for invalid locator")
+				return
+			}
-func GenerateRandomApiToken() string {
- var bytes = make([]byte, 36)
- rand.Read(bytes)
- for i, b := range bytes {
- bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
- }
- return (string(bytes))
-}
+			var dst *mount
+			if item.MountUUID != "" {
+				dst = p.keepstore.mounts[item.MountUUID]
+				if dst == nil {
+					logger.Warnf("ignoring pull list entry for nonexistent mount %s", item.MountUUID)
+					return
+				} else if !dst.AllowWrite {
+					logger.Warnf("ignoring pull list entry for readonly mount %s", item.MountUUID)
+					return
+				}
+			} else {
+				// mountsW is non-empty (checked at the top of
+				// runWorker); this indexing relies on rendezvous
+				// returning at least one mount in that case.
+				dst = p.keepstore.rendezvous(item.Locator, p.keepstore.mountsW)[0]
+			}
-// Put block
-var PutContent = func(content []byte, locator string) (err error) {
- err = PutBlock(content, locator)
- return
+			// Restrict the client to the servers named in this
+			// entry.
+			serviceRoots := make(map[string]string)
+			for _, addr := range item.Servers {
+				serviceRoots[addr] = addr
+			}
+			keepClient.SetServiceRoots(serviceRoots, nil, nil)
+
+			signedLocator := p.keepstore.signLocator(c.AuthToken, item.Locator)
+
+			buf := bytes.NewBuffer(nil)
+			_, err = keepClient.BlockRead(ctx, arvados.BlockReadOptions{
+				Locator: signedLocator,
+				WriteTo: buf,
+			})
+			if err != nil {
+				logger.WithError(err).Warnf("error pulling data from remote servers (%s)", item.Servers)
+				return
+			}
+			err = dst.BlockWrite(ctx, li.hash, buf.Bytes())
+			if err != nil {
+				logger.WithError(err).Warnf("error writing data to %s", dst.UUID)
+				return
+			}
+			logger.Info("block pulled")
+		}()
+	}
}