17816: Make singularity-in-arvbox work
[arvados.git] / lib / crunchrun / crunchrun.go
index 4bdaca4d64b7c74d36fab5b17443b3f61bdb334e..08e4aa3899ec46ca68d86d5ef1ea930057830c00 100644 (file)
@@ -60,6 +60,7 @@ type IKeepClient interface {
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
        ClearBlockCache()
+       SetStorageClasses(sc []string)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -76,7 +77,10 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       executor containerExecutor
+       executor       containerExecutor
+       executorStdin  io.Closer
+       executorStdout io.Closer
+       executorStderr io.Closer
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -105,8 +109,6 @@ type ContainerRunner struct {
        ExitCode      *int
        NewLogWriter  NewLogWriter
        CrunchLog     *ThrottledLogger
-       Stdout        io.WriteCloser
-       Stderr        io.WriteCloser
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -148,9 +150,10 @@ type ContainerRunner struct {
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
 
-       enableNetwork string // one of "default" or "always"
-       networkMode   string // "none", "host", or "" -- passed through to executor
-       arvMountLog   *ThrottledLogger
+       enableMemoryLimit bool
+       enableNetwork     string // one of "default" or "always"
+       networkMode       string // "none", "host", or "" -- passed through to executor
+       arvMountLog       *ThrottledLogger
 
        containerWatchdogInterval time.Duration
 
@@ -291,7 +294,7 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        }
        runner.arvMountLog = NewThrottledLogger(w)
        c.Stdout = runner.arvMountLog
-       c.Stderr = runner.arvMountLog
+       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
 
        runner.CrunchLog.Printf("Running %v", c.Args)
 
@@ -386,6 +389,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("could not get container token: %s", err)
        }
+       runner.CrunchLog.Printf("container token %q", token)
 
        pdhOnly := true
        tmpcount := 0
@@ -393,6 +397,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                "--foreground",
                "--allow-other",
                "--read-write",
+               "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
        if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
@@ -873,7 +878,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-       var stdin io.ReadCloser
+       var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
        if mnt, ok := runner.Container.Mounts["stdin"]; ok {
                switch mnt.Kind {
                case "collection":
@@ -946,11 +951,17 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                // both "" and "." mean default
                workdir = ""
        }
-
+       ram := runner.Container.RuntimeConstraints.RAM
+       if !runner.enableMemoryLimit {
+               ram = 0
+       }
+       runner.executorStdin = stdin
+       runner.executorStdout = stdout
+       runner.executorStderr = stderr
        return runner.executor.Create(containerSpec{
                Image:         imageID,
                VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
-               RAM:           runner.Container.RuntimeConstraints.RAM,
+               RAM:           ram,
                WorkingDir:    workdir,
                Env:           env,
                BindMounts:    bindmounts,
@@ -1011,6 +1022,27 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       var returnErr error
+       if err = runner.executorStdin.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdin: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               returnErr = err
+       }
+       if err = runner.executorStdout.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdout: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+       if err = runner.executorStderr.Close(); err != nil {
+               err = fmt.Errorf("error closing container stderr: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+
        if runner.statReporter != nil {
                runner.statReporter.Stop()
                err = runner.statLogger.Close()
@@ -1018,7 +1050,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
-       return nil
+       return returnErr
 }
 
 func (runner *ContainerRunner) updateLogs() {
@@ -1514,6 +1546,9 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
                return fmt.Errorf("error creating container API client: %v", err)
        }
 
+       runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+       runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+
        err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
        if err != nil {
                if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
@@ -1586,6 +1621,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+       enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
        enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
        networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
@@ -1689,9 +1725,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        if gwAuthSecret == "" {
                // not safe to run a gateway service without an auth
                // secret
+               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
        } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
                // dispatcher did not tell us which external IP
                // address to advertise --> no gateway service
+               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
        } else if de, ok := cr.executor.(*dockerExecutor); ok {
                cr.gateway = Gateway{
                        Address:            gwListen,
@@ -1718,6 +1756,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
+       cr.enableMemoryLimit = *enableMemoryLimit
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {