Merge branch '17816-singularity-cwd' into main refs #17816
authorPeter Amstutz <peter.amstutz@curii.com>
Fri, 16 Jul 2021 15:27:06 +0000 (11:27 -0400)
committerPeter Amstutz <peter.amstutz@curii.com>
Fri, 16 Jul 2021 15:27:06 +0000 (11:27 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

1  2 
lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go

index 23fbc430b42611eda108563e91f5387d766208ce,08e4aa3899ec46ca68d86d5ef1ea930057830c00..412f1bbfbfa95027eb5c043c5e1fcf07449139b0
@@@ -55,7 -55,7 +55,7 @@@ var ErrCancelled = errors.New("Cancelle
  
  // IKeepClient is the minimal Keep API methods used by crunch-run.
  type IKeepClient interface {
 -      PutB(buf []byte) (string, int, error)
 +      BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
@@@ -77,7 -77,10 +77,10 @@@ type PsProcess interface 
  // ContainerRunner is the main stateful struct used for a single execution of a
  // container.
  type ContainerRunner struct {
-       executor containerExecutor
+       executor       containerExecutor
+       executorStdin  io.Closer
+       executorStdout io.Closer
+       executorStderr io.Closer
  
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
        ExitCode      *int
        NewLogWriter  NewLogWriter
        CrunchLog     *ThrottledLogger
-       Stdout        io.WriteCloser
-       Stderr        io.WriteCloser
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@@ -877,7 -878,7 +878,7 @@@ func (runner *ContainerRunner) getStdou
  
  // CreateContainer creates the docker container.
  func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-       var stdin io.ReadCloser
+       var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
        if mnt, ok := runner.Container.Mounts["stdin"]; ok {
                switch mnt.Kind {
                case "collection":
        if !runner.enableMemoryLimit {
                ram = 0
        }
+       runner.executorStdin = stdin
+       runner.executorStdout = stdout
+       runner.executorStderr = stderr
        return runner.executor.Create(containerSpec{
                Image:         imageID,
                VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
@@@ -1018,6 -1022,27 +1022,27 @@@ func (runner *ContainerRunner) WaitFini
        }
        runner.ExitCode = &exitcode
  
+       var returnErr error
+       if err = runner.executorStdin.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdin: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               returnErr = err
+       }
+       if err = runner.executorStdout.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdout: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+       if err = runner.executorStderr.Close(); err != nil {
+               err = fmt.Errorf("error closing container stderr: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
        if runner.statReporter != nil {
                runner.statReporter.Stop()
                err = runner.statLogger.Close()
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
-       return nil
+       return returnErr
  }
  
  func (runner *ContainerRunner) updateLogs() {
index 42a2cf3ad84de8e214a7efdaa1216e01258625c2,22d65334ed3bc2a30228c6ab5c55dbd51e674f6c..bb7ffdf0306b26b2f5c56062aaaaaf7b256e5447
@@@ -120,8 -120,6 +120,6 @@@ func (e *stubExecutor) CgroupID() strin
  func (e *stubExecutor) Stop() error                     { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
  func (e *stubExecutor) Close()                          { e.closed = true }
  func (e *stubExecutor) Wait(context.Context) (int, error) {
-       defer e.created.Stdout.Close()
-       defer e.created.Stderr.Close()
        return <-e.exit, e.waitErr
  }
  
@@@ -307,11 -305,9 +305,11 @@@ func (client *KeepTestClient) LocalLoca
        return locator, nil
  }
  
 -func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
 -      client.Content = buf
 -      return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
 +func (client *KeepTestClient) BlockWrite(_ context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
 +      client.Content = opts.Data
 +      return arvados.BlockWriteResponse{
 +              Locator: fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)),
 +      }, nil
  }
  
  func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
@@@ -457,8 -453,8 +455,8 @@@ func (*KeepErrorTestClient) ManifestFil
        return nil, errors.New("KeepError")
  }
  
 -func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
 -      return "", 0, errors.New("KeepError")
 +func (*KeepErrorTestClient) BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
 +      return arvados.BlockWriteResponse{}, errors.New("KeepError")
  }
  
  func (*KeepErrorTestClient) LocalLocator(string) (string, error) {
@@@ -524,8 -520,6 +522,6 @@@ func dockerLog(fd byte, msg string) []b
  func (s *TestSuite) TestRunContainer(c *C) {
        s.executor.runFunc = func() {
                fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
-               s.executor.created.Stdout.Close()
-               s.executor.created.Stderr.Close()
                s.executor.exit <- 0
        }