"os"
"os/exec"
"os/signal"
+ "path"
"strings"
"sync"
"syscall"
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
+ Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
}
// ErrCancelled is the error returned when the container is cancelled.
PortableDataHash string `json:"portable_data_hash"`
UUID string `json:"uuid"`
DeviceType string `json:"device_type"`
+ Path string `json:"path"`
}
// Collection record returned by the API server.
Output string `json:"output"`
}
+// APIClientAuthorization is an arvados#api_client_authorization resource. ContainerToken uses it as the output value of its "auth" call and returns the APIToken field.
+type APIClientAuthorization struct {
+ UUID string `json:"uuid"` // JSON key "uuid"
+ APIToken string `json:"api_token"` // JSON key "api_token"; the value ContainerToken hands back
+}
+
// NewLogWriter is a factory function to create a new log writer.
// It is called with a stream name (this file passes "arv-mount",
// "stdout" and "stderr") and the io.WriteCloser it returns is handed
// to NewThrottledLogger at each of those call sites.
type NewLogWriter func(name string) io.WriteCloser
// RunArvMount is the signature of the hook the runner calls when it
// starts arv-mount. ContainerRunner.ArvMountCmd has the two-argument
// form: there, args are the arv-mount command-line arguments and tok
// is placed in the child's environment as ARVADOS_API_TOKEN.
-type RunArvMount func([]string) (*exec.Cmd, error)
+type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
// MkTempDir is a function type taking two strings and returning a
// string and an error.
// NOTE(review): presumably a temp-directory factory with the same shape
// as ioutil.TempDir(dir, prefix) — confirm against the callers.
type MkTempDir func(string, string) (string, error)
NewLogWriter
loggingDone chan bool
CrunchLog *ThrottledLogger
- Stdout *ThrottledLogger
+ Stdout io.WriteCloser
Stderr *ThrottledLogger
LogCollection *CollectionWriter
LogsPDH *string
return nil
}
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
+func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
c = exec.Command("arv-mount", arvMountCmd...)
+
+ // Copy our environment, but override ARVADOS_API_TOKEN with
+ // the container auth token.
+ c.Env = nil
+ for _, s := range os.Environ() {
+ if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
+ c.Env = append(c.Env, s)
+ }
+ }
+ c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
+
nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
c.Stdout = nt
c.Stderr = nt
runner.Binds = nil
for bind, mnt := range runner.ContainerRecord.Mounts {
+ if bind == "stdout" {
+ // Is it a "file" mount kind?
+ if mnt.Kind != "file" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ }
+
+ // Does path start with OutputPath?
+ prefix := runner.ContainerRecord.OutputPath
+ if !strings.HasSuffix(prefix, "/") {
+ prefix += "/"
+ }
+ if !strings.HasPrefix(mnt.Path, prefix) {
+ return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ }
+ }
+
if mnt.Kind == "collection" {
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
} else {
runner.Binds = append(runner.Binds, bind)
}
- } else {
- return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
}
}
}
arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
- runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
+ token, err := runner.ContainerToken()
+ if err != nil {
+ return fmt.Errorf("could not get container token: %s", err)
+ }
+
+ runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
if err != nil {
return fmt.Errorf("While trying to start arv-mount: %v", err)
}
runner.loggingDone = make(chan bool)
- runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
+ stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
+ index := strings.LastIndex(stdoutPath, "/")
+ if index > 0 {
+ subdirs := stdoutPath[:index]
+ if subdirs != "" {
+ st, err := os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return fmt.Errorf("While Stat on temp dir: %v", err)
+ }
+ stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+ if err != nil {
+ return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+ }
+ }
+ }
+ stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ if err != nil {
+ return fmt.Errorf("While creating stdout file: %v", err)
+ }
+ runner.Stdout = stdoutFile
+ } else {
+ runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ }
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
go runner.ProcessDockerAttach(containerReader)
arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
}
+// ContainerToken returns the api_token the container (and any
+// arv-mount processes) are allowed to use.
+//
+// It issues a GET call for the "auth" action on the current
+// container's record through ArvClient and returns the APIToken field
+// of the decoded APIClientAuthorization. When the call fails it
+// returns an empty token together with the error, so a partially
+// decoded response is never handed back as if it were a usable
+// credential.
+func (runner *ContainerRunner) ContainerToken() (string, error) {
+	var auth APIClientAuthorization
+	if err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth); err != nil {
+		return "", err
+	}
+	return auth.APIToken, nil
+}
+
// UpdateContainerRecordComplete updates the container record state on API
// server to "Complete" or "Cancelled"
func (runner *ContainerRunner) UpdateContainerRecordComplete() error {