Merge branch '12764-writable-file' refs #12764
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Wed, 31 Jan 2018 02:22:38 +0000 (21:22 -0500)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Wed, 31 Jan 2018 02:22:45 +0000 (21:22 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

1  2 
services/crunch-run/crunchrun.go

index 705bea486b21797ea4c6523ba28e03dda4ad269c,5c4f105f9c7037e5bdafba55df1ca6f2559b5199..0582e5418fd4776e11a97224d42c58750b90d8da
@@@ -114,12 -114,10 +114,12 @@@ type ContainerRunner struct 
        ArvMountExit   chan error
        finalState     string
  
 -      statLogger   io.WriteCloser
 -      statReporter *crunchstat.Reporter
 -      statInterval time.Duration
 -      cgroupRoot   string
 +      statLogger       io.WriteCloser
 +      statReporter     *crunchstat.Reporter
 +      hoststatLogger   io.WriteCloser
 +      hoststatReporter *crunchstat.Reporter
 +      statInterval     time.Duration
 +      cgroupRoot       string
        // What we expect the container's cgroup parent to be.
        expectCgroupParent string
        // What we tell docker to use as the container's cgroup
@@@ -332,6 -330,37 +332,37 @@@ func (runner *ContainerRunner) SetupArv
        return
  }
  
+ func copyfile(src string, dst string) (err error) {
+       srcfile, err := os.Open(src)
+       if err != nil {
+               return
+       }
+       os.MkdirAll(path.Dir(dst), 0770)
+       dstfile, err := os.Create(dst)
+       if err != nil {
+               return
+       }
+       _, err = io.Copy(dstfile, srcfile)
+       if err != nil {
+               return
+       }
+       err = srcfile.Close()
+       err2 := dstfile.Close()
+       if err != nil {
+               return
+       }
+       if err2 != nil {
+               return err2
+       }
+       return nil
+ }
  func (runner *ContainerRunner) SetupMounts() (err error) {
        err = runner.SetupArvMountPoint("keep")
        if err != nil {
        runner.Binds = nil
        runner.Volumes = make(map[string]struct{})
        needCertMount := true
+       type copyFile struct {
+               src  string
+               bind string
+       }
+       var copyFiles []copyFile
  
        var binds []string
        for bind := range runner.Container.Mounts {
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
-                               if mnt.Writable {
+                               if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+                                       copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+                               } else {
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                }
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                        } else {
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
                        }
                }
        }
  
+       for _, cp := range copyFiles {
+               dir, err := os.Stat(cp.src)
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+               if dir.IsDir() {
+                       err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+                               if walkerr != nil {
+                                       return walkerr
+                               }
+                               if walkinfo.Mode().IsRegular() {
+                                       return copyfile(walkpath, path.Join(cp.bind, walkpath[len(cp.src):]))
+                               } else if walkinfo.Mode().IsDir() {
+                                       return os.MkdirAll(path.Join(cp.bind, walkpath[len(cp.src):]), 0770)
+                               } else {
+                                       return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+                               }
+                       })
+               } else {
+                       err = copyfile(cp.src, cp.bind)
+               }
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+       }
        return nil
  }
  
  func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
        // Handle docker log protocol
        // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
 +      defer close(runner.loggingDone)
  
        header := make([]byte, 8)
 -      for {
 -              _, readerr := io.ReadAtLeast(containerReader, header, 8)
 -
 -              if readerr == nil {
 -                      readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
 -                      if header[0] == 1 {
 -                              // stdout
 -                              _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
 -                      } else {
 -                              // stderr
 -                              _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
 +      var err error
 +      for err == nil {
 +              _, err = io.ReadAtLeast(containerReader, header, 8)
 +              if err != nil {
 +                      if err == io.EOF {
 +                              err = nil
                        }
 +                      break
                }
 +              readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
 +              if header[0] == 1 {
 +                      // stdout
 +                      _, err = io.CopyN(runner.Stdout, containerReader, readsize)
 +              } else {
 +                      // stderr
 +                      _, err = io.CopyN(runner.Stderr, containerReader, readsize)
 +              }
 +      }
  
 -              if readerr != nil {
 -                      if readerr != io.EOF {
 -                              runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
 -                      }
 -
 -                      closeerr := runner.Stdout.Close()
 -                      if closeerr != nil {
 -                              runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
 -                      }
 +      if err != nil {
 +              runner.CrunchLog.Printf("error reading docker logs: %v", err)
 +      }
  
 -                      closeerr = runner.Stderr.Close()
 -                      if closeerr != nil {
 -                              runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
 -                      }
 +      err = runner.Stdout.Close()
 +      if err != nil {
 +              runner.CrunchLog.Printf("error closing stdout logs: %v", err)
 +      }
  
 -                      if runner.statReporter != nil {
 -                              runner.statReporter.Stop()
 -                              closeerr = runner.statLogger.Close()
 -                              if closeerr != nil {
 -                                      runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
 -                              }
 -                      }
 +      err = runner.Stderr.Close()
 +      if err != nil {
 +              runner.CrunchLog.Printf("error closing stderr logs: %v", err)
 +      }
  
 -                      close(runner.loggingDone)
 -                      return
 +      if runner.statReporter != nil {
 +              runner.statReporter.Stop()
 +              err = runner.statLogger.Close()
 +              if err != nil {
 +                      runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
  }
  
 -func (runner *ContainerRunner) StartCrunchstat() {
 +func (runner *ContainerRunner) stopHoststat() error {
 +      if runner.hoststatReporter == nil {
 +              return nil
 +      }
 +      runner.hoststatReporter.Stop()
 +      err := runner.hoststatLogger.Close()
 +      if err != nil {
 +              return fmt.Errorf("error closing hoststat logs: %v", err)
 +      }
 +      return nil
 +}
 +
 +func (runner *ContainerRunner) startHoststat() {
 +      runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
 +      runner.hoststatReporter = &crunchstat.Reporter{
 +              Logger:     log.New(runner.hoststatLogger, "", 0),
 +              CgroupRoot: runner.cgroupRoot,
 +              PollPeriod: runner.statInterval,
 +      }
 +      runner.hoststatReporter.Start()
 +}
 +
 +func (runner *ContainerRunner) startCrunchstat() {
        runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
        runner.statReporter = &crunchstat.Reporter{
                CID:          runner.ContainerID,
@@@ -1102,7 -1142,7 +1166,7 @@@ func (runner *ContainerRunner) UploadOu
        // go through mounts and try reverse map to collection reference
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+               if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
                        // get path relative to bind
                        targetSuffix := tgt[len(bind):]
  
@@@ -1241,7 -1281,7 +1305,7 @@@ func (runner *ContainerRunner) CaptureO
                        continue
                }
  
-               if mnt.ExcludeFromOutput == true {
+               if mnt.ExcludeFromOutput == true || mnt.Writable {
                        continue
                }
  
@@@ -1536,7 -1576,6 +1600,7 @@@ func (runner *ContainerRunner) Run() (e
                }
  
                checkErr(runner.CaptureOutput())
 +              checkErr(runner.stopHoststat())
                checkErr(runner.CommitLogs())
                checkErr(runner.UpdateContainerFinal())
        }()
        if err != nil {
                return
        }
 -
 -      // setup signal handling
        runner.setupSignals()
 +      runner.startHoststat()
  
        // check for and/or load image
        err = runner.LoadImage()
        }
        runner.finalState = "Cancelled"
  
 -      runner.StartCrunchstat()
 +      runner.startCrunchstat()
  
        err = runner.StartContainer()
        if err != nil {