"os"
"os/exec"
"os/signal"
+ "path"
"strings"
"sync"
"syscall"
PortableDataHash string `json:"portable_data_hash"`
UUID string `json:"uuid"`
DeviceType string `json:"device_type"`
+ Path string `json:"path"`
}
// Collection record returned by the API server.
LoadImage(reader io.Reader) error
CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
StartContainer(id string, config *dockerclient.HostConfig) error
- ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
+ AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
Wait(id string) <-chan dockerclient.WaitResult
RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
}
NewLogWriter
loggingDone chan bool
CrunchLog *ThrottledLogger
- Stdout *ThrottledLogger
+ Stdout io.WriteCloser
Stderr *ThrottledLogger
LogCollection *CollectionWriter
LogsPDH *string
// SetupSignals sets up signal handling to gracefully terminate the underlying
// Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() error {
+func (runner *ContainerRunner) SetupSignals() {
runner.SigChan = make(chan os.Signal, 1)
signal.Notify(runner.SigChan, syscall.SIGTERM)
signal.Notify(runner.SigChan, syscall.SIGINT)
}
}
}(runner.SigChan)
-
- return nil
}
// LoadImage determines the docker image id from the container record and
pdhOnly := true
tmpcount := 0
- arvMountCmd := []string{"--foreground", "--allow-other"}
+ arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
collectionPaths := []string{}
runner.Binds = nil
for bind, mnt := range runner.ContainerRecord.Mounts {
+ if bind == "stdout" {
+ // Is it a "file" mount kind?
+ if mnt.Kind != "file" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ }
+
+ // Does path start with OutputPath?
+ prefix := runner.ContainerRecord.OutputPath
+ if !strings.HasSuffix(prefix, "/") {
+ prefix += "/"
+ }
+ if !strings.HasPrefix(mnt.Path, prefix) {
+ return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ }
+ }
+
if mnt.Kind == "collection" {
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
if staterr != nil {
return fmt.Errorf("While Stat on temp dir: %v", staterr)
}
- err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid)
+ err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
if staterr != nil {
return fmt.Errorf("While Chmod temp dir: %v", err)
}
} else {
runner.Binds = append(runner.Binds, bind)
}
- } else {
- return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
}
}
return nil
}
-// StartContainer creates the container and runs it.
-func (runner *ContainerRunner) StartContainer() (err error) {
- runner.CrunchLog.Print("Creating Docker container")
-
- runner.CancelLock.Lock()
- defer runner.CancelLock.Unlock()
-
- if runner.Cancelled {
- return ErrCancelled
- }
-
- runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
- if runner.ContainerRecord.Cwd != "." {
- runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
- }
- for k, v := range runner.ContainerRecord.Environment {
- runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
- }
- runner.ContainerConfig.NetworkDisabled = true
- runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
- if err != nil {
- return fmt.Errorf("While creating container: %v", err)
- }
- hostConfig := &dockerclient.HostConfig{Binds: runner.Binds}
-
- runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
- err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
- if err != nil {
- return fmt.Errorf("While starting container: %v", err)
- }
-
- return nil
-}
-
func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
// Handle docker log protocol
// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
_, readerr := io.ReadAtLeast(containerReader, header, 8)
if readerr == nil {
- readsize := int64(header[4]) | (int64(header[5]) << 8) | (int64(header[6]) << 16) | (int64(header[7]) << 24)
+ readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
if header[0] == 1 {
// stdout
_, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
closeerr := runner.Stdout.Close()
if closeerr != nil {
- runner.CrunchLog.Printf("While closing stdout logs: %v", readerr)
+ runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
}
closeerr = runner.Stderr.Close()
if closeerr != nil {
- runner.CrunchLog.Printf("While closing stderr logs: %v", readerr)
+ runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
}
runner.loggingDone <- true
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
-func (runner *ContainerRunner) AttachLogs() (err error) {
+// AttachStreams attaches to the docker container's stdout and stderr streams
+// and starts a goroutine (ProcessDockerAttach) that copies them out.
+//
+// Stderr always goes to a throttled "stderr" log writer.  Stdout goes to a
+// throttled "stdout" log writer unless the container record has a "stdout"
+// mount, in which case it is written to the corresponding file under
+// HostOutputDir (missing parent directories are created first).
+//
+// An error is returned if the attach fails, if the stdout mount path does
+// not resolve to a file inside HostOutputDir, or if the directories or file
+// cannot be created.  When setup fails after a successful attach, the
+// attached stream is closed before the error is returned.
+func (runner *ContainerRunner) AttachStreams() (err error) {
- runner.CrunchLog.Print("Attaching container logs")
+ runner.CrunchLog.Print("Attaching container streams")
- var containerReader io.Reader
- containerReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true, Stderr: true})
+ var containerReader io.ReadCloser
+ containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
+ &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
if err != nil {
- return fmt.Errorf("While attaching container logs: %v", err)
+ return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
+ // Every return below sets the named result err, so this fires on any
+ // later failure and keeps the attached stream from leaking.
+ defer func() {
+ if err != nil && containerReader != nil {
+ containerReader.Close()
+ }
+ }()
+
runner.loggingDone = make(chan bool)
- runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
+ // Translate the in-container path into a path under HostOutputDir.
+ outputDir := path.Clean(runner.HostOutputDir)
+ relPath := strings.TrimPrefix(stdoutMnt.Path, runner.ContainerRecord.OutputPath)
+ hostPath := path.Join(outputDir, relPath)
+ // path.Join resolves ".." elements, so check the result against the
+ // output dir to refuse a stdout path that would land outside of it.
+ if !strings.HasPrefix(hostPath, strings.TrimSuffix(outputDir, "/")+"/") {
+ return fmt.Errorf("Stdout path %q is not inside the host output directory %q", stdoutMnt.Path, runner.HostOutputDir)
+ }
+ // Create intermediate directories using the output dir's mode bits
+ // plus setgid and 0777, as requested from MkdirAll.
+ if parent := path.Dir(hostPath); parent != outputDir {
+ var st os.FileInfo
+ st, err = os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return fmt.Errorf("While Stat on temp dir: %v", err)
+ }
+ err = os.MkdirAll(parent, st.Mode()|os.ModeSetgid|0777)
+ if err != nil {
+ return fmt.Errorf("While MkdirAll %q: %v", parent, err)
+ }
+ }
+ var stdoutFile *os.File
+ stdoutFile, err = os.Create(hostPath)
+ if err != nil {
+ return fmt.Errorf("While creating stdout file: %v", err)
+ }
+ runner.Stdout = stdoutFile
+ } else {
+ runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ }
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
go runner.ProcessDockerAttach(containerReader)
return nil
}
+// StartContainer creates the Docker container described by the container
+// record, attaches its stdout/stderr streams, and then starts it.  It returns
+// ErrCancelled, without creating anything, if the runner has already been
+// cancelled.
+func (runner *ContainerRunner) StartContainer() (err error) {
+ runner.CrunchLog.Print("Creating Docker container")
+
+ // CancelLock stays held until return, covering create, attach and start.
+ runner.CancelLock.Lock()
+ defer runner.CancelLock.Unlock()
+ if runner.Cancelled {
+ return ErrCancelled
+ }
+
+ // Copy command, working directory and environment from the record.
+ config := &runner.ContainerConfig
+ config.Cmd = runner.ContainerRecord.Command
+ if cwd := runner.ContainerRecord.Cwd; cwd != "." {
+ config.WorkingDir = cwd
+ }
+ for name, value := range runner.ContainerRecord.Environment {
+ config.Env = append(config.Env, name+"="+value)
+ }
+ config.NetworkDisabled = true
+
+ if runner.ContainerID, err = runner.Docker.CreateContainer(config, "", nil); err != nil {
+ return fmt.Errorf("While creating container: %v", err)
+ }
+
+ // Docker's own log driver is set to "none"; output is read through the
+ // attached streams instead.
+ hostConfig := &dockerclient.HostConfig{
+ Binds: runner.Binds,
+ LogConfig: dockerclient.LogConfig{Type: "none"},
+ }
+
+ // Attach before starting (presumably so no early output is missed).
+ if err = runner.AttachStreams(); err != nil {
+ return err
+ }
+
+ runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+ if err = runner.Docker.StartContainer(runner.ContainerID, hostConfig); err != nil {
+ return fmt.Errorf("While starting container: %v", err)
+ }
+ return nil
+}
+
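-// WaitFinish waits for the container to terminate, capture the exit code, and
-// close the stdout/stderr logging.
+// WaitFinish waits for the container to terminate, captures the exit code, and
+// closes the stdout/stderr logging.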
func (runner *ContainerRunner) WaitFinish() error {
+ runner.CrunchLog.Print("Waiting for container to finish")
+
result := runner.Docker.Wait(runner.ContainerID)
wr := <-result
if wr.Error != nil {
for _, tmpdir := range runner.CleanupTempDir {
rmerr := os.RemoveAll(tmpdir)
- runner.CrunchLog.Printf("While cleaning up temporary directories: %v", rmerr)
+ if rmerr != nil {
+ runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
+ }
}
}
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
+ hostname, hosterr := os.Hostname()
+ if hosterr != nil {
+ runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
+ } else {
+ runner.CrunchLog.Printf("Executing on host '%s'", hostname)
+ }
+
var runerr, waiterr error
defer func() {
runner.finalState = "Complete"
}
- // (7) capture output
+ // (6) capture output
outputerr := runner.CaptureOutput()
if outputerr != nil {
runner.CrunchLog.Print(outputerr)
}
- // (8) clean up temporary directories
+ // (7) clean up temporary directories
runner.CleanupDirs()
- // (9) write logs
+ // (8) write logs
logerr := runner.CommitLogs()
if logerr != nil {
runner.CrunchLog.Print(logerr)
}
- // (10) update container record with results
+ // (9) update container record with results
updateerr := runner.UpdateContainerRecordComplete()
if updateerr != nil {
runner.CrunchLog.Print(updateerr)
if runerr != nil {
err = runerr
} else if waiterr != nil {
- err = runerr
+ err = waiterr
} else if logerr != nil {
err = logerr
} else if updateerr != nil {
}
// (1) setup signal handling
- err = runner.SetupSignals()
- if err != nil {
- return fmt.Errorf("While setting up signal handling: %v", err)
- }
+ runner.SetupSignals()
// (2) check for and/or load image
err = runner.LoadImage()
runner.CrunchLog.Print(err)
}
- // (5) attach container logs
- runerr = runner.AttachLogs()
- if runerr != nil {
- runner.CrunchLog.Print(runerr)
- }
-
- // (6) wait for container to finish
+ // (5) wait for container to finish
waiterr = runner.WaitFinish()
return
cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
cr.ContainerRecord.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
+ cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
return cr
}
func main() {
flag.Parse()
+ containerId := flag.Arg(0)
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
api.Retries = 8
var kc *keepclient.KeepClient
kc, err = keepclient.MakeKeepClient(&api)
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
kc.Retries = 4
var docker *dockerclient.DockerClient
docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
- cr := NewContainerRunner(api, kc, docker, flag.Arg(0))
+ cr := NewContainerRunner(api, kc, docker, containerId)
err = cr.Run()
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
}