18102: Fix double-unlock in scheduler.
[arvados.git] / lib / lsf / dispatch.go
index ff95d0db29c0a86aa38432cf3928184537fd8f23..7461597c45c8e48cee3d105877dc1d16abd7cf0c 100644 (file)
@@ -5,12 +5,12 @@
 package lsf
 
 import (
-       "bytes"
        "context"
        "errors"
        "fmt"
        "math"
        "net/http"
+       "regexp"
        "strings"
        "sync"
        "time"
@@ -161,7 +161,7 @@ func (disp *dispatcher) init() {
        }
 }
 
-func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
        ctx, cancel := context.WithCancel(disp.Context)
        defer cancel()
 
@@ -172,38 +172,9 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
                cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
-               if err := disp.submit(ctr, cmd); err != nil {
-                       var text string
-                       switch err := err.(type) {
-                       case dispatchcloud.ConstraintsNotSatisfiableError:
-                               var logBuf bytes.Buffer
-                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
-                               if len(err.AvailableTypes) == 0 {
-                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
-                               } else {
-                                       fmt.Fprint(&logBuf, "Available instance types:\n")
-                                       for _, t := range err.AvailableTypes {
-                                               fmt.Fprintf(&logBuf,
-                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
-                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
-                                               )
-                                       }
-                               }
-                               text = logBuf.String()
-                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
-                       default:
-                               text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
-                       }
-                       disp.logger.Print(text)
-
-                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-                               "object_uuid": ctr.UUID,
-                               "event_type":  "dispatch",
-                               "properties":  map[string]string{"text": text}}}
-                       disp.arvDispatcher.Arv.Create("logs", lr, nil)
-
-                       disp.arvDispatcher.Unlock(ctr.UUID)
-                       return
+               err := disp.submit(ctr, cmd)
+               if err != nil {
+                       return err
                }
        }
 
@@ -236,9 +207,14 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                        case dispatch.Locked:
                                disp.arvDispatcher.Unlock(ctr.UUID)
                        }
-                       return
+                       return nil
                case updated, ok := <-status:
                        if !ok {
+                               // status channel is closed, which is
+                               // how arvDispatcher tells us to stop
+                               // touching the container record and
+                               // to kill off any remaining LSF
+                               // processes, etc.
                                done = true
                                break
                        }
@@ -246,7 +222,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                                disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
                        }
                        ctr = updated
-                       if ctr.Priority == 0 {
+                       if ctr.Priority < 1 {
                                disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
                                disp.bkill(ctr)
                        } else {
@@ -267,6 +243,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                }
                <-ticker.C
        }
+       return nil
 }
 
 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
@@ -315,8 +292,23 @@ func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string
        }
 }
 
+// Check the next bjobs report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel existing
+// Arvados LSF jobs (started by a previous dispatch process) that
+// never released their LSF job allocations even though their
+// container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
 func (disp *dispatcher) checkLsfQueueForOrphans() {
-       disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+       containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+       for _, uuid := range disp.lsfqueue.All() {
+               if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+                       continue
+               }
+               err := disp.arvDispatcher.TrackContainer(uuid)
+               if err != nil {
+                       disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+               }
+       }
 }
 
 func execScript(args []string) []byte {