X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/44b720099723353ac9b81370809ef71319b8a3f8..879bde382ebf26aa593869cfff22cc7e85be6bb0:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 5638e81e4d..08e4aa3899 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -60,6 +60,7 @@ type IKeepClient interface { ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) LocalLocator(locator string) (string, error) ClearBlockCache() + SetStorageClasses(sc []string) } // NewLogWriter is a factory function to create a new log writer. @@ -76,7 +77,10 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - executor containerExecutor + executor containerExecutor + executorStdin io.Closer + executorStdout io.Closer + executorStderr io.Closer // Dispatcher client is initialized with the Dispatcher token. // This is a privileged token used to manage container status @@ -105,8 +109,6 @@ type ContainerRunner struct { ExitCode *int NewLogWriter NewLogWriter CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser logUUID string logMtx sync.Mutex LogCollection arvados.CollectionFileSystem @@ -395,6 +397,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { "--foreground", "--allow-other", "--read-write", + "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","), fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())} if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { @@ -875,7 +878,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { // CreateContainer creates the docker container. func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error { - var stdin io.ReadCloser + var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) if mnt, ok := runner.Container.Mounts["stdin"]; ok { switch mnt.Kind { case "collection": @@ -952,6 +955,9 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st if !runner.enableMemoryLimit { ram = 0 } + runner.executorStdin = stdin + runner.executorStdout = stdout + runner.executorStderr = stderr return runner.executor.Create(containerSpec{ Image: imageID, VCPUs: runner.Container.RuntimeConstraints.VCPUs, @@ -1016,6 +1022,27 @@ func (runner *ContainerRunner) WaitFinish() error { } runner.ExitCode = &exitcode + var returnErr error + if err = runner.executorStdin.Close(); err != nil { + err = fmt.Errorf("error closing container stdin: %s", err) + runner.CrunchLog.Printf("%s", err) + returnErr = err + } + if err = runner.executorStdout.Close(); err != nil { + err = fmt.Errorf("error closing container stdout: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } + if err = runner.executorStderr.Close(); err != nil { + err = fmt.Errorf("error closing container stderr: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } + if runner.statReporter != nil { runner.statReporter.Stop() err = runner.statLogger.Close() @@ -1023,7 +1050,7 @@ func (runner *ContainerRunner) WaitFinish() error { runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) } } - return nil + return returnErr } func (runner *ContainerRunner) updateLogs() { @@ -1519,6 +1546,9 @@ func (runner *ContainerRunner) fetchContainerRecord() error { return fmt.Errorf("error creating container API client: %v", err) } + runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses) + runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses) + err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm) if err != nil { if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {