10649: Make errors emitted by squeue and scancel show up in logs.
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 1 Dec 2016 17:56:17 +0000 (12:56 -0500)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 2 Dec 2016 14:42:39 +0000 (09:42 -0500)
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/squeue.go

index 3c4f281912842a0ceedb6df409aa61e80fa38fa2..e768b509cd6f2c69bb529d9e9a90e2d923e422ce 100644 (file)
@@ -195,6 +195,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                b, _ := ioutil.ReadAll(stdoutReader)
                stdoutReader.Close()
                stdoutChan <- b
+               close(stdoutChan)
        }()
 
        stderrChan := make(chan []byte)
@@ -202,6 +203,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                b, _ := ioutil.ReadAll(stderrReader)
                stderrReader.Close()
                stderrChan <- b
+               close(stderrChan)
        }()
 
        // Send a tiny script on stdin to execute the crunch-run command
@@ -209,13 +211,10 @@ func submit(dispatcher *dispatch.Dispatcher,
        io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
        stdinWriter.Close()
 
-       err = cmd.Wait()
-
        stdoutMsg := <-stdoutChan
        stderrmsg := <-stderrChan
 
-       close(stdoutChan)
-       close(stderrChan)
+       err = cmd.Wait()
 
        if err != nil {
                submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
@@ -302,12 +301,13 @@ func run(dispatcher *dispatch.Dispatcher,
 
                                // Mutex between squeue sync and running sbatch or scancel.
                                squeueUpdater.SlurmLock.Lock()
-                               err := scancelCmd(container).Run()
+                               cmd := scancelCmd(container)
+                               msg, err := cmd.CombinedOutput()
                                squeueUpdater.SlurmLock.Unlock()
 
                                if err != nil {
-                                       log.Printf("Error stopping container %s with scancel: %v",
-                                               container.UUID, err)
+                                       log.Printf("Error stopping container %s with %v %v: %v %v",
+                                               container.UUID, cmd.Path, cmd.Args, err, string(msg))
                                        if squeueUpdater.CheckSqueue(container.UUID) {
                                                log.Printf("Container %s is still in squeue after scancel.",
                                                        container.UUID)
index fbea48e548a59f78718cb0afa419b5a84a1cd89b..40461031e214486f1dbed9feeda6aae0d97fb76c 100644 (file)
@@ -81,7 +81,8 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
                return exec.Command("echo")
        }
 
-       container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+       container := s.integrationTest(c,
+               func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
                []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -134,7 +135,7 @@ func (s *TestSuite) integrationTest(c *C,
        }(squeueCmd)
        squeueCmd = newSqueueCmd
 
-       // There should be no queued containers now
+       // There should be one queued container
        params := arvadosclient.Dict{
                "filters": [][]string{{"state", "=", "Queued"}},
        }
index 61decde61c4bd61d0a92e96bde20ff0c82780f57..45d06c8c1e27f12f2bc6e83ca262ab2ff7f08a53 100644 (file)
@@ -2,6 +2,8 @@ package main
 
 import (
        "bufio"
+       "io"
+       "io/ioutil"
        "log"
        "os/exec"
        "sync"
@@ -45,31 +47,49 @@ func (squeue *Squeue) RunSqueue() {
                log.Printf("Error creating stdout pipe for squeue: %v", err)
                return
        }
+
+       stderrReader, err := cmd.StderrPipe()
+       if err != nil {
+               log.Printf("Error creating stderr pipe for squeue: %v", err)
+               return
+       }
+
        err = cmd.Start()
        if err != nil {
                log.Printf("Error running squeue: %v", err)
                return
        }
+
+       stderrChan := make(chan []byte)
+       go func() {
+               b, _ := ioutil.ReadAll(stderrReader)
+               stderrChan <- b
+               close(stderrChan)
+       }()
+
        scanner := bufio.NewScanner(sq)
        for scanner.Scan() {
                newSqueueContents = append(newSqueueContents, scanner.Text())
        }
-       if err := scanner.Err(); err != nil {
-               cmd.Wait()
-               log.Printf("Error reading from squeue pipe: %v", err)
-               return
-       }
+       io.Copy(ioutil.Discard, sq)
+
+       stderrmsg := <-stderrChan
 
        err = cmd.Wait()
+
+       if scanner.Err() != nil {
+               log.Printf("Error reading from squeue pipe: %v", err)
+       }
        if err != nil {
-               log.Printf("Error running squeue: %v", err)
-               return
+               log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
        }
 
-       squeue.squeueCond.L.Lock()
-       squeue.squeueContents = newSqueueContents
-       squeue.squeueCond.Broadcast()
-       squeue.squeueCond.L.Unlock()
+       if scanner.Err() == nil && err == nil {
+               squeue.squeueCond.L.Lock()
+               squeue.squeueContents = newSqueueContents
+               squeue.squeueCond.Broadcast()
+               squeue.squeueCond.L.Unlock()
+       }
 }
 
 // CheckSqueue checks if a given container UUID is in the slurm queue.  This