10701: Add back MinRetryPeriod throttle. Update comments/identifiers.
authorTom Clegg <tom@curoverse.com>
Fri, 10 Feb 2017 18:11:52 +0000 (13:11 -0500)
committerTom Clegg <tom@curoverse.com>
Fri, 10 Feb 2017 18:11:52 +0000 (13:11 -0500)
sdk/go/dispatch/dispatch.go

index f805298038ff3746bfba1d3f709c8301107c1514..722d4eec6d1abcc0947091f7681e37b3502a98c1 100644 (file)
@@ -1,6 +1,5 @@
-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
 package dispatch
 
 import (
@@ -23,10 +22,17 @@ const (
 )
 
 type Dispatcher struct {
-       Arv            *arvadosclient.ArvadosClient
-       PollPeriod     time.Duration
+       Arv *arvadosclient.ArvadosClient
+
+       // Queue polling frequency
+       PollPeriod time.Duration
+
+       // Time to wait between successive attempts to run the same container
        MinRetryPeriod time.Duration
-       RunContainer   Runner
+
+       // Func that implements the container lifecycle. Must be set
+       // to a non-nil DispatchFunc before calling Run().
+       RunContainer DispatchFunc
 
        auth     arvados.APIClientAuthorization
        mtx      sync.Mutex
@@ -34,17 +40,30 @@ type Dispatcher struct {
        throttle throttle
 }
 
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
-
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resume monitoring an already-running container, and wait
+// until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
        err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
        if err != nil {
                return fmt.Errorf("error getting my token UUID: %v", err)
        }
 
+       d.throttle.hold = d.MinRetryPeriod
+
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
@@ -139,13 +158,20 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
                } else {
                        switch c.State {
                        case Queued:
-                               if err := d.lock(c.UUID); err != nil {
+                               if !d.throttle.Check(c.UUID) {
+                                       break
+                               }
+                               err := d.lock(c.UUID)
+                               if err != nil {
                                        log.Printf("debug: error locking container %s: %s", c.UUID, err)
-                               } else {
-                                       c.State = Locked
-                                       d.running[c.UUID] = d.start(c)
+                                       break
                                }
+                               c.State = Locked
+                               d.running[c.UUID] = d.start(c)
                        case Locked, Running:
+                               if !d.throttle.Check(c.UUID) {
+                                       break
+                               }
                                d.running[c.UUID] = d.start(c)
                        case Cancelled, Complete:
                                tracker.close()