Merge branch '12934-cwl-conformance' refs #12934
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Wed, 17 Jan 2018 04:32:04 +0000 (23:32 -0500)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Wed, 17 Jan 2018 04:32:14 +0000 (23:32 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

1  2 
services/crunch-run/crunchrun.go

index 6a118a67435e5166b5e27f62fe02ebffe237fdb3,bb5973d874533bb6d7d27dcafc30652b9b956d87..b480c068c597ecc178b08be7278af2edbd72bce9
@@@ -660,12 -660,9 +660,12 @@@ type infoCommand struct 
        cmd   []string
  }
  
 -// LogNodeInfo gathers node information and store it on the log for debugging
 -// purposes.
 -func (runner *ContainerRunner) LogNodeInfo() (err error) {
 +// LogHostInfo logs info about the current host, for debugging and
 +// accounting purposes. Although it's logged as "node-info", this is
 +// about the environment where crunch-run is actually running, which
 +// might differ from what's described in the node record (see
 +// LogNodeRecord).
 +func (runner *ContainerRunner) LogHostInfo() (err error) {
        w := runner.NewLogWriter("node-info")
  
        commands := []infoCommand{
  }
  
  // LogContainerRecord gets and saves the raw JSON container record from the API server
 -func (runner *ContainerRunner) LogContainerRecord() (err error) {
 +func (runner *ContainerRunner) LogContainerRecord() error {
 +      logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
 +      if !logged && err == nil {
 +              err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
 +      }
 +      return err
 +}
 +
 +// LogNodeRecord logs arvados#node record corresponding to the current host.
 +func (runner *ContainerRunner) LogNodeRecord() error {
 +      hostname := os.Getenv("SLURMD_NODENAME")
 +      if hostname == "" {
 +              hostname, _ = os.Hostname()
 +      }
 +      _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
 +              // The "info" field has admin-only info when obtained
 +              // with a privileged token, and should not be logged.
 +              node, ok := resp.(map[string]interface{})
 +              if ok {
 +                      delete(node, "info")
 +              }
 +      })
 +      return err
 +}
 +
 +func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
        w := &ArvLogWriter{
                ArvClient:     runner.ArvClient,
                UUID:          runner.Container.UUID,
 -              loggingStream: "container",
 -              writeCloser:   runner.LogCollection.Open("container.json"),
 +              loggingStream: label,
 +              writeCloser:   runner.LogCollection.Open(label + ".json"),
        }
  
 -      // Get Container record JSON from the API Server
 -      reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
 +      reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
        if err != nil {
 -              return fmt.Errorf("While retrieving container record from the API server: %v", err)
 +              return false, fmt.Errorf("error getting %s record: %v", label, err)
        }
        defer reader.Close()
  
        dec := json.NewDecoder(reader)
        dec.UseNumber()
 -      var cr map[string]interface{}
 -      if err = dec.Decode(&cr); err != nil {
 -              return fmt.Errorf("While decoding the container record JSON response: %v", err)
 +      var resp map[string]interface{}
 +      if err = dec.Decode(&resp); err != nil {
 +              return false, fmt.Errorf("error decoding %s list response: %v", label, err)
 +      }
 +      items, ok := resp["items"].([]interface{})
 +      if !ok {
 +              return false, fmt.Errorf("error decoding %s list response: no \"items\" key in API list response", label)
 +      } else if len(items) < 1 {
 +              return false, nil
 +      }
 +      if munge != nil {
 +              munge(items[0])
        }
        // Re-encode it using indentation to improve readability
        enc := json.NewEncoder(w)
        enc.SetIndent("", "    ")
 -      if err = enc.Encode(cr); err != nil {
 -              return fmt.Errorf("While logging the JSON container record: %v", err)
 +      if err = enc.Encode(items[0]); err != nil {
 +              return false, fmt.Errorf("error logging %s record: %v", label, err)
        }
        err = w.Close()
        if err != nil {
 -              return fmt.Errorf("While closing container.json log: %v", err)
 +              return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
        }
 -      return nil
 +      return true, nil
  }
  
  // AttachStreams connects the docker container stdin, stdout and stderr logs
@@@ -1086,14 -1050,34 +1086,34 @@@ func (runner *ContainerRunner) UploadOu
        relocateTo string,
        followed int) (manifestText string, err error) {
  
-       if info.Mode().IsDir() {
-               return
-       }
        if infoerr != nil {
                return "", infoerr
        }
  
+       if info.Mode().IsDir() {
+               // if empty, need to create a .keep file
+               dir, direrr := os.Open(path)
+               if direrr != nil {
+                       return "", direrr
+               }
+               defer dir.Close()
+               names, eof := dir.Readdirnames(1)
+               if len(names) == 0 && eof == io.EOF && path != runner.HostOutputDir {
+                       containerPath := runner.OutputPath + path[len(runner.HostOutputDir):]
+                       for _, bind := range binds {
+                               mnt := runner.Container.Mounts[bind]
+                               // Check if there is a bind for this
+                               // directory, in which case assume we don't need .keep
+                               if (containerPath == bind || strings.HasPrefix(containerPath, bind+"/")) && mnt.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+                                       return
+                               }
+                       }
+                       outputSuffix := path[len(runner.HostOutputDir)+1:]
+                       return fmt.Sprintf("./%v d41d8cd98f00b204e9800998ecf8427e+0 0:0:.keep\n", outputSuffix), nil
+               }
+               return
+       }
        if followed >= limitFollowSymlinks {
                // Got stuck in a loop or just a pathological number of
                // directory links, give up.
                return
        }
  
-       // When following symlinks, the source path may need to be logically
-       // relocated to some other path within the output collection.  Remove
-       // the relocateFrom prefix and replace it with relocateTo.
+       // "path" is the actual path we are visiting
+       // "tgt" is the target of "path" (a non-symlink) after following symlinks
+       // "relocated" is the path in the output manifest where the file should be placed,
+       // but has HostOutputDir as a prefix.
+       // The destination path in the output manifest may need to be
+       // logically relocated to some other path in order to appear
+       // in the correct location as a result of following a symlink.
+       // Remove the relocateFrom prefix and replace it with
+       // relocateTo.
        relocated := relocateTo + path[len(relocateFrom):]
  
        tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
  
                        // Terminates in this keep mount, so add the
                        // manifest text at appropriate location.
-                       outputSuffix := path[len(runner.HostOutputDir):]
+                       outputSuffix := relocated[len(runner.HostOutputDir):]
                        manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
                        return
                }
@@@ -1588,14 -1579,13 +1615,14 @@@ func (runner *ContainerRunner) Run() (e
        if err != nil {
                return
        }
 -
 -      // Gather and record node information
 -      err = runner.LogNodeInfo()
 +      err = runner.LogHostInfo()
 +      if err != nil {
 +              return
 +      }
 +      err = runner.LogNodeRecord()
        if err != nil {
                return
        }
 -      // Save container.json record on log collection
        err = runner.LogContainerRecord()
        if err != nil {
                return