loggingDone chan bool
CrunchLog *ThrottledLogger
Stdout io.WriteCloser
- Stderr *ThrottledLogger
+ Stderr io.WriteCloser
LogCollection *CollectionWriter
LogsPDH *string
RunArvMount
HostOutputDir string
CleanupTempDir []string
Binds []string
+ Volumes map[string]struct{}
OutputPDH *string
SigChan chan os.Signal
ArvMountExit chan error
collectionPaths := []string{}
runner.Binds = nil
+ runner.Volumes = make(map[string]struct{})
needCertMount := true
var binds []string
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if bind == "stdout" {
+ if bind == "stdout" || bind == "stderr" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
- return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
}
// Does path start with OutputPath?
prefix += "/"
}
if !strings.HasPrefix(mnt.Path, prefix) {
- return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
}
}
}
switch {
- case mnt.Kind == "collection":
+ case mnt.Kind == "collection" && bind != "stdin":
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
case mnt.Kind == "tmp":
- runner.Binds = append(runner.Binds, bind)
+ runner.Volumes[bind] = struct{}{}
case mnt.Kind == "json":
jsondata, err := json.Marshal(mnt.Content)
// Get and save the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() (err error) {
w := &ArvLogWriter{
- runner.ArvClient,
- runner.Container.UUID,
- "container",
- runner.LogCollection.Open("container.json"),
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: "container",
+ writeCloser: runner.LogCollection.Open("container.json"),
}
+
// Get Container record JSON from the API Server
reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
if err != nil {
return nil
}
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
} else if err != nil {
return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
}
-
- defer stdinRdr.Close()
} else if stdinMnt.Kind == "json" {
stdinJson, err = json.Marshal(stdinMnt.Content)
if err != nil {
runner.loggingDone = make(chan bool)
if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
- stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
- index := strings.LastIndex(stdoutPath, "/")
- if index > 0 {
- subdirs := stdoutPath[:index]
- if subdirs != "" {
- st, err := os.Stat(runner.HostOutputDir)
- if err != nil {
- return fmt.Errorf("While Stat on temp dir: %v", err)
- }
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
- err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
- if err != nil {
- return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
- }
- }
- }
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
if err != nil {
- return fmt.Errorf("While creating stdout file: %v", err)
+ return err
}
runner.Stdout = stdoutFile
} else {
runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
}
- runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+ if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+ stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+ if err != nil {
+ return err
+ }
+ runner.Stderr = stderrFile
+ } else {
+ runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ }
if stdinRdr != nil {
go func() {
runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
runner.stop()
}
- response.Conn.Close()
+ stdinRdr.Close()
+ response.CloseWrite()
}()
} else if len(stdinJson) != 0 {
go func() {
runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
runner.stop()
}
- response.Conn.Close()
+ response.CloseWrite()
}()
}
return nil
}
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+ stdoutPath := mntPath[len(runner.Container.OutputPath):]
+ index := strings.LastIndex(stdoutPath, "/")
+ if index > 0 {
+ subdirs := stdoutPath[:index]
+ if subdirs != "" {
+ st, err := os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+ }
+ stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+ if err != nil {
+ return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+ }
+ }
+ }
+ stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ if err != nil {
+ return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+ }
+
+ return stdoutFile, nil
+}
+
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
+ runner.ContainerConfig.Volumes = runner.Volumes
+
runner.HostConfig = dockercontainer.HostConfig{
Binds: runner.Binds,
Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
}
}
+ _, stdinUsed := runner.Container.Mounts["stdin"]
+ runner.ContainerConfig.OpenStdin = stdinUsed
+ runner.ContainerConfig.StdinOnce = stdinUsed
+ runner.ContainerConfig.AttachStdin = stdinUsed
+ runner.ContainerConfig.AttachStdout = true
+ runner.ContainerConfig.AttachStderr = true
+
createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
// point, but re-open crunch log with ArvClient in case there are any
// other further (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
- "crunch-run", nil})
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+ writeCloser: runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+ loadLogThrottleParams(api)
+
return cr
}