+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"os/signal"
"path"
"path/filepath"
+ "runtime"
+ "runtime/pprof"
"sort"
"strings"
"sync"
// IKeepClient is the set of Keep client operations used by the container
// runner; NewContainerRunner accepts any implementation of it.
type IKeepClient interface {
// PutHB takes a hash string and a data buffer and returns a string, an
// int and an error.
// NOTE(review): presumably the string is the stored block's locator and
// the int a replica count -- confirm against the keepclient package.
PutHB(hash string, buf []byte) (string, int, error)
// ManifestFileReader returns an arvados.File for the file named filename
// in manifest m.
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
// ClearBlockCache takes no arguments and returns nothing.
// NOTE(review): presumably drops blocks cached by the client; the image
// loading code calls it once the image ID has been recorded -- confirm.
+ ClearBlockCache()
}
// NewLogWriter is a factory function to create a new log writer.
networkMode string // passed through to HostConfig.NetworkMode
}
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
// Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
runner.SigChan = make(chan os.Signal, 1)
signal.Notify(runner.SigChan, syscall.SIGTERM)
signal.Notify(runner.SigChan, syscall.SIGINT)
go func(sig chan os.Signal) {
<-sig
runner.stop()
- signal.Stop(sig)
}(runner.SigChan)
}
}
}
+func (runner *ContainerRunner) teardown() {
+ if runner.SigChan != nil {
+ signal.Stop(runner.SigChan)
+ close(runner.SigChan)
+ }
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
- response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
+ response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
}
- response.Body.Close()
+
+ defer response.Body.Close()
+ rbody, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("Reading response to image load: %v", err)
+ }
+ runner.CrunchLog.Printf("Docker response: %s", rbody)
} else {
runner.CrunchLog.Print("Docker image is available")
}
runner.ContainerConfig.Image = imageID
+ runner.Kc.ClearBlockCache()
+
return nil
}
needCertMount := true
var binds []string
- for bind, _ := range runner.Container.Mounts {
+ for bind := range runner.Container.Mounts {
binds = append(binds, bind)
}
sort.Strings(binds)
logger := log.New(w, "node-info", 0)
commands := []infoCommand{
- infoCommand{
+ {
label: "Host Information",
cmd: []string{"uname", "-a"},
},
- infoCommand{
+ {
label: "CPU Information",
cmd: []string{"cat", "/proc/cpuinfo"},
},
- infoCommand{
+ {
label: "Memory Information",
cmd: []string{"cat", "/proc/meminfo"},
},
- infoCommand{
+ {
label: "Disk Space",
cmd: []string{"df", "-m", "/", os.TempDir()},
},
- infoCommand{
+ {
label: "Disk INodes",
cmd: []string{"df", "-i", "/", os.TempDir()},
},
return fmt.Errorf("While retrieving container record from the API server: %v", err)
}
defer reader.Close()
- // Read the API server response as []byte
- json_bytes, err := ioutil.ReadAll(reader)
- if err != nil {
- return fmt.Errorf("While reading container record API server response: %v", err)
- }
- // Decode the JSON []byte
+
+ dec := json.NewDecoder(reader)
+ dec.UseNumber()
var cr map[string]interface{}
- if err = json.Unmarshal(json_bytes, &cr); err != nil {
+ if err = dec.Decode(&cr); err != nil {
return fmt.Errorf("While decoding the container record JSON response: %v", err)
}
// Re-encode it using indentation to improve readability
err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
dockertypes.ContainerStartOptions{})
if err != nil {
- return fmt.Errorf("could not start container: %v", err)
+ var advice string
+ if strings.Contains(err.Error(), "no such file or directory") {
+ advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
+ }
+ return fmt.Errorf("could not start container: %v%s", err, advice)
}
runner.cStarted = true
return nil
return nil
}
// NotInOutputDirError is the sentinel error returned by derefOutputSymlink
// when a symlink target does not lie under runner.HostOutputDir.
// UploadOutputFile compares against it by identity so that it can still
// accept the target if it falls inside one of the bind mounts.
+var NotInOutputDirError = fmt.Errorf("Must point to path within the output directory")
+
+func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) {
+ // Follow symlinks if necessary
+ info = startinfo
+ tgt = path
+ readlinktgt = ""
+ nextlink := path
+ for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ {
+ if followed >= 16 {
+ // Got stuck in a loop or just a pathological number of links, give up.
+ err = fmt.Errorf("Followed too many symlinks from path %q", path)
+ return
+ }
+
+ readlinktgt, err = os.Readlink(nextlink)
+ if err != nil {
+ return
+ }
+
+ tgt = readlinktgt
+ if !strings.HasPrefix(tgt, "/") {
+ // Relative symlink, resolve it to host path
+ tgt = filepath.Join(filepath.Dir(path), tgt)
+ }
+ if strings.HasPrefix(tgt, runner.Container.OutputPath+"/") && !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+ // Absolute symlink to container output path, adjust it to host output path.
+ tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
+ }
+ if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+ // After dereferencing, symlink target must either be
+ // within output directory, or must point to a
+ // collection mount.
+ err = NotInOutputDirError
+ return
+ }
+
+ info, err = os.Lstat(tgt)
+ if err != nil {
+ // tgt
+ err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v",
+ path[len(runner.HostOutputDir):], readlinktgt, err)
+ return
+ }
+
+ nextlink = tgt
+ }
+
+ return
+}
+
+// UploadFile uploads files within the output directory, with special handling
+// for symlinks. If the symlink leads to a keep mount, copy the manifest text
+// from the keep mount into the output manifestText. Ensure that whether
+// symlinks are relative or absolute, they must remain within the output
+// directory.
+//
+// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
+func (runner *ContainerRunner) UploadOutputFile(
+ path string,
+ info os.FileInfo,
+ infoerr error,
+ binds []string,
+ walkUpload *WalkUpload,
+ relocateFrom string,
+ relocateTo string,
+ followed int) (manifestText string, err error) {
+
+ if info.Mode().IsDir() {
+ return
+ }
+
+ if infoerr != nil {
+ return "", infoerr
+ }
+
+ if followed >= 8 {
+ // Got stuck in a loop or just a pathological number of
+ // directory links, give up.
+ err = fmt.Errorf("Followed too many symlinks from path %q", path)
+ return
+ }
+
+ // When following symlinks, the source path may need to be logically
+ // relocated to some other path within the output collection. Remove
+ // the relocateFrom prefix and replace it with relocateTo.
+ relocated := relocateTo + path[len(relocateFrom):]
+
+ tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
+ if derefErr != nil && derefErr != NotInOutputDirError {
+ return "", derefErr
+ }
+
+ // go through mounts and try reverse map to collection reference
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+ if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ // get path relative to bind
+ targetSuffix := tgt[len(bind):]
+
+ // Copy mount and adjust the path to add path relative to the bind
+ adjustedMount := mnt
+ adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
+
+ // Terminates in this keep mount, so add the
+ // manifest text at appropriate location.
+ outputSuffix := path[len(runner.HostOutputDir):]
+ manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
+ return
+ }
+ }
+
+ // If target is not a collection mount, it must be located within the
+ // output directory, otherwise it is an error.
+ if derefErr == NotInOutputDirError {
+ err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.",
+ path[len(runner.HostOutputDir):], readlinktgt)
+ return
+ }
+
+ if info.Mode().IsRegular() {
+ return "", walkUpload.UploadFile(relocated, tgt)
+ }
+
+ if info.Mode().IsDir() {
+ // Symlink leads to directory. Walk() doesn't follow
+ // directory symlinks, so we walk the target directory
+ // instead. Within the walk, file paths are relocated
+ // so they appear under the original symlink path.
+ err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+ var m string
+ m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr,
+ binds, walkUpload, tgt, relocated, followed+1)
+ if walkerr == nil {
+ manifestText = manifestText + m
+ }
+ return walkerr
+ })
+ return
+ }
+
+ return
+}
+
// CaptureOutput sets the output, unmounts the FUSE mount, and deletes temporary directories.
func (runner *ContainerRunner) CaptureOutput() error {
if runner.finalState != "Complete" {
if err != nil {
// Regular directory
- // Find symlinks to arv-mounted files & dirs.
- err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if info.Mode()&os.ModeSymlink == 0 {
- return nil
- }
- // read link to get container internal path
- // only support 1 level of symlinking here.
- var tgt string
- tgt, err = os.Readlink(path)
- if err != nil {
- return err
- }
-
- // get path relative to output dir
- outputSuffix := path[len(runner.HostOutputDir):]
+ cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
+ walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
- if strings.HasPrefix(tgt, "/") {
- // go through mounts and try reverse map to collection reference
- for _, bind := range binds {
- mnt := runner.Container.Mounts[bind]
- if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
- // get path relative to bind
- targetSuffix := tgt[len(bind):]
-
- // Copy mount and adjust the path to add path relative to the bind
- adjustedMount := mnt
- adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
-
- // get manifest text
- var m string
- m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
- if err != nil {
- return err
- }
- manifestText = manifestText + m
- // delete symlink so WriteTree won't try to to dereference it.
- os.Remove(path)
- return nil
- }
- }
+ var m string
+ err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
+ m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0)
+ if err == nil {
+ manifestText = manifestText + m
}
+ return err
+ })
- // Not a link to a mount. Must be dereferencible and
- // point into the output directory.
- tgt, err = filepath.EvalSymlinks(path)
- if err != nil {
- os.Remove(path)
- return err
- }
+ cw.EndUpload(walkUpload)
- // Symlink target must be within the output directory otherwise it's an error.
- if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
- os.Remove(path)
- return fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
- outputSuffix, tgt)
- }
- return nil
- })
if err != nil {
- return fmt.Errorf("While checking output symlinks: %v", err)
+ return fmt.Errorf("While uploading output files: %v", err)
}
- cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- var m string
- m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ m, err = cw.ManifestText()
manifestText = manifestText + m
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
// a new one in case we needed to log anything while
// finalizing.
runner.CrunchLog.Close()
+
+ runner.teardown()
}()
- err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
+ err = runner.fetchContainerRecord()
if err != nil {
- err = fmt.Errorf("While getting container record: %v", err)
return
}
// setup signal handling
- runner.SetupSignals()
+ runner.setupSignals()
// check for and/or load image
err = runner.LoadImage()
return
}
+// Fetch the current container record (uuid = runner.Container.UUID)
+// into runner.Container.
+func (runner *ContainerRunner) fetchContainerRecord() error {
+ reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+ if err != nil {
+ return fmt.Errorf("error fetching container record: %v", err)
+ }
+ defer reader.Close()
+
+ dec := json.NewDecoder(reader)
+ dec.UseNumber()
+ err = dec.Decode(&runner.Container)
+ if err != nil {
+ return fmt.Errorf("error decoding container record: %v", err)
+ }
+ return nil
+}
+
// NewContainerRunner creates a new container runner.
func NewContainerRunner(api IArvadosClient,
kc IKeepClient,
networkMode := flag.String("container-network-mode", "default",
`Set networking mode for container. Corresponds to Docker network mode (--net).
`)
+ memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
flag.Parse()
containerId := flag.Arg(0)
if err != nil {
log.Fatalf("%s: %v", containerId, err)
}
+ kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
var docker *dockerclient.Client
cr.expectCgroupParent = p
}
- err = cr.Run()
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ runerr := cr.Run()
+
+ if *memprofile != "" {
+ f, err := os.Create(*memprofile)
+ if err != nil {
+ log.Printf("could not create memory profile: ", err)
+ }
+ runtime.GC() // get up-to-date statistics
+ if err := pprof.WriteHeapProfile(f); err != nil {
+ log.Printf("could not write memory profile: ", err)
+ }
+ closeerr := f.Close()
+ if closeerr != nil {
+ log.Printf("closing memprofile file: ", err)
+ }
}
+ if runerr != nil {
+ log.Fatalf("%s: %v", containerId, runerr)
+ }
}