17756: Move "no suitable instance type" reporting to dispatch lib.
[arvados.git] / sdk / go / dispatch / dispatch.go
index 74cefed05794b7c242ebd8033fba1e8a5781547a..00c75154f656a70e0b42deed7ef0e34fa7a01d7d 100644 (file)
@@ -7,14 +7,16 @@
 package dispatch
 
 import (
+       "bytes"
        "context"
        "fmt"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "github.com/sirupsen/logrus"
 )
 
 const (
@@ -37,6 +39,9 @@ type Dispatcher struct {
 
        Logger Logger
 
+       // Batch size ("limit") for container list queries; Run sets it to 100 if zero
+       BatchSize int
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -63,7 +68,7 @@ type Dispatcher struct {
 // running, and return.
 //
 // The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
 
 // Run watches the API server's queue for containers that are either
 // ready to run and available to lock, or are already locked by this
@@ -84,6 +89,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
@@ -163,9 +172,34 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
        }
        tracker.updates <- c
        go func() {
-               d.RunContainer(d, c, tracker.updates)
-               // RunContainer blocks for the lifetime of the container.  When
-               // it returns, the tracker should delete itself.
+               err := d.RunContainer(d, c, tracker.updates)
+               if err != nil {
+                       text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+                       if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+                                       }
+                               }
+                               text = logBuf.String()
+                               d.UpdateState(c.UUID, Cancelled)
+                       }
+                       d.Logger.Printf("%s", text)
+                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                               "object_uuid": c.UUID,
+                               "event_type":  "dispatch",
+                               "properties":  map[string]string{"text": text}}}
+                       d.Arv.Create("logs", lr, nil)
+                       d.Unlock(c.UUID)
+               }
+
                d.mtx.Lock()
                delete(d.trackers, c.UUID)
                d.mtx.Unlock()
@@ -174,8 +208,22 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               d.Logger.Warnf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
@@ -194,7 +242,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }
@@ -230,7 +278,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                                }
                                err := d.lock(c.UUID)
                                if err != nil {
-                                       d.Logger.Debugf("error locking container %s: %s", c.UUID, err)
+                                       d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
                                        break
                                }
                                c.State = Locked