- serviceRoots := make(map[string]string)
- for _, addr := range pullRequest.Servers {
- serviceRoots[addr] = addr
+func newPuller(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *puller {
+ p := &puller{
+ keepstore: keepstore,
+ cond: sync.NewCond(&sync.Mutex{}),
+ }
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "pull_queue_pending_entries",
+ Help: "Number of queued pull requests",
+ },
+ func() float64 {
+ p.cond.L.Lock()
+ defer p.cond.L.Unlock()
+ return float64(len(p.todo))
+ },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "pull_queue_inprogress_entries",
+ Help: "Number of pull requests in progress",
+ },
+ func() float64 {
+ return float64(p.inprogress.Load())
+ },
+ ))
+ if len(p.keepstore.mountsW) == 0 {
+ keepstore.logger.Infof("not running pull worker because there are no writable volumes")
+ return p