import (
"bytes"
+ "context"
"encoding/json"
"errors"
"flag"
"io"
"io/ioutil"
"log"
+ "net"
+ "net/http"
"os"
"os/exec"
"os/signal"
+ "os/user"
"path"
"path/filepath"
"regexp"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
- "golang.org/x/crypto/ssh"
- "golang.org/x/net/context"
-
- dockertypes "github.com/docker/docker/api/types"
- dockercontainer "github.com/docker/docker/api/types/container"
- dockernetwork "github.com/docker/docker/api/types/network"
- dockerclient "github.com/docker/docker/client"
+ "golang.org/x/sys/unix"
)
type command struct{}
var Command = command{}
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+//
+// NOTE(review): field meanings below are inferred from names; confirm
+// against the dispatchcloud code that writes this struct.
+type ConfigData struct {
+	Env map[string]string // environment variables for the crunch-run process
+	KeepBuffers int // Keep buffer count (for a locally started keepstore?) — confirm
+	Cluster *arvados.Cluster // cluster configuration, present only when needed
+}
+
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
-	PutB(buf []byte) (string, int, error)
+	// BlockWrite replaces the old PutB API for storing a data
+	// block in Keep.
+	BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
	ReadAt(locator string, p []byte, off int) (int, error)
	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
	LocalLocator(locator string) (string, error)
	ClearBlockCache()
+	// SetStorageClasses selects the storage classes used for
+	// subsequent writes (NOTE(review): inferred from the name and
+	// from Container.OutputStorageClasses usage — confirm).
+	SetStorageClasses(sc []string)
}
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) (io.WriteCloser, error)
-type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
+type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
type MkTempDir func(string, string) (string, error)
-// ThinDockerClient is the minimal Docker client interface used by crunch-run.
-type ThinDockerClient interface {
- ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
- ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
- networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
- ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
- ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
- ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
- ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
- ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
- ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
- ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
-}
-
+// PsProcess is the minimal process-inspection interface used by
+// crunch-run: CmdlineSlice returns a process's command line as a
+// slice of arguments.
type PsProcess interface {
	CmdlineSlice() ([]string, error)
}
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- Docker ThinDockerClient
+ executor containerExecutor
+ executorStdin io.Closer
+ executorStdout io.Closer
+ executorStderr io.Closer
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
ContainerArvClient IArvadosClient
ContainerKeepClient IKeepClient
- Container arvados.Container
- ContainerConfig dockercontainer.Config
- HostConfig dockercontainer.HostConfig
- token string
- ContainerID string
- ExitCode *int
- NewLogWriter NewLogWriter
- loggingDone chan bool
- CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
- logUUID string
- logMtx sync.Mutex
- LogCollection arvados.CollectionFileSystem
- LogsPDH *string
- RunArvMount RunArvMount
- MkTempDir MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
- finalState string
- parentTemp string
-
+ Container arvados.Container
+ token string
+ ExitCode *int
+ NewLogWriter NewLogWriter
+ CrunchLog *ThrottledLogger
+ logUUID string
+ logMtx sync.Mutex
+ LogCollection arvados.CollectionFileSystem
+ LogsPDH *string
+ RunArvMount RunArvMount
+ MkTempDir MkTempDir
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ SecretMounts map[string]arvados.Mount
+ MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+ finalState string
+ parentTemp string
+
+ keepstoreLogger io.WriteCloser
+ keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
hoststatLogger io.WriteCloser
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
- cRemoved bool // docker confirmed the container no longer exists
- enableNetwork string // one of "default" or "always"
- networkMode string // passed through to HostConfig.NetworkMode
- arvMountLog *ThrottledLogger
+ enableMemoryLimit bool
+ enableNetwork string // one of "default" or "always"
+ networkMode string // "none", "host", or "" -- passed through to executor
+ arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
- gatewayAddress string
- gatewaySSHConfig *ssh.ServerConfig
- gatewayAuthSecret string
+ gateway Gateway
}
-// setupSignals sets up signal handling to gracefully terminate the underlying
-// Docker container and update state when receiving a TERM, INT or QUIT signal.
+// setupSignals sets up signal handling to gracefully terminate the
+// underlying container and update state when receiving a TERM, INT or
+// QUIT signal.
func (runner *ContainerRunner) setupSignals() {
runner.SigChan = make(chan os.Signal, 1)
signal.Notify(runner.SigChan, syscall.SIGTERM)
}(runner.SigChan)
}
-// stop the underlying Docker container.
+// stop the underlying container.
func (runner *ContainerRunner) stop(sig os.Signal) {
	runner.cStateLock.Lock()
	defer runner.cStateLock.Unlock()
	if sig != nil {
		runner.CrunchLog.Printf("caught signal: %v", sig)
	}
-	if runner.ContainerID == "" {
-		return
-	}
+	// Record the deliberate cancellation before stopping: other
+	// code paths (e.g. StartContainer) check cCancelled under
+	// cStateLock to avoid racing with shutdown.
	runner.cCancelled = true
-	runner.CrunchLog.Printf("removing container")
-	err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+	runner.CrunchLog.Printf("stopping container")
+	err := runner.executor.Stop()
	if err != nil {
-		runner.CrunchLog.Printf("error removing container: %s", err)
-	}
-	if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
-		runner.cRemoved = true
+		runner.CrunchLog.Printf("error stopping container: %s", err)
	}
}
-// LoadImage determines the docker image id from the container record and
-// checks if it is available in the local Docker image store. If not, it loads
-// the image from Keep.
+// LoadImage determines the docker image id from the container record,
+// locates the image tarball in the image collection via the arv-mount
+// filesystem, and asks the executor to load it. It returns the image ID.
-func (runner *ContainerRunner) LoadImage() (err error) {
-
+func (runner *ContainerRunner) LoadImage() (string, error) {
	runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
-	var collection arvados.Collection
-	err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+	// Read the image collection through the arv-mount filesystem
+	// instead of fetching the collection record via the API.
+	d, err := os.Open(runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage)
	if err != nil {
-		return fmt.Errorf("While getting container image collection: %v", err)
+		return "", err
	}
-	manifest := manifest.Manifest{Text: collection.ManifestText}
-	var img, imageID string
-	for ms := range manifest.StreamIter() {
-		img = ms.FileStreamSegments[0].Name
-		if !strings.HasSuffix(img, ".tar") {
-			return fmt.Errorf("First file in the container image collection does not end in .tar")
+	defer d.Close()
+	allfiles, err := d.Readdirnames(-1)
+	if err != nil {
+		return "", err
+	}
+	// The image collection must contain exactly one ".tar" file;
+	// its basename (minus the extension) is the Docker image ID.
+	var tarfiles []string
+	for _, fnm := range allfiles {
+		if strings.HasSuffix(fnm, ".tar") {
+			tarfiles = append(tarfiles, fnm)
		}
-		imageID = img[:len(img)-4]
	}
+	if len(tarfiles) == 0 {
+		return "", fmt.Errorf("image collection does not include a .tar image file")
+	}
+	if len(tarfiles) > 1 {
+		return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
+	}
+	imageID := tarfiles[0][:len(tarfiles[0])-4]
+	imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
+	runner.CrunchLog.Printf("Using Docker image id %q", imageID)
-	runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
-
-	_, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
+	runner.CrunchLog.Print("Loading Docker image from keep")
+	// Image loading (and any "already loaded" short-circuiting) is
+	// now the executor's responsibility.
+	err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
+		runner.containerClient)
	if err != nil {
-		runner.CrunchLog.Print("Loading Docker image from keep")
-
-		var readCloser io.ReadCloser
-		readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img)
-		if err != nil {
-			return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
-		}
-
-		response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
-		if err != nil {
-			return fmt.Errorf("While loading container image into Docker: %v", err)
-		}
-
-		defer response.Body.Close()
-		rbody, err := ioutil.ReadAll(response.Body)
-		if err != nil {
-			return fmt.Errorf("Reading response to image load: %v", err)
-		}
-		runner.CrunchLog.Printf("Docker response: %s", rbody)
-	} else {
-		runner.CrunchLog.Print("Docker image is available")
+		return "", err
	}
-	runner.ContainerConfig.Image = imageID
-
-	runner.ContainerKeepClient.ClearBlockCache()
-
-	return nil
+	return imageID, nil
}
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
- c = exec.Command("arv-mount", arvMountCmd...)
+func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
+ c = exec.Command(cmdline[0], cmdline[1:]...)
// Copy our environment, but override ARVADOS_API_TOKEN with
// the container auth token.
return nil, err
}
runner.arvMountLog = NewThrottledLogger(w)
+ scanner := logScanner{
+ Patterns: []string{
+ "Keep write error",
+ "Block not found error",
+ "Unhandled exception during FUSE operation",
+ },
+ ReportFunc: runner.reportArvMountWarning,
+ }
c.Stdout = runner.arvMountLog
- c.Stderr = runner.arvMountLog
+ c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
runner.CrunchLog.Printf("Running %v", c.Args)
return nil
}
-func (runner *ContainerRunner) SetupMounts() (err error) {
- err = runner.SetupArvMountPoint("keep")
+func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
+ bindmounts := map[string]bindmount{}
+ err := runner.SetupArvMountPoint("keep")
if err != nil {
- return fmt.Errorf("While creating keep mount temp dir: %v", err)
+ return nil, fmt.Errorf("While creating keep mount temp dir: %v", err)
}
token, err := runner.ContainerToken()
if err != nil {
- return fmt.Errorf("could not get container token: %s", err)
+ return nil, fmt.Errorf("could not get container token: %s", err)
}
+ runner.CrunchLog.Printf("container token %q", token)
pdhOnly := true
tmpcount := 0
arvMountCmd := []string{
+ "arv-mount",
"--foreground",
- "--allow-other",
"--read-write",
+ "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
+ if runner.executor.Runtime() == "docker" {
+ arvMountCmd = append(arvMountCmd, "--allow-other")
+ }
+
if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
}
collectionPaths := []string{}
- runner.Binds = nil
- runner.Volumes = make(map[string]struct{})
needCertMount := true
type copyFile struct {
src string
}
for bind := range runner.SecretMounts {
if _, ok := runner.Container.Mounts[bind]; ok {
- return fmt.Errorf("secret mount %q conflicts with regular mount", bind)
+ return nil, fmt.Errorf("secret mount %q conflicts with regular mount", bind)
}
if runner.SecretMounts[bind].Kind != "json" &&
runner.SecretMounts[bind].Kind != "text" {
- return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
+ return nil, fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
bind, runner.SecretMounts[bind].Kind)
}
binds = append(binds, bind)
sort.Strings(binds)
for _, bind := range binds {
- mnt, ok := runner.Container.Mounts[bind]
- if !ok {
+ mnt, notSecret := runner.Container.Mounts[bind]
+ if !notSecret {
mnt = runner.SecretMounts[bind]
}
if bind == "stdout" || bind == "stderr" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
- return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
+ return nil, fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
}
// Does path start with OutputPath?
prefix += "/"
}
if !strings.HasPrefix(mnt.Path, prefix) {
- return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+ return nil, fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
}
}
if bind == "stdin" {
// Is it a "collection" mount kind?
if mnt.Kind != "collection" && mnt.Kind != "json" {
- return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
+ return nil, fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
}
}
if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
- return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
+ return nil, fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
}
}
case mnt.Kind == "collection" && bind != "stdin":
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
- return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
+ return nil, fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
}
if mnt.UUID != "" {
if mnt.Writable {
- return fmt.Errorf("writing to existing collections currently not permitted")
+ return nil, fmt.Errorf("writing to existing collections currently not permitted")
}
pdhOnly = false
src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
} else if mnt.PortableDataHash != "" {
if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
- return fmt.Errorf("can never write to a collection specified by portable data hash")
+ return nil, fmt.Errorf("can never write to a collection specified by portable data hash")
}
idx := strings.Index(mnt.PortableDataHash, "/")
if idx > 0 {
}
} else {
src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
- arvMountCmd = append(arvMountCmd, "--mount-tmp")
- arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+ arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
tmpcount++
}
if mnt.Writable {
if bind == runner.Container.OutputPath {
runner.HostOutputDir = src
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+ bindmounts[bind] = bindmount{HostPath: src}
} else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
} else {
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+ bindmounts[bind] = bindmount{HostPath: src}
}
} else {
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+ bindmounts[bind] = bindmount{HostPath: src, ReadOnly: true}
}
collectionPaths = append(collectionPaths, src)
var tmpdir string
tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
if err != nil {
- return fmt.Errorf("while creating mount temp dir: %v", err)
+ return nil, fmt.Errorf("while creating mount temp dir: %v", err)
}
st, staterr := os.Stat(tmpdir)
if staterr != nil {
- return fmt.Errorf("while Stat on temp dir: %v", staterr)
+ return nil, fmt.Errorf("while Stat on temp dir: %v", staterr)
}
err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
if staterr != nil {
- return fmt.Errorf("while Chmod temp dir: %v", err)
+ return nil, fmt.Errorf("while Chmod temp dir: %v", err)
}
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+ bindmounts[bind] = bindmount{HostPath: tmpdir}
if bind == runner.Container.OutputPath {
runner.HostOutputDir = tmpdir
}
if mnt.Kind == "json" {
filedata, err = json.Marshal(mnt.Content)
if err != nil {
- return fmt.Errorf("encoding json data: %v", err)
+ return nil, fmt.Errorf("encoding json data: %v", err)
}
} else {
text, ok := mnt.Content.(string)
if !ok {
- return fmt.Errorf("content for mount %q must be a string", bind)
+ return nil, fmt.Errorf("content for mount %q must be a string", bind)
}
filedata = []byte(text)
}
tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
if err != nil {
- return fmt.Errorf("creating temp dir: %v", err)
+ return nil, fmt.Errorf("creating temp dir: %v", err)
}
tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
err = ioutil.WriteFile(tmpfn, filedata, 0444)
if err != nil {
- return fmt.Errorf("writing temp file: %v", err)
+ return nil, fmt.Errorf("writing temp file: %v", err)
}
- if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+ if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
+ // In most cases, if the container
+ // specifies a literal file inside the
+ // output path, we copy it into the
+ // output directory (either a mounted
+ // collection or a staging area on the
+ // host fs). If it's a secret, it will
+ // be skipped when copying output from
+ // staging to Keep later.
copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
} else {
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
+ // If a secret is outside OutputPath,
+ // we bind mount the secret file
+ // directly just like other mounts. We
+ // also use this strategy when a
+ // secret is inside OutputPath but
+ // OutputPath is a live collection, to
+ // avoid writing the secret to
+ // Keep. Attempting to remove a
+ // bind-mounted secret file from
+ // inside the container will return a
+ // "Device or resource busy" error
+ // that might not be handled well by
+ // the container, which is why we
+ // don't use this strategy when
+ // OutputPath is a staging directory.
+ bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
}
case mnt.Kind == "git_tree":
tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
if err != nil {
- return fmt.Errorf("creating temp dir: %v", err)
+ return nil, fmt.Errorf("creating temp dir: %v", err)
}
err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
if err != nil {
- return err
+ return nil, err
}
- runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro")
+ bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
}
}
if runner.HostOutputDir == "" {
- return fmt.Errorf("output path does not correspond to a writable mount point")
+ return nil, fmt.Errorf("output path does not correspond to a writable mount point")
}
- if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
+ if needCertMount && runner.Container.RuntimeConstraints.API {
for _, certfile := range arvadosclient.CertFiles {
_, err := os.Stat(certfile)
if err == nil {
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
+ bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
break
}
}
}
if pdhOnly {
- arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+ // If we are only mounting collections by pdh, make
+ // sure we don't subscribe to websocket events to
+ // avoid putting undesired load on the API server
+ arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
} else {
arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
}
+ // the by_uuid mount point is used by singularity when writing
+ // out docker images converted to SIF
+ arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
if err != nil {
- return fmt.Errorf("while trying to start arv-mount: %v", err)
+ return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
}
for _, p := range collectionPaths {
_, err = os.Stat(p)
if err != nil {
- return fmt.Errorf("while checking that input files exist: %v", err)
+ return nil, fmt.Errorf("while checking that input files exist: %v", err)
}
}
for _, cp := range copyFiles {
st, err := os.Stat(cp.src)
if err != nil {
- return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
}
if st.IsDir() {
err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
}
}
if err != nil {
- return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
- }
- }
-
- return nil
-}
-
-func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
- // Handle docker log protocol
- // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
- defer close(runner.loggingDone)
-
- header := make([]byte, 8)
- var err error
- for err == nil {
- _, err = io.ReadAtLeast(containerReader, header, 8)
- if err != nil {
- if err == io.EOF {
- err = nil
- }
- break
- }
- readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
- if header[0] == 1 {
- // stdout
- _, err = io.CopyN(runner.Stdout, containerReader, readsize)
- } else {
- // stderr
- _, err = io.CopyN(runner.Stderr, containerReader, readsize)
+ return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
}
}
- if err != nil {
- runner.CrunchLog.Printf("error reading docker logs: %v", err)
- }
-
- err = runner.Stdout.Close()
- if err != nil {
- runner.CrunchLog.Printf("error closing stdout logs: %v", err)
- }
-
- err = runner.Stderr.Close()
- if err != nil {
- runner.CrunchLog.Printf("error closing stderr logs: %v", err)
- }
-
- if runner.statReporter != nil {
- runner.statReporter.Stop()
- err = runner.statLogger.Close()
- if err != nil {
- runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
- }
- }
+ return bindmounts, nil
}
func (runner *ContainerRunner) stopHoststat() error {
}
runner.statLogger = NewThrottledLogger(w)
runner.statReporter = &crunchstat.Reporter{
- CID: runner.ContainerID,
+ CID: runner.executor.CgroupID(),
Logger: log.New(runner.statLogger, "", 0),
CgroupParent: runner.expectCgroupParent,
CgroupRoot: runner.cgroupRoot,
return true, nil
}
-// AttachStreams connects the docker container stdin, stdout and stderr logs
-// to the Arvados logger which logs to Keep and the API server logs table.
-func (runner *ContainerRunner) AttachStreams() (err error) {
-
- runner.CrunchLog.Print("Attaching container streams")
-
- // If stdin mount is provided, attach it to the docker container
- var stdinRdr arvados.File
- var stdinJSON []byte
- if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
- if stdinMnt.Kind == "collection" {
- var stdinColl arvados.Collection
- collID := stdinMnt.UUID
- if collID == "" {
- collID = stdinMnt.PortableDataHash
- }
- err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
- if err != nil {
- return fmt.Errorf("While getting stdin collection: %v", err)
- }
-
- stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
- manifest.Manifest{Text: stdinColl.ManifestText},
- stdinMnt.Path)
- if os.IsNotExist(err) {
- return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
- } else if err != nil {
- return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
- }
- } else if stdinMnt.Kind == "json" {
- stdinJSON, err = json.Marshal(stdinMnt.Content)
- if err != nil {
- return fmt.Errorf("While encoding stdin json data: %v", err)
- }
- }
- }
-
- stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
- response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
- if err != nil {
- return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
- }
-
- runner.loggingDone = make(chan bool)
-
- if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
- stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
- if err != nil {
- return err
- }
- runner.Stdout = stdoutFile
- } else if w, err := runner.NewLogWriter("stdout"); err != nil {
- return err
- } else {
- runner.Stdout = NewThrottledLogger(w)
- }
-
- if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
- stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
- if err != nil {
- return err
- }
- runner.Stderr = stderrFile
- } else if w, err := runner.NewLogWriter("stderr"); err != nil {
- return err
- } else {
- runner.Stderr = NewThrottledLogger(w)
- }
-
- if stdinRdr != nil {
- go func() {
- _, err := io.Copy(response.Conn, stdinRdr)
- if err != nil {
- runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
- runner.stop(nil)
- }
- stdinRdr.Close()
- response.CloseWrite()
- }()
- } else if len(stdinJSON) != 0 {
- go func() {
- _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
- if err != nil {
- runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
- runner.stop(nil)
- }
- response.CloseWrite()
- }()
- }
-
- go runner.ProcessDockerAttach(response.Reader)
-
- return nil
-}
-
func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
stdoutPath := mntPath[len(runner.Container.OutputPath):]
index := strings.LastIndex(stdoutPath, "/")
}
-// CreateContainer creates the docker container.
+// CreateContainer sets up the container's stdin/stdout/stderr streams
+// and creates the container via the configured executor.
-func (runner *ContainerRunner) CreateContainer() error {
- runner.CrunchLog.Print("Creating Docker container")
-
- runner.ContainerConfig.Cmd = runner.Container.Command
- if runner.Container.Cwd != "." {
- runner.ContainerConfig.WorkingDir = runner.Container.Cwd
- }
-
- for k, v := range runner.Container.Environment {
- runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
- }
-
- runner.ContainerConfig.Volumes = runner.Volumes
-
- maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
- minDockerRAM := int64(16)
- if maxRAM < minDockerRAM*1024*1024 {
- // Docker daemon won't let you set a limit less than ~10 MiB
- maxRAM = minDockerRAM * 1024 * 1024
- }
- runner.HostConfig = dockercontainer.HostConfig{
- Binds: runner.Binds,
- LogConfig: dockercontainer.LogConfig{
- Type: "none",
- },
- Resources: dockercontainer.Resources{
- CgroupParent: runner.setCgroupParent,
- NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
- Memory: maxRAM, // RAM
- MemorySwap: maxRAM, // RAM+swap
- KernelMemory: maxRAM, // kernel portion
- },
+func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
+ var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
+ if mnt, ok := runner.Container.Mounts["stdin"]; ok {
+ switch mnt.Kind {
+ case "collection":
+ var collID string
+ if mnt.UUID != "" {
+ collID = mnt.UUID
+ } else {
+ collID = mnt.PortableDataHash
+ }
+ path := runner.ArvMountPoint + "/by_id/" + collID + "/" + mnt.Path
+ f, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ stdin = f
+ case "json":
+ j, err := json.Marshal(mnt.Content)
+ if err != nil {
+ return fmt.Errorf("error encoding stdin json data: %v", err)
+ }
+ stdin = ioutil.NopCloser(bytes.NewReader(j))
+ default:
+ return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind)
+ }
}
- if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
- tok, err := runner.ContainerToken()
+ var stdout, stderr io.WriteCloser
+ if mnt, ok := runner.Container.Mounts["stdout"]; ok {
+ f, err := runner.getStdoutFile(mnt.Path)
if err != nil {
return err
}
- runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
- "ARVADOS_API_TOKEN="+tok,
- "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
- "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
- )
- runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
+ stdout = f
+ } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+ return err
} else {
- if runner.enableNetwork == "always" {
- runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
- } else {
- runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
- }
+ stdout = NewThrottledLogger(w)
}
- _, stdinUsed := runner.Container.Mounts["stdin"]
- runner.ContainerConfig.OpenStdin = stdinUsed
- runner.ContainerConfig.StdinOnce = stdinUsed
- runner.ContainerConfig.AttachStdin = stdinUsed
- runner.ContainerConfig.AttachStdout = true
- runner.ContainerConfig.AttachStderr = true
-
- createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
- if err != nil {
- return fmt.Errorf("While creating container: %v", err)
+ if mnt, ok := runner.Container.Mounts["stderr"]; ok {
+ f, err := runner.getStdoutFile(mnt.Path)
+ if err != nil {
+ return err
+ }
+ stderr = f
+ } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+ return err
+ } else {
+ stderr = NewThrottledLogger(w)
}
- runner.ContainerID = createdBody.ID
-
- return runner.AttachStreams()
+ env := runner.Container.Environment
+ enableNetwork := runner.enableNetwork == "always"
+ if runner.Container.RuntimeConstraints.API {
+ enableNetwork = true
+ tok, err := runner.ContainerToken()
+ if err != nil {
+ return err
+ }
+ env = map[string]string{}
+ for k, v := range runner.Container.Environment {
+ env[k] = v
+ }
+ env["ARVADOS_API_TOKEN"] = tok
+ env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
+ env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+ }
+ workdir := runner.Container.Cwd
+ if workdir == "." {
+ // both "" and "." mean default
+ workdir = ""
+ }
+ ram := runner.Container.RuntimeConstraints.RAM
+ if !runner.enableMemoryLimit {
+ ram = 0
+ }
+ runner.executorStdin = stdin
+ runner.executorStdout = stdout
+ runner.executorStderr = stderr
+
+ if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+ nvidiaModprobe(runner.CrunchLog)
+ }
+
+ return runner.executor.Create(containerSpec{
+ Image: imageID,
+ VCPUs: runner.Container.RuntimeConstraints.VCPUs,
+ RAM: ram,
+ WorkingDir: workdir,
+ Env: env,
+ BindMounts: bindmounts,
+ Command: runner.Container.Command,
+ EnableNetwork: enableNetwork,
+ CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
+ NetworkMode: runner.networkMode,
+ CgroupParent: runner.setCgroupParent,
+ Stdin: stdin,
+ Stdout: stdout,
+ Stderr: stderr,
+ })
}
// StartContainer starts the docker container created by CreateContainer.
func (runner *ContainerRunner) StartContainer() error {
- runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+ runner.CrunchLog.Printf("Starting container")
runner.cStateLock.Lock()
defer runner.cStateLock.Unlock()
if runner.cCancelled {
return ErrCancelled
}
- err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
- dockertypes.ContainerStartOptions{})
+ err := runner.executor.Start()
if err != nil {
var advice string
if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
func (runner *ContainerRunner) WaitFinish() error {
- var runTimeExceeded <-chan time.Time
runner.CrunchLog.Print("Waiting for container to finish")
-
- waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
- arvMountExit := runner.ArvMountExit
- if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
- runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
+ // Arm a deadline timer only if the container has a scheduling
+ // MaxRunTime limit; a nil channel never fires in the select below.
+ var timeout <-chan time.Time
+ if s := runner.Container.SchedulingParameters.MaxRunTime; s > 0 {
+ timeout = time.After(time.Duration(s) * time.Second)
}
-
- containerGone := make(chan struct{})
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ // Watchdog goroutine: stops the container if MaxRunTime elapses or
+ // arv-mount exits prematurely; exits quietly via ctx.Done() once
+ // executor.Wait has returned (cancel is deferred above).
go func() {
- defer close(containerGone)
- if runner.containerWatchdogInterval < 1 {
- runner.containerWatchdogInterval = time.Minute
- }
- for range time.NewTicker(runner.containerWatchdogInterval).C {
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
- ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
- cancel()
- runner.cStateLock.Lock()
- done := runner.cRemoved || runner.ExitCode != nil
- runner.cStateLock.Unlock()
- if done {
- return
- } else if err != nil {
- runner.CrunchLog.Printf("Error inspecting container: %s", err)
- runner.checkBrokenNode(err)
- return
- } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
- runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
- return
- }
+ select {
+ case <-timeout:
+ runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
+ runner.stop(nil)
+ case <-runner.ArvMountExit:
+ runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
+ runner.stop(nil)
+ case <-ctx.Done():
}
}()
+ // Block until the container terminates; a Wait error generally means
+ // the runtime itself failed, so check for a broken node.
+ exitcode, err := runner.executor.Wait(ctx)
+ if err != nil {
+ runner.checkBrokenNode(err)
+ return err
+ }
+ runner.ExitCode = &exitcode
- for {
- select {
- case waitBody := <-waitOk:
- runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
- code := int(waitBody.StatusCode)
- runner.ExitCode = &code
-
- // wait for stdout/stderr to complete
- <-runner.loggingDone
- return nil
-
- case err := <-waitErr:
- return fmt.Errorf("container wait: %v", err)
-
- case <-arvMountExit:
- runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
- runner.stop(nil)
- // arvMountExit will always be ready now that
- // it's closed, but that doesn't interest us.
- arvMountExit = nil
+ extra := ""
+ if exitcode&0x80 != 0 {
+ // Convert raw exit status (0x80 + signal number) to a
+ // string to log after the code, like " (signal 101)"
+ // or " (signal 9, killed)"
+ sig := syscall.WaitStatus(exitcode).Signal()
+ if name := unix.SignalName(sig); name != "" {
+ extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
+ } else {
+ extra = fmt.Sprintf(" (signal %d)", sig)
+ }
+ }
+ runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
- case <-runTimeExceeded:
- runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
- runner.stop(nil)
- runTimeExceeded = nil
+ // Close the container's stdio streams; log every close error but
+ // return only the first one encountered.
+ var returnErr error
+ if err = runner.executorStdin.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdin: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ returnErr = err
+ }
+ if err = runner.executorStdout.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdout: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+ if err = runner.executorStderr.Close(); err != nil {
+ err = fmt.Errorf("error closing container stderr: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
- case <-containerGone:
- return errors.New("docker client never returned status")
+ // Shut down resource-usage (crunchstat) reporting, if it was running.
+ if runner.statReporter != nil {
+ runner.statReporter.Stop()
+ err = runner.statLogger.Close()
+ if err != nil {
+ runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
+ return returnErr
}
func (runner *ContainerRunner) updateLogs() {
}
}
+// reportArvMountWarning surfaces an arv-mount problem to users by
+// writing it into the container record's runtime_status field:
+// "warning" holds the matched pattern, "warningDetail" the full
+// message text. An API error here is logged but otherwise ignored.
+func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
+ var updated arvados.Container
+ err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "container": arvadosclient.Dict{
+ "runtime_status": arvadosclient.Dict{
+ "warning": "arv-mount: " + pattern,
+ "warningDetail": text,
+ },
+ },
+ }, &updated)
+ if err != nil {
+ runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
+ }
+}
+
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
-func (runner *ContainerRunner) CaptureOutput() error {
- if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
+ if runner.Container.RuntimeConstraints.API {
// Output may have been set directly by the container, so
// refresh the container record to check.
err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
keepClient: runner.ContainerKeepClient,
hostOutputDir: runner.HostOutputDir,
ctrOutputDir: runner.Container.OutputPath,
- binds: runner.Binds,
+ bindmounts: bindmounts,
mounts: runner.Container.Mounts,
secretMounts: runner.SecretMounts,
logger: runner.CrunchLog,
if umnterr != nil {
runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+ runner.ArvMount.Process.Kill()
} else {
// If arv-mount --unmount gets stuck for any reason, we
// don't want to wait for it forever. Do Wait() in a goroutine
}
}
}
+ runner.ArvMount = nil
}
if runner.ArvMountPoint != "" {
if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
}
+ runner.ArvMountPoint = ""
}
if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
}()
+ if runner.keepstoreLogger != nil {
+ // Flush any buffered logs from our local keepstore
+ // process. Discard anything logged after this point
+ // -- it won't end up in the log collection, so
+ // there's no point writing it to the collectionfs.
+ runner.keepstoreLogbuf.SetWriter(io.Discard)
+ runner.keepstoreLogger.Close()
+ runner.keepstoreLogger = nil
+ }
+
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// we must be closing the re-opened log, which won't
// -- it exists only to send logs to other channels.
return nil
}
+
saved, err := runner.saveLogCollection(true)
if err != nil {
return fmt.Errorf("error saving log collection: %s", err)
// Already finalized.
return
}
- mt, err := runner.LogCollection.MarshalManifest(".")
- if err != nil {
- err = fmt.Errorf("error creating log manifest: %v", err)
- return
- }
updates := arvadosclient.Dict{
- "name": "logs for " + runner.Container.UUID,
- "manifest_text": mt,
+ "name": "logs for " + runner.Container.UUID,
+ }
+ mt, err1 := runner.LogCollection.MarshalManifest(".")
+ if err1 == nil {
+ // Only send updated manifest text if there was no
+ // error.
+ updates["manifest_text"] = mt
}
+
+ // Even if flushing the manifest had an error, we still want
+ // to update the log record, if possible, to push the trash_at
+ // and delete_at times into the future. Details on bug
+ // #17293.
if final {
updates["is_trashed"] = true
} else {
updates["delete_at"] = exp
}
reqBody := arvadosclient.Dict{"collection": updates}
+ var err2 error
if runner.logUUID == "" {
reqBody["ensure_unique_name"] = true
- err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
+ err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response)
} else {
- err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
+ err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
}
- if err != nil {
- return
+ if err2 == nil {
+ runner.logUUID = response.UUID
+ }
+
+ if err1 != nil || err2 != nil {
+ err = fmt.Errorf("error recording logs: %q, %q", err1, err2)
}
- runner.logUUID = response.UUID
return
}
return ErrCancelled
}
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gatewayAddress}}, nil)
+ arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
}
// ContainerToken returns the api_token the container (and any
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
- runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
+ runner.CrunchLog.Printf("%s", currentUserAndGroups())
+ runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
hostname, hosterr := os.Hostname()
if hosterr != nil {
return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
}
+ var bindmounts map[string]bindmount
defer func() {
// checkErr prints e (unless it's nil) and sets err to
// e (unless err is already non-nil). Thus, if err
// capture partial output and write logs
}
- checkErr("CaptureOutput", runner.CaptureOutput())
+ if bindmounts != nil {
+ checkErr("CaptureOutput", runner.CaptureOutput(bindmounts))
+ }
checkErr("stopHoststat", runner.stopHoststat())
checkErr("CommitLogs", runner.CommitLogs())
+ runner.CleanupDirs()
checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
}()
return
}
+ // set up FUSE mount and binds
+ bindmounts, err = runner.SetupMounts()
+ if err != nil {
+ runner.finalState = "Cancelled"
+ err = fmt.Errorf("While setting up mounts: %v", err)
+ return
+ }
+
// check for and/or load image
- err = runner.LoadImage()
+ imageID, err := runner.LoadImage()
if err != nil {
if !runner.checkBrokenNode(err) {
// Failed to load image but not due to a "broken node"
return
}
- // set up FUSE mount and binds
- err = runner.SetupMounts()
- if err != nil {
- runner.finalState = "Cancelled"
- err = fmt.Errorf("While setting up mounts: %v", err)
- return
- }
-
- err = runner.CreateContainer()
+ err = runner.CreateContainer(imageID, bindmounts)
if err != nil {
return
}
return fmt.Errorf("error creating container API client: %v", err)
}
+ runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+ runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+
err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
if err != nil {
if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
func NewContainerRunner(dispatcherClient *arvados.Client,
dispatcherArvClient IArvadosClient,
dispatcherKeepClient IKeepClient,
- docker ThinDockerClient,
containerUUID string) (*ContainerRunner, error) {
cr := &ContainerRunner{
dispatcherClient: dispatcherClient,
DispatcherArvClient: dispatcherArvClient,
DispatcherKeepClient: dispatcherKeepClient,
- Docker: docker,
}
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
}
func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ log := log.New(stderr, "", 0)
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
- stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+ stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
- enableNetwork := flags.String("container-enable-networking", "default",
- `Specify if networking should be enabled for container. One of 'default', 'always':
- default: only enable networking if container requests it.
- always: containers always have networking enabled
- `)
- networkMode := flags.String("container-network-mode", "default",
- `Set networking mode for container. Corresponds to Docker network mode (--net).
- `)
+ enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
+ enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
+ networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
+ runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
ignoreDetachFlag := false
ignoreDetachFlag = true
}
- if err := flags.Parse(args); err == flag.ErrHelp {
- return 0
- } else if err != nil {
- log.Print(err)
- return 1
- }
-
- if *stdinEnv && !ignoreDetachFlag {
- // Load env vars on stdin if asked (but not in a
- // detached child process, in which case stdin is
- // /dev/null).
- err := loadEnv(os.Stdin)
- if err != nil {
- log.Print(err)
- return 1
- }
+ if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+ return code
+ } else if !*list && flags.NArg() != 1 {
+ fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+ return 2
}
- containerID := flags.Arg(0)
+ containerUUID := flags.Arg(0)
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerID, prog, args, os.Stdout, os.Stderr)
+ return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
case *kill >= 0:
- return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+ return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
return ListProcesses(os.Stdout, os.Stderr)
}
- if containerID == "" {
+ if len(containerUUID) != 27 {
log.Printf("usage: %s [options] UUID", prog)
return 1
}
+ var conf ConfigData
+ if *stdinConfig {
+ err := json.NewDecoder(stdin).Decode(&conf)
+ if err != nil {
+ log.Printf("decode stdin: %s", err)
+ return 1
+ }
+ for k, v := range conf.Env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Printf("setenv(%q): %s", k, err)
+ return 1
+ }
+ }
+ if conf.Cluster != nil {
+ // ClusterID is missing from the JSON
+ // representation, but we need it to generate
+ // a valid config file for keepstore, so we
+ // fill it using the container UUID prefix.
+ conf.Cluster.ClusterID = containerUUID[:5]
+ }
+ }
+
log.Printf("crunch-run %s started", cmd.Version.String())
time.Sleep(*sleep)
arvadosclient.CertFiles = []string{*caCertsPath}
}
+ var keepstoreLogbuf bufThenWrite
+ keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if keepstore != nil {
+ defer keepstore.Process.Kill()
+ }
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Printf("%s: %v", containerID, err)
+ log.Printf("%s: %v", containerUUID, err)
return 1
}
api.Retries = 8
- kc, kcerr := keepclient.MakeKeepClient(api)
- if kcerr != nil {
- log.Printf("%s: %v", containerID, kcerr)
+ kc, err := keepclient.MakeKeepClient(api)
+ if err != nil {
+ log.Printf("%s: %v", containerUUID, err)
return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- // API version 1.21 corresponds to Docker 1.9, which is currently the
- // minimum version we want to support.
- docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-
- cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
+ cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
if err != nil {
log.Print(err)
return 1
}
- if dockererr != nil {
- cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
- cr.checkBrokenNode(dockererr)
+
+ if keepstore == nil {
+ // Log explanation (if any) for why we're not running
+ // a local keepstore.
+ var buf bytes.Buffer
+ keepstoreLogbuf.SetWriter(&buf)
+ if buf.Len() > 0 {
+ cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+ }
+ } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ keepstoreLogbuf.SetWriter(io.Discard)
+ } else {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ logwriter, err := cr.NewLogWriter("keepstore")
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+ var writer io.WriteCloser = cr.keepstoreLogger
+ if logWhat == "errors" {
+ writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+ } else if logWhat != "all" {
+ // should have been caught earlier by
+ // dispatcher's config loader
+ log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+ return 1
+ }
+ err = keepstoreLogbuf.SetWriter(writer)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogbuf = &keepstoreLogbuf
+ }
+
+ switch *runtimeEngine {
+ case "docker":
+ cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
+ case "singularity":
+ cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
+ default:
+ cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
cr.CrunchLog.Close()
return 1
}
-
- cr.gatewayAuthSecret = os.Getenv("GatewayAuthSecret")
- os.Unsetenv("GatewayAuthSecret")
- err = cr.startGatewayServer()
if err != nil {
- log.Printf("error starting gateway server: %s", err)
+ cr.CrunchLog.Printf("%s: %v", containerUUID, err)
+ cr.checkBrokenNode(err)
+ cr.CrunchLog.Close()
return 1
}
+ defer cr.executor.Close()
+
+ gwAuthSecret := os.Getenv("GatewayAuthSecret")
+ os.Unsetenv("GatewayAuthSecret")
+ if gwAuthSecret == "" {
+ // not safe to run a gateway service without an auth
+ // secret
+ cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
+ } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
+ // dispatcher did not tell us which external IP
+ // address to advertise --> no gateway service
+ cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
+ } else if de, ok := cr.executor.(*dockerExecutor); ok {
+ cr.gateway = Gateway{
+ Address: gwListen,
+ AuthSecret: gwAuthSecret,
+ ContainerUUID: containerUUID,
+ DockerContainerID: &de.containerID,
+ Log: cr.CrunchLog,
+ ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+ }
+ err = cr.gateway.Start()
+ if err != nil {
+ log.Printf("error starting gateway server: %s", err)
+ return 1
+ }
+ }
- parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
+ parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerUUID+".")
if tmperr != nil {
- log.Printf("%s: %v", containerID, tmperr)
+ log.Printf("%s: %v", containerUUID, tmperr)
return 1
}
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
+ cr.enableMemoryLimit = *enableMemoryLimit
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
}
if runerr != nil {
- log.Printf("%s: %v", containerID, runerr)
+ log.Printf("%s: %v", containerUUID, runerr)
return 1
}
return 0
}
-func loadEnv(rdr io.Reader) error {
- buf, err := ioutil.ReadAll(rdr)
+// startLocalKeepstore starts a keepstore child process (feeding it the
+// cluster config as JSON on stdin) and polls it until it reports
+// healthy, then points ARVADOS_KEEP_SERVICES at it. It returns
+// (nil, nil), after explaining why on logbuf, when the configuration
+// makes a local keepstore inappropriate.
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+ if configData.Cluster == nil || configData.KeepBuffers < 1 {
+ return nil, nil
+ }
+ for uuid, vol := range configData.Cluster.Volumes {
+ if len(vol.AccessViaHosts) > 0 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+ return nil, nil
+ }
+ if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+ return nil, nil
+ }
+ }
+
+ // Rather than have an alternate way to tell keepstore how
+ // many buffers to use when starting it this way, we just
+ // modify the cluster configuration that we feed it on stdin.
+ configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+ // Pick a free localhost port by binding and immediately closing a
+ // listener. (Small race: another process could grab the port before
+ // keepstore does.)
+ ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
- return fmt.Errorf("read stdin: %s", err)
+ return nil, err
}
- var env map[string]string
- err = json.Unmarshal(buf, &env)
+ _, port, err := net.SplitHostPort(ln.Addr().String())
if err != nil {
- return fmt.Errorf("decode stdin: %s", err)
+ ln.Close()
+ return nil, err
}
- for k, v := range env {
- err = os.Setenv(k, v)
+ ln.Close()
+ url := "http://localhost:" + port
+
+ fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+ var confJSON bytes.Buffer
+ err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ configData.Cluster.ClusterID: *configData.Cluster,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+ if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+ // If we're a 'go test' process, running
+ // /proc/self/exe would start the test suite in a
+ // child process, which is not what we want.
+ cmd.Path, _ = exec.LookPath("go")
+ cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+ cmd.Env = os.Environ()
+ }
+ cmd.Stdin = &confJSON
+ cmd.Stdout = logbuf
+ cmd.Stderr = logbuf
+ // NOTE(review): outside the 'go test' branch above, cmd.Env starts
+ // nil, so the child process sees only the two variables appended
+ // here -- confirm keepstore needs nothing else from the parent
+ // environment.
+ cmd.Env = append(cmd.Env,
+ "GOGC=10",
+ "ARVADOS_SERVICE_INTERNAL_URL="+url)
+ err = cmd.Start()
+ if err != nil {
+ return nil, fmt.Errorf("error starting keepstore process: %w", err)
+ }
+ // Closed when the child exits, so the polling loop below can detect
+ // early death without racing on a shared flag.
+ exited := make(chan struct{})
+ go func() {
+ cmd.Wait()
+ close(exited)
+ }()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+ defer cancel()
+ poll := time.NewTicker(time.Second / 10)
+ defer poll.Stop()
+ client := http.Client{}
+ for range poll.C {
+ testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
if err != nil {
- return fmt.Errorf("setenv(%q): %s", k, err)
+ return nil, err
+ }
+ // Set the auth header only after the error check above: on
+ // error testReq is nil and Header.Set would panic.
+ testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
+ resp, err := client.Do(testReq)
+ if err == nil {
+ resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ break
+ }
+ }
+ select {
+ case <-exited:
+ return nil, fmt.Errorf("keepstore child process exited")
+ default:
+ }
+ if ctx.Err() != nil {
+ return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
+ }
}
- return nil
+ os.Setenv("ARVADOS_KEEP_SERVICES", url)
+ return cmd, nil
+}
+
+// return current uid, gid, groups in a format suitable for logging:
+// "crunch-run process has uid=1234(arvados) gid=1234(arvados)
+// groups=1234(arvados),114(fuse)"
+func currentUserAndGroups() string {
+ u, err := user.Current()
+ if err != nil {
+ // Still return something loggable instead of failing the caller.
+ return fmt.Sprintf("error getting current user ID: %s", err)
+ }
+ s := fmt.Sprintf("crunch-run process has uid=%s(%s) gid=%s", u.Uid, u.Username, u.Gid)
+ // Append the primary group's name when it can be resolved.
+ if g, err := user.LookupGroupId(u.Gid); err == nil {
+ s += fmt.Sprintf("(%s)", g.Name)
+ }
+ s += " groups="
+ // Supplementary groups; a failed name lookup leaves the bare
+ // numeric gid in the list.
+ if gids, err := u.GroupIds(); err == nil {
+ for i, gid := range gids {
+ if i > 0 {
+ s += ","
+ }
+ s += gid
+ if g, err := user.LookupGroupId(gid); err == nil {
+ s += fmt.Sprintf("(%s)", g.Name)
+ }
+ }
+ }
+ return s
+}