16552: change default db name to just arvados.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 722d4eec6d1abcc0947091f7681e37b3502a98c1..d34ea68d7a3c4f968e0e44bf3d78bd24bd79f3a7 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,16 +1,22 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 // Package dispatch is a helper library for building Arvados container
 // dispatchers.
 package dispatch
 
 import (
+       "bytes"
        "context"
        "fmt"
-       "log"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "github.com/sirupsen/logrus"
 )
 
 const (
@@ -21,9 +27,21 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
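+// A Logger accepts log messages at the severity levels used by this
+// package. The standard logrus logger satisfies this interface.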
+type Logger interface {
+       Printf(string, ...interface{})
+       Warnf(string, ...interface{})
+       Debugf(string, ...interface{})
+}
+
+// Dispatcher polls the API server's container queue and invokes the
+// RunContainer callback for each container that this dispatcher
+// should run or monitor.
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
+       Logger Logger
+
+       // Maximum number of containers to fetch per API list call (default 100)
+       BatchSize int
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -36,7 +54,7 @@ type Dispatcher struct {
 
        auth     arvados.APIClientAuthorization
        mtx      sync.Mutex
-       running  map[string]*runTracker
+       trackers map[string]*runTracker
        throttle throttle
 }
 
@@ -50,13 +68,17 @@ type Dispatcher struct {
 // running, and return.
 //
 // The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
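+//
+// A minimal DispatchFunc might look like this (illustrative sketch):
+//
+//	func run(d *dispatch.Dispatcher, c arvados.Container, status <-chan arvados.Container) error {
+//		// submit/execute c here, then watch for updates; the
+//		// channel closes when the dispatcher stops tracking c.
+//		for ctr := range status {
+//			_ = ctr // react to state/priority changes
+//		}
+//		return nil
+//	}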
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
 
 // Run watches the API server's queue for containers that are either
 // ready to run and available to lock, or are already locked by this
 // dispatcher's token. When a new one appears, Run calls RunContainer
 // in a new goroutine.
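+//
+// Typical use (illustrative sketch, assuming arv is an
+// *arvadosclient.ArvadosClient and run is a DispatchFunc):
+//
+//	d := &dispatch.Dispatcher{
+//		Arv:          arv,
+//		RunContainer: run,
+//		PollPeriod:   10 * time.Second,
+//	}
+//	err := d.Run(context.Background())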
 func (d *Dispatcher) Run(ctx context.Context) error {
+       if d.Logger == nil {
+               d.Logger = logrus.StandardLogger()
+       }
+
        err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
        if err != nil {
                return fmt.Errorf("error getting my token UUID: %v", err)
@@ -67,93 +89,211 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
-               d.checkForUpdates([][]interface{}{
-                       {"uuid", "in", d.runningUUIDs()}})
-               d.checkForUpdates([][]interface{}{
-                       {"locked_by_uuid", "=", d.auth.UUID},
-                       {"uuid", "not in", d.runningUUIDs()}})
-               d.checkForUpdates([][]interface{}{
-                       {"state", "=", Queued},
-                       {"priority", ">", "0"},
-                       {"uuid", "not in", d.runningUUIDs()}})
                select {
                case <-poll.C:
-                       continue
+                       break
                case <-ctx.Done():
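+                       // Shutting down: close every tracker so its
+                       // goroutine can drain remaining updates and exit.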
+                       d.mtx.Lock()
+                       defer d.mtx.Unlock()
+                       for _, tracker := range d.trackers {
+                               tracker.close()
+                       }
                        return ctx.Err()
                }
-       }
-}
 
-func (d *Dispatcher) runningUUIDs() []string {
-       d.mtx.Lock()
-       defer d.mtx.Unlock()
-       if len(d.running) == 0 {
-               // API bug: ["uuid", "not in", []] does not match everything
-               return []string{"X"}
-       }
-       uuids := make([]string, 0, len(d.running))
-       for x := range d.running {
-               uuids = append(uuids, x)
+               todo := make(map[string]*runTracker)
+               d.mtx.Lock()
+               // Make a copy of trackers
+               for uuid, tracker := range d.trackers {
+                       todo[uuid] = tracker
+               }
+               d.mtx.Unlock()
+
+               // Containers I currently own (Locked/Running)
+               querySuccess := d.checkForUpdates([][]interface{}{
+                       {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+               // Containers I should try to dispatch
+               querySuccess = d.checkForUpdates([][]interface{}{
+                       {"state", "=", Queued},
+                       {"priority", ">", "0"}}, todo) && querySuccess
+
+               if !querySuccess {
+                       // There was an error in one of the previous queries,
+                       // we probably didn't get updates for all the
+                       // containers we should have.  Don't check them
+                       // individually because it may be expensive.
+                       continue
+               }
+
+               // Containers I know about but didn't fall into the
+               // above two categories (probably Complete/Cancelled)
+               var missed []string
+               for uuid := range todo {
+                       missed = append(missed, uuid)
+               }
+
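+               // Check the leftover containers in batches of 20, to
+               // keep the "uuid in ..." filter at a manageable size.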
+               for len(missed) > 0 {
+                       var batch []string
+                       if len(missed) > 20 {
+                               batch = missed[0:20]
+                               missed = missed[20:]
+                       } else {
+                               batch = missed
+                               missed = missed[0:0]
+                       }
+                       querySuccess = d.checkForUpdates([][]interface{}{
+                               {"uuid", "in", batch}}, todo) && querySuccess
+               }
+
+               if !querySuccess {
+                       // There was an error in one of the previous queries, we probably
+                       // didn't see all the containers we should have, so don't shut down
+                       // the missed containers.
+                       continue
+               }
+
+               // Containers that I know about that didn't show up in any
+               // query should be let go.
+               for uuid, tracker := range todo {
+                       d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+                       tracker.close()
+               }
+
        }
-       return uuids
 }
 
 // Start a runner in a new goroutine, and send the initial container
 // record to its updates channel.
 func (d *Dispatcher) start(c arvados.Container) *runTracker {
-       updates := make(chan arvados.Container, 1)
-       tracker := &runTracker{updates: updates}
+       tracker := &runTracker{
+               updates: make(chan arvados.Container, 1),
+               logger:  d.Logger,
+       }
        tracker.updates <- c
        go func() {
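+               // If RunContainer fails, the container normally goes
+               // back to the queue; but if its constraints can never
+               // be satisfied, it is cancelled instead (see below).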
-               d.RunContainer(d, c, tracker.updates)
-
-               d.mtx.Lock()
-               delete(d.running, c.UUID)
-               d.mtx.Unlock()
+               fallbackState := Queued
+               err := d.RunContainer(d, c, tracker.updates)
+               if err != nil {
+                       text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+                       if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+                               fallbackState = Cancelled
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+                                       }
+                               }
+                               text = logBuf.String()
+                       }
+                       d.Logger.Printf("%s", text)
+                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                               "object_uuid": c.UUID,
+                               "event_type":  "dispatch",
+                               "properties":  map[string]string{"text": text}}}
+                       d.Arv.Create("logs", lr, nil)
+               }
+               // If checkListForUpdates() doesn't close the tracker
+               // after 2 queue updates, try to move the container to
+               // the fallback state, which should eventually work
+               // and cause the tracker to close.
+               updates := 0
+               for upd := range tracker.updates {
+                       updates++
+                       if upd.State == Locked || upd.State == Running {
+                               // Tracker didn't clean up before
+                               // returning -- or this is the first
+                               // update and it contains stale
+                               // information from before
+                               // RunContainer() returned.
+                               if updates < 2 {
+                                       // Avoid generating confusing
+                                       // logs / API calls in the
+                                       // stale-info case.
+                                       continue
+                               }
+                               d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+                               d.UpdateState(c.UUID, fallbackState)
+                       }
+               }
        }()
        return tracker
 }
 
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
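+       // Get an exact count of matching containers first, then page
+       // through the results with counting disabled.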
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
-               "order":   []string{"priority desc"},
-               "limit":   "1000"}
-
-       var list arvados.ContainerList
-       err := d.Arv.List("containers", params, &list)
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
        if err != nil {
-               log.Printf("Error getting list of containers: %q", err)
-               return
+               d.Logger.Warnf("error getting count of containers: %q", err)
+               return false
        }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
+               "order":   []string{"priority desc"}}
+       offset := 0
+       for {
+               params["offset"] = offset
+
+               // This list variable must be a new one declared
+               // inside the loop: otherwise, items in the API
+               // response would get deep-merged into the items
+               // loaded in previous iterations.
+               var list arvados.ContainerList
 
-       if list.ItemsAvailable > len(list.Items) {
-               // TODO: support paging
-               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-                       list.ItemsAvailable,
-                       len(list.Items))
+               err := d.Arv.List("containers", params, &list)
+               if err != nil {
+                       d.Logger.Warnf("error getting list of containers: %q", err)
+                       return false
+               }
+               d.checkListForUpdates(list.Items, todo)
+               offset += len(list.Items)
+               if len(list.Items) == 0 || itemsAvailable <= offset {
+                       return true
+               }
        }
+}
 
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
        d.mtx.Lock()
        defer d.mtx.Unlock()
-       if d.running == nil {
-               d.running = make(map[string]*runTracker)
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
        }
 
-       for _, c := range list.Items {
-               tracker, running := d.running[c.UUID]
+       for _, c := range containers {
+               tracker, alreadyTracking := d.trackers[c.UUID]
+               delete(todo, c.UUID)
+
                if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
-                       log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
-               } else if running {
+                       d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+               } else if alreadyTracking {
                        switch c.State {
-                       case Queued:
+                       case Queued, Cancelled, Complete:
+                               d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
                                tracker.close()
+                               delete(d.trackers, c.UUID)
                        case Locked, Running:
+                               d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
                                tracker.update(c)
-                       case Cancelled, Complete:
-                               tracker.close()
                        }
                } else {
                        switch c.State {
@@ -163,18 +303,18 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
                                }
                                err := d.lock(c.UUID)
                                if err != nil {
-                                       log.Printf("debug: error locking container %s: %s", c.UUID, err)
+                                       d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
                                        break
                                }
                                c.State = Locked
-                               d.running[c.UUID] = d.start(c)
+                               d.trackers[c.UUID] = d.start(c)
                        case Locked, Running:
                                if !d.throttle.Check(c.UUID) {
                                        break
                                }
-                               d.running[c.UUID] = d.start(c)
+                               d.trackers[c.UUID] = d.start(c)
                        case Cancelled, Complete:
-                               tracker.close()
+                               // no-op (we already stopped monitoring)
                        }
                }
        }
@@ -187,7 +327,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro
                        "container": arvadosclient.Dict{"state": state},
                }, nil)
        if err != nil {
-               log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+               d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
        }
        return err
 }
@@ -202,9 +342,47 @@ func (d *Dispatcher) Unlock(uuid string) error {
        return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
 
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
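+//
+// For example, a dispatcher can resume tracking containers it
+// started before a restart (illustrative sketch; rememberedUUIDs is
+// a hypothetical helper):
+//
+//	for _, uuid := range rememberedUUIDs() {
+//		d.TrackContainer(uuid)
+//	}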
+func (d *Dispatcher) TrackContainer(uuid string) error {
+       var cntr arvados.Container
+       err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+       if err != nil {
+               return err
+       }
+       if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+               return nil
+       }
+
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+               return nil
+       }
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
+       }
+       d.trackers[uuid] = d.start(cntr)
+       switch cntr.State {
+       case Queued, Cancelled, Complete:
+               d.trackers[uuid].close()
+       }
+       return nil
+}
+
 type runTracker struct {
        closing bool
-       updates chan<- arvados.Container
+       updates chan arvados.Container
+       logger  Logger
 }
 
 func (tracker *runTracker) close() {
@@ -220,7 +398,7 @@ func (tracker *runTracker) update(c arvados.Container) {
        }
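+       // If the runner hasn't consumed the previous update yet,
+       // discard it so the send below doesn't block.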
        select {
        case <-tracker.updates:
-               log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+               tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
        default:
        }
        tracker.updates <- c