Merge branch '18947-githttpd'
[arvados.git] / sdk / go / dispatch / dispatch.go
index 152207ea94bb4bb4c4e21de36ca2b440691109a2..d34ea68d7a3c4f968e0e44bf3d78bd24bd79f3a7 100644 (file)
@@ -7,14 +7,16 @@
 package dispatch
 
 import (
+       "bytes"
        "context"
        "fmt"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "github.com/sirupsen/logrus"
 )
 
 const (
@@ -37,6 +39,9 @@ type Dispatcher struct {
 
        Logger Logger
 
+       // Batch size (list "limit") for container queries; Run sets it to 100 if zero
+       BatchSize int
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -63,7 +68,7 @@ type Dispatcher struct {
 // running, and return.
 //
 // The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
 
 // Run watches the API server's queue for containers that are either
 // ready to run and available to lock, or are already locked by this
@@ -84,11 +89,20 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
                        break
                case <-ctx.Done():
+                       d.mtx.Lock()
+                       defer d.mtx.Unlock()
+                       for _, tracker := range d.trackers {
+                               tracker.close()
+                       }
                        return ctx.Err()
                }
 
@@ -163,19 +177,77 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
        }
        tracker.updates <- c
        go func() {
-               d.RunContainer(d, c, tracker.updates)
-               // RunContainer blocks for the lifetime of the container.  When
-               // it returns, the tracker should delete itself.
-               d.mtx.Lock()
-               delete(d.trackers, c.UUID)
-               d.mtx.Unlock()
+               fallbackState := Queued // state requested below if the container is left Locked/Running
+               err := d.RunContainer(d, c, tracker.updates)
+               if err != nil {
+                       text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+                       if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+                               fallbackState = Cancelled
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+                                       }
+                               }
+                               text = logBuf.String()
+                       }
+                       d.Logger.Printf("%s", text)
+                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                               "object_uuid": c.UUID,
+                               "event_type":  "dispatch",
+                               "properties":  map[string]string{"text": text}}}
+                       d.Arv.Create("logs", lr, nil) // NOTE(review): Create's result is discarded here
+               }
+               // Consume updates until tracker.updates is closed. If
+               // checkListForUpdates() has not closed the tracker by the 2nd
+               // update, ask for fallbackState (Queued, or Cancelled for
+               // unsatisfiable constraints); that should eventually close it.
+               updates := 0
+               for upd := range tracker.updates {
+                       updates++
+                       if upd.State == Locked || upd.State == Running {
+                               // RunContainer() returned without
+                               // moving the container out of this
+                               // state -- or this is the first
+                               // update and it holds stale info
+                               // from before RunContainer() returned.
+                               if updates < 2 {
+                                       // First update may be stale:
+                                       // skip it so we don't emit
+                                       // confusing logs / API calls.
+                                       continue
+                               }
+                               d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+                               d.UpdateState(c.UUID, fallbackState)
+                       }
+               }
        }()
        return tracker
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               d.Logger.Warnf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
@@ -194,7 +266,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }
@@ -215,12 +287,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                        d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
                } else if alreadyTracking {
                        switch c.State {
-                       case Queued:
+                       case Queued, Cancelled, Complete:
+                               d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
                                tracker.close()
+                               delete(d.trackers, c.UUID)
                        case Locked, Running:
+                               d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
                                tracker.update(c)
-                       case Cancelled, Complete:
-                               tracker.close()
                        }
                } else {
                        switch c.State {