// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
- PutB(buf []byte) (string, int, error)
+ BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
LocalLocator(locator string) (string, error)
ClearBlockCache()
+ SetStorageClasses(sc []string)
}
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) (io.WriteCloser, error)
-type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
+type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
type MkTempDir func(string, string) (string, error)
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- executor containerExecutor
+ executor containerExecutor
+ executorStdin io.Closer
+ executorStdout io.Closer
+ executorStderr io.Closer
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
ExitCode *int
NewLogWriter NewLogWriter
CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
logUUID string
logMtx sync.Mutex
LogCollection arvados.CollectionFileSystem
return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
}
imageID := tarfiles[0][:len(tarfiles[0])-4]
- imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0]
+ imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
runner.CrunchLog.Printf("Using Docker image id %q", imageID)
- if !runner.executor.ImageLoaded(imageID) {
- runner.CrunchLog.Print("Loading Docker image from keep")
- err = runner.executor.LoadImage(imageFile)
- if err != nil {
- return "", err
- }
- } else {
- runner.CrunchLog.Print("Docker image is available")
+ runner.CrunchLog.Print("Loading Docker image from keep")
+ err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
+ runner.containerClient)
+ if err != nil {
+ return "", err
}
+
return imageID, nil
}
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
- c = exec.Command("arv-mount", arvMountCmd...)
+func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
+ c = exec.Command(cmdline[0], cmdline[1:]...)
// Copy our environment, but override ARVADOS_API_TOKEN with
// the container auth token.
return nil, err
}
runner.arvMountLog = NewThrottledLogger(w)
+ scanner := logScanner{
+ Patterns: []string{
+ "Keep write error",
+ "Block not found error",
+ "Unhandled exception during FUSE operation",
+ },
+ ReportFunc: runner.reportArvMountWarning,
+ }
c.Stdout = runner.arvMountLog
- c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
+ c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
runner.CrunchLog.Printf("Running %v", c.Args)
pdhOnly := true
tmpcount := 0
arvMountCmd := []string{
+ "arv-mount",
"--foreground",
"--allow-other",
"--read-write",
+ "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
} else {
arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
}
+ arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
- var stdin io.ReadCloser
+ var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
if mnt, ok := runner.Container.Mounts["stdin"]; ok {
switch mnt.Kind {
case "collection":
if !runner.enableMemoryLimit {
ram = 0
}
+ runner.executorStdin = stdin
+ runner.executorStdout = stdout
+ runner.executorStderr = stderr
return runner.executor.Create(containerSpec{
Image: imageID,
VCPUs: runner.Container.RuntimeConstraints.VCPUs,
}
runner.ExitCode = &exitcode
+ var returnErr error
+ if err = runner.executorStdin.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdin: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ returnErr = err
+ }
+ if err = runner.executorStdout.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdout: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+ if err = runner.executorStderr.Close(); err != nil {
+ err = fmt.Errorf("error closing container stderr: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+
if runner.statReporter != nil {
runner.statReporter.Stop()
err = runner.statLogger.Close()
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
- return nil
+ return returnErr
}
func (runner *ContainerRunner) updateLogs() {
}
}
+func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
+ var updated arvados.Container
+ err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "container": arvadosclient.Dict{
+ "runtime_status": arvadosclient.Dict{
+ "warning": "arv-mount: " + pattern,
+ "warningDetail": text,
+ },
+ },
+ }, &updated)
+ if err != nil {
+ runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
+ }
+}
+
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
if umnterr != nil {
runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+ runner.ArvMount.Process.Kill()
} else {
// If arv-mount --unmount gets stuck for any reason, we
// don't want to wait for it forever. Do Wait() in a goroutine
}
}
}
+ runner.ArvMount = nil
}
if runner.ArvMountPoint != "" {
if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
}
+ runner.ArvMountPoint = ""
}
if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
- runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
+ runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
hostname, hosterr := os.Hostname()
if hosterr != nil {
}
checkErr("stopHoststat", runner.stopHoststat())
checkErr("CommitLogs", runner.CommitLogs())
+ runner.CleanupDirs()
checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
}()
return fmt.Errorf("error creating container API client: %v", err)
}
+ runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+ runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+
err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
if err != nil {
if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {