b, _ := ioutil.ReadAll(stdoutReader)
stdoutReader.Close()
stdoutChan <- b
+ close(stdoutChan)
}()
stderrChan := make(chan []byte)
b, _ := ioutil.ReadAll(stderrReader)
stderrReader.Close()
stderrChan <- b
+ close(stderrChan)
}()
// Send a tiny script on stdin to execute the crunch-run command
io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
stdinWriter.Close()
- err = cmd.Wait()
-
stdoutMsg := <-stdoutChan
stderrmsg := <-stderrChan
- close(stdoutChan)
- close(stderrChan)
+ err = cmd.Wait()
if err != nil {
submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
// Mutex between squeue sync and running sbatch or scancel.
squeueUpdater.SlurmLock.Lock()
- err := scancelCmd(container).Run()
+ cmd := scancelCmd(container)
+ msg, err := cmd.CombinedOutput()
squeueUpdater.SlurmLock.Unlock()
if err != nil {
- log.Printf("Error stopping container %s with scancel: %v",
- container.UUID, err)
+ log.Printf("Error stopping container %s with %v %v: %v %v",
+ container.UUID, cmd.Path, cmd.Args, err, string(msg))
if squeueUpdater.CheckSqueue(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.",
container.UUID)
return exec.Command("echo")
}
- container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ container := s.integrationTest(c,
+ func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
[]string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
}(squeueCmd)
squeueCmd = newSqueueCmd
- // There should be no queued containers now
+ // There should be one queued container
params := arvadosclient.Dict{
"filters": [][]string{{"state", "=", "Queued"}},
}
import (
"bufio"
+ "io"
+ "io/ioutil"
"log"
"os/exec"
"sync"
log.Printf("Error creating stdout pipe for squeue: %v", err)
return
}
+
+ stderrReader, err := cmd.StderrPipe()
+ if err != nil {
+ log.Printf("Error creating stderr pipe for squeue: %v", err)
+ return
+ }
+
err = cmd.Start()
if err != nil {
log.Printf("Error running squeue: %v", err)
return
}
+
+ stderrChan := make(chan []byte)
+ go func() {
+ b, _ := ioutil.ReadAll(stderrReader)
+ stderrChan <- b
+ close(stderrChan)
+ }()
+
scanner := bufio.NewScanner(sq)
for scanner.Scan() {
newSqueueContents = append(newSqueueContents, scanner.Text())
}
- if err := scanner.Err(); err != nil {
- cmd.Wait()
- log.Printf("Error reading from squeue pipe: %v", err)
- return
- }
+ io.Copy(ioutil.Discard, sq)
+
+ stderrmsg := <-stderrChan
err = cmd.Wait()
+
+ if scanner.Err() != nil {
+ log.Printf("Error reading from squeue pipe: %v", err)
+ }
if err != nil {
- log.Printf("Error running squeue: %v", err)
- return
+ log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
}
- squeue.squeueCond.L.Lock()
- squeue.squeueContents = newSqueueContents
- squeue.squeueCond.Broadcast()
- squeue.squeueCond.L.Unlock()
+ if scanner.Err() == nil && err == nil {
+ squeue.squeueCond.L.Lock()
+ squeue.squeueContents = newSqueueContents
+ squeue.squeueCond.Broadcast()
+ squeue.squeueCond.L.Unlock()
+ }
}
// CheckSqueue checks if a given container UUID is in the slurm queue. This