12273: Skip non-regular files when uploading outputs to Keep.
[arvados.git] / services / crunch-run / crunchrun.go
index de289dd7185d45438a8e5244a3a1aa4304585bc2..3fdd37402fada76813dc22e69251f6144da6e358 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -49,7 +53,7 @@ var ErrCancelled = errors.New("Cancelled")
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
-       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+       ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -341,7 +345,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        needCertMount := true
 
        var binds []string
-       for bind, _ := range runner.Container.Mounts {
+       for bind := range runner.Container.Mounts {
                binds = append(binds, bind)
        }
        sort.Strings(binds)
@@ -588,23 +592,23 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
        logger := log.New(w, "node-info", 0)
 
        commands := []infoCommand{
-               infoCommand{
+               {
                        label: "Host Information",
                        cmd:   []string{"uname", "-a"},
                },
-               infoCommand{
+               {
                        label: "CPU Information",
                        cmd:   []string{"cat", "/proc/cpuinfo"},
                },
-               infoCommand{
+               {
                        label: "Memory Information",
                        cmd:   []string{"cat", "/proc/meminfo"},
                },
-               infoCommand{
+               {
                        label: "Disk Space",
                        cmd:   []string{"df", "-m", "/", os.TempDir()},
                },
-               infoCommand{
+               {
                        label: "Disk INodes",
                        cmd:   []string{"df", "-i", "/", os.TempDir()},
                },
@@ -646,14 +650,11 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
                return fmt.Errorf("While retrieving container record from the API server: %v", err)
        }
        defer reader.Close()
-       // Read the API server response as []byte
-       json_bytes, err := ioutil.ReadAll(reader)
-       if err != nil {
-               return fmt.Errorf("While reading container record API server response: %v", err)
-       }
-       // Decode the JSON []byte
+
+       dec := json.NewDecoder(reader)
+       dec.UseNumber()
        var cr map[string]interface{}
-       if err = json.Unmarshal(json_bytes, &cr); err != nil {
+       if err = dec.Decode(&cr); err != nil {
                return fmt.Errorf("While decoding the container record JSON response: %v", err)
        }
        // Re-encode it using indentation to improve readability
@@ -676,7 +677,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.CrunchLog.Print("Attaching container streams")
 
        // If stdin mount is provided, attach it to the docker container
-       var stdinRdr keepclient.Reader
+       var stdinRdr arvados.File
        var stdinJson []byte
        if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
                if stdinMnt.Kind == "collection" {
@@ -800,11 +801,13 @@ func (runner *ContainerRunner) CreateContainer() error {
        runner.ContainerConfig.Volumes = runner.Volumes
 
        runner.HostConfig = dockercontainer.HostConfig{
-               Binds:  runner.Binds,
-               Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+               Binds: runner.Binds,
                LogConfig: dockercontainer.LogConfig{
                        Type: "none",
                },
+               Resources: dockercontainer.Resources{
+                       CgroupParent: runner.setCgroupParent,
+               },
        }
 
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
@@ -854,7 +857,11 @@ func (runner *ContainerRunner) StartContainer() error {
        err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
                dockertypes.ContainerStartOptions{})
        if err != nil {
-               return fmt.Errorf("could not start container: %v", err)
+               var advice string
+               if strings.Contains(err.Error(), "no such file or directory") {
+                       advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
+               }
+               return fmt.Errorf("could not start container: %v%s", err, advice)
        }
        runner.cStarted = true
        return nil
@@ -1298,9 +1305,8 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Close()
        }()
 
-       err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
+       err = runner.fetchContainerRecord()
        if err != nil {
-               err = fmt.Errorf("While getting container record: %v", err)
                return
        }
 
@@ -1363,6 +1369,24 @@ func (runner *ContainerRunner) Run() (err error) {
        return
 }
 
+// Fetch the current container record (uuid = runner.Container.UUID)
+// into runner.Container.
+func (runner *ContainerRunner) fetchContainerRecord() error {
+       reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+       if err != nil {
+               return fmt.Errorf("error fetching container record: %v", err)
+       }
+       defer reader.Close()
+
+       dec := json.NewDecoder(reader)
+       dec.UseNumber()
+       err = dec.Decode(&runner.Container)
+       if err != nil {
+               return fmt.Errorf("error decoding container record: %v", err)
+       }
+       return nil
+}
+
 // NewContainerRunner creates a new container runner.
 func NewContainerRunner(api IArvadosClient,
        kc IKeepClient,