+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package main
import (
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- //"git.curoverse.com/arvados.git/sdk/go/keepclient"
- "errors"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
"log"
"os"
"os/exec"
"os/signal"
+ "strings"
"syscall"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
-func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) {
- r = make(arvadosclient.Dict)
- err := api.Get(rsc, uuid, nil, &r)
- if err != nil {
- log.Fatal(err)
- }
- return r
+// TaskDef is the JSON description of one command to run. It appears as an
+// element of a job's script_parameters "tasks" list and as a job task's
+// "parameters". Command elements, Env values, Vwd values and Stdin are
+// passed through substitute() before use.
+type TaskDef struct {
+ // Command is the subprocess argv; element 0 is the program to execute.
+ Command []string `json:"command"`
+ // Env holds extra variables appended to the inherited environment.
+ Env map[string]string `json:"task.env"`
+ // Stdin is a path opened as the subprocess's standard input.
+ Stdin string `json:"task.stdin"`
+ // Stdout and Stderr are file names, relative to the output directory,
+ // that receive the subprocess's output. When empty, the runner's own
+ // stdout/stderr is used instead.
+ Stdout string `json:"task.stdout"`
+ Stderr string `json:"task.stderr"`
+ // Vwd maps a file name relative to the output directory to the path
+ // that should appear under that name (symlinked, or copied when
+ // KeepTmpOutput is set).
+ Vwd map[string]string `json:"task.vwd"`
+ // Exit-code classification used by runner: PermanentFailCodes is
+ // checked first, then TemporaryFailCodes, then SuccessCodes; a code in
+ // none of the lists succeeds only if the process exited successfully.
+ SuccessCodes []int `json:"task.successCodes"`
+ PermanentFailCodes []int `json:"task.permanentFailCodes"`
+ TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
+ // KeepTmpOutput makes the output directory come from the
+ // TASK_KEEPMOUNT_TMP environment variable, and makes the output
+ // manifest be read back from that directory instead of uploaded.
+ KeepTmpOutput bool `json:"task.keepTmpOutput"`
+}
+
+// Tasks is the shape of a job's script_parameters as far as this program
+// is concerned: a "tasks" list of task definitions.
+type Tasks struct {
+ Tasks []TaskDef `json:"tasks"`
+}
+
+// Job is the subset of a "jobs" API record that is decoded here; only
+// script_parameters is read.
+type Job struct {
+ ScriptParameters Tasks `json:"script_parameters"`
+}
+
+// Task is the subset of a "job_tasks" API record that is read from and
+// written to the API server. It is decoded from the current task record
+// and also used as the "job_task" payload when creating subtasks and when
+// recording the final result.
+//
+// NOTE(review): no field uses omitempty, so zero values (e.g. an empty
+// job_uuid or sequence 0) are included whenever a Task is encoded; confirm
+// the API server tolerates that on update.
+type Task struct {
+ JobUUID string `json:"job_uuid"`
+ CreatedByJobTaskUUID string `json:"created_by_job_task_uuid"`
+ Parameters TaskDef `json:"parameters"`
+ Sequence int `json:"sequence"`
+ Output string `json:"output"`
+ Success bool `json:"success"`
+ // Progress needs its own JSON key. When two fields at the same depth
+ // share a key, encoding/json silently ignores both of them, so a
+ // duplicated "sequence" tag would stop Sequence from being decoded and
+ // Progress from being encoded.
+ Progress float32 `json:"progress"`
+}
+
+// IArvadosClient is the part of the Arvados API client that runner uses:
+// creating job_tasks records for subtasks and updating the current task
+// record. main passes the client returned by
+// arvadosclient.MakeArvadosClient.
+// NOTE(review): presumably an interface so a stub can stand in for the real
+// client in tests — no such stub is visible here.
+type IArvadosClient interface {
+ // Create creates a record of resourceType from parameters and, when
+ // output is non-nil, decodes the response into it.
+ Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+ // Update modifies the record of resourceType identified by uuid.
+ Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
-func setupDirectories(tmpdir string) (outdir string, err error) {
- err = os.Chdir(tmpdir)
+// setupDirectories prepares the task's scratch and output directories and
+// returns their paths.
+//
+// tmpdir is always created as crunchtmpdir/tmpdir with mode 0700. When
+// keepTmp is false, outdir is created as crunchtmpdir/outdir with mode
+// 0700. When keepTmp is true, outdir is taken from the TASK_KEEPMOUNT_TMP
+// environment variable and nothing is created for it.
+//
+// taskUUID is accepted but not used. On any error both returned paths are
+// empty.
+//
+// NOTE(review): with keepTmp set and TASK_KEEPMOUNT_TMP unset, outdir is
+// returned as "" with a nil error — confirm callers can never hit this.
+func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
+ tmpdir = crunchtmpdir + "/tmpdir"
+ err = os.Mkdir(tmpdir, 0700)
if err != nil {
- return "", err
+ return "", "", err
}
- err = os.Mkdir("tmpdir", 0700)
- if err != nil {
- return "", err
+ if keepTmp {
+ // Output is written directly into a directory supplied by the environment.
+ outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
+ } else {
+ outdir = crunchtmpdir + "/outdir"
+ err = os.Mkdir(outdir, 0700)
+ if err != nil {
+ return "", "", err
+ }
}
- err = os.Mkdir("outdir", 0700)
- if err != nil {
- return "", err
+ return tmpdir, outdir, nil
+}
+
+// checkOutputFilename validates fn, a file name meant to live under
+// outdir, and creates any parent directories it needs.
+//
+// fn is rejected when it is empty, starts or ends with '/', or refers to a
+// parent directory (contains "../", is "..", or ends in "/.."). When fn has
+// a directory part, that directory is created under outdir with
+// os.MkdirAll (mode 0777 before umask), and a failure to create it is
+// returned.
+func checkOutputFilename(outdir, fn string) error {
+ if fn == "" {
+ return fmt.Errorf("Path must not be empty")
+ }
+ if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
+ return fmt.Errorf("Path must not start or end with '/'")
+ }
+ // Search for "../" inside fn, not the other way round. A bare ".." or
+ // a trailing "/.." escapes outdir just as well, so reject those too.
+ if fn == ".." || strings.HasSuffix(fn, "/..") || strings.Contains(fn, "../") {
+ return fmt.Errorf("Path must not contain '../'")
}
- os.Chdir("outdir")
+ sl := strings.LastIndex(fn, "/")
+ if sl != -1 {
+ // Report a failure here instead of letting the caller trip over the
+ // missing directory later with a less specific error.
+ err := os.MkdirAll(outdir+"/"+fn[0:sl], 0777)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// copyFile copies the contents of the file at src into the file at dst,
+// creating dst or truncating it if it already exists. Note the argument
+// order: destination first, source second.
+//
+// It returns the first error from opening src, creating dst, copying, or
+// closing dst.
+func copyFile(dst, src string) error {
+ in, err := os.Open(src)
if err != nil {
- return "", err
+ return err
}
+ defer in.Close()
- outdir, err = os.Getwd()
+ out, err := os.Create(dst)
if err != nil {
- return "", err
+ return err
}
- return outdir, nil
+ _, err = io.Copy(out, in)
+ if err != nil {
+ // The copy error is the one worth reporting; close only to release
+ // the descriptor.
+ out.Close()
+ return err
+ }
+ // Close can be the first place a write failure surfaces, so its error
+ // must reach the caller rather than be dropped by a defer.
+ return out.Close()
}
-func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
- var err error
-
- if taskp["task.vwd"] != nil {
- // Set up VWD symlinks in outdir
- // TODO
+// setupCommand applies the task definition taskp to cmd: it populates the
+// output directory from taskp.Vwd, wires up stdin/stdout/stderr
+// redirection, and extends the environment.
+//
+// Vwd values, Stdin and Env values are passed through substitute() with
+// replacements. Vwd keys, Stdout and Stderr are validated with
+// checkOutputFilename and resolved relative to outdir. When Stdout or
+// Stderr is empty, cmd inherits this process's stream.
+//
+// It returns the paths used for stdin, stdout and stderr ("" for a stream
+// that was not redirected). On error all three are "".
+//
+// NOTE(review): files opened here for redirection are not closed by this
+// function, including when a later step fails.
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
+ if taskp.Vwd != nil {
+ for k, v := range taskp.Vwd {
+ v = substitute(v, replacements)
+ err = checkOutputFilename(outdir, k)
+ if err != nil {
+ return "", "", "", err
+ }
+ if taskp.KeepTmpOutput {
+ // copyFile takes (dst, src): v is the existing file and
+ // outdir/k the name it must appear under, mirroring the
+ // Symlink call below.
+ err = copyFile(outdir+"/"+k, v)
+ } else {
+ err = os.Symlink(v, outdir+"/"+k)
+ }
+ if err != nil {
+ return "", "", "", err
+ }
+ }
}
- if taskp["task.stdin"] != nil {
- stdin, ok := taskp["task.stdin"].(string)
- if !ok {
- return errors.New("Could not cast task.stdin to string")
- }
+ if taskp.Stdin != "" {
// Set up stdin redirection
- cmd.Stdin, err = os.Open(keepmount + "/" + stdin)
+ stdin = substitute(taskp.Stdin, replacements)
+ cmd.Stdin, err = os.Open(stdin)
if err != nil {
- log.Fatal(err)
+ return "", "", "", err
}
}
- if taskp["task.stdout"] != nil {
- stdout, ok := taskp["task.stdout"].(string)
- if !ok {
- return errors.New("Could not cast task.stdout to string")
+ if taskp.Stdout != "" {
+ err = checkOutputFilename(outdir, taskp.Stdout)
+ if err != nil {
+ return "", "", "", err
}
-
// Set up stdout redirection
- cmd.Stdout, err = os.Open(outdir + "/" + stdout)
+ stdout = outdir + "/" + taskp.Stdout
+ cmd.Stdout, err = os.Create(stdout)
if err != nil {
- log.Fatal(err)
+ return "", "", "", err
}
} else {
cmd.Stdout = os.Stdout
}
- if taskp["task.env"] != nil {
- taskenv, ok := taskp["task.env"].(map[string]interface{})
- if !ok {
- return errors.New("Could not cast task.env to map")
+ if taskp.Stderr != "" {
+ err = checkOutputFilename(outdir, taskp.Stderr)
+ if err != nil {
+ return "", "", "", err
}
+ // Set up stderr redirection
+ stderr = outdir + "/" + taskp.Stderr
+ cmd.Stderr, err = os.Create(stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ } else {
+ cmd.Stderr = os.Stderr
+ }
+ if taskp.Env != nil {
// Set up subprocess environment
cmd.Env = os.Environ()
- for k, v := range taskenv {
- var vstr string
- vstr, ok = v.(string)
- if !ok {
- return errors.New("Could not cast environment value to string")
- }
- cmd.Env = append(cmd.Env, k+"="+vstr)
+ for k, v := range taskp.Env {
+ v = substitute(v, replacements)
+ cmd.Env = append(cmd.Env, k+"="+v)
}
}
- return nil
+ return stdin, stdout, stderr, nil
}
-func setupSignals(cmd *exec.Cmd) {
- // Set up signal handlers
- // Forward SIGINT, SIGTERM and SIGQUIT to inner process
+// setupSignals registers for SIGTERM, SIGINT and SIGQUIT and returns the
+// channel on which those signals are delivered. Receiving from the channel
+// and acting on the signals is left to the caller, which should also call
+// signal.Stop on the channel when it is done. The cmd argument is not used.
+func setupSignals(cmd *exec.Cmd) chan os.Signal {
+ // Capacity 1: the signal package never blocks when delivering, so a
+ // signal that arrives while another is still queued is dropped.
sigChan := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- catch := <-sig
- if cmd.Process != nil {
- cmd.Process.Signal(catch)
- }
- }(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
signal.Notify(sigChan, syscall.SIGQUIT)
+ return sigChan
}
-func inCodes(code int, codes interface{}) bool {
+func inCodes(code int, codes []int) bool {
if codes != nil {
- codesArray, ok := codes.([]interface{})
- if !ok {
- return false
- }
- for _, c := range codesArray {
- var num float64
- num, ok = c.(float64)
- if ok && code == int(num) {
+ for _, c := range codes {
+ if code == c {
return true
}
}
+// TASK_TEMPFAIL is the numeric code associated with a temporary failure.
+// NOTE(review): presumably the process exit status used when runner
+// returns a TempFail — the code that uses it is outside this view.
const TASK_TEMPFAIL = 111
-type TempFail struct{ InnerError error }
+// TempFail marks an error as temporary. runner returns it for setup and
+// API failures and for exit codes listed in TemporaryFailCodes. The wrapped
+// error is embedded, so its Error method is promoted and no explicit
+// method is needed.
+type TempFail struct{ error }
+// PermFail is the error runner returns when the task ran but did not
+// succeed, or was interrupted by a signal.
type PermFail struct{}
-func (s TempFail) Error() string {
- return s.InnerError.Error()
-}
-
+// Error implements the error interface; the message is always "PermFail".
func (s PermFail) Error() string {
return "PermFail"
}
-func runner(api arvadosclient.ArvadosClient,
- jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
- taskStruct arvadosclient.Dict) error {
+// substitute returns inp with every occurrence of each key of subst
+// replaced by that key's value. Keys are applied one after another in map
+// iteration order, so the result is only order-independent when no
+// replacement text contains another key. A nil or empty subst returns inp
+// unchanged.
+func substitute(inp string, subst map[string]string) string {
+ result := inp
+ for placeholder, replacement := range subst {
+ result = strings.Replace(result, placeholder, replacement, -1)
+ }
+ return result
+}
- var err error
- var ok bool
- var jobp, taskp map[string]interface{}
- jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
- if !ok {
- return errors.New("Could not cast job script_parameters to map")
+// getKeepTmp reads the file named ".arvados#collection" inside outdir,
+// decodes it as a JSON arvados.Collection, and returns that collection's
+// manifest text.
+// NOTE(review): presumably this is the special file exposed by the Keep
+// mount for a writable collection directory — confirm.
+//
+// Open and read errors return "" and the error. On a JSON decoding error,
+// whatever ManifestText was populated so far is returned with the error.
+func getKeepTmp(outdir string) (manifest string, err error) {
+ fn, err := os.Open(outdir + "/" + ".arvados#collection")
+ if err != nil {
+ return "", err
}
+ defer fn.Close()
- taskp, ok = taskStruct["parameters"].(map[string]interface{})
- if !ok {
- return errors.New("Could not cast task parameters to map")
+ buf, err := ioutil.ReadAll(fn)
+ if err != nil {
+ return "", err
}
+ collection := arvados.Collection{}
+ err = json.Unmarshal(buf, &collection)
+ return collection.ManifestText, err
+}
+
+func runner(api IArvadosClient,
+ kc IKeepClient,
+ jobUUID, taskUUID, crunchtmpdir, keepmount string,
+ jobStruct Job, taskStruct Task) error {
+
+ var err error
+ taskp := taskStruct.Parameters
// If this is task 0 and there are multiple tasks, dispatch subtasks
// and exit.
- if taskStruct["sequence"] == 0.0 {
- var tasks []interface{}
- tasks, ok = jobp["tasks"].([]interface{})
- if !ok {
- return errors.New("Could not cast tasks to array")
- }
-
- if len(tasks) == 1 {
- taskp = tasks[0].(map[string]interface{})
+ if taskStruct.Sequence == 0 {
+ if len(jobStruct.ScriptParameters.Tasks) == 1 {
+ taskp = jobStruct.ScriptParameters.Tasks[0]
} else {
- for task := range tasks {
- err := api.Call("POST", "job_tasks", "", "",
- arvadosclient.Dict{
- "job_uuid": jobUuid,
- "created_by_job_task_uuid": "",
- "sequence": 1,
- "parameters": task},
+ for _, task := range jobStruct.ScriptParameters.Tasks {
+ err := api.Create("job_tasks",
+ map[string]interface{}{
+ "job_task": Task{
+ JobUUID: jobUUID,
+ CreatedByJobTaskUUID: taskUUID,
+ Sequence: 1,
+ Parameters: task}},
nil)
if err != nil {
return TempFail{err}
}
}
- err = api.Call("PUT", "job_tasks", taskUuid, "",
- arvadosclient.Dict{
- "job_task": arvadosclient.Dict{
+ err = api.Update("job_tasks", taskUUID,
+ map[string]interface{}{
+ "job_task": map[string]interface{}{
"output": "",
"success": true,
"progress": 1.0}},
}
}
- // Set up subprocess
- var commandline []string
- var commandsarray []interface{}
-
- commandsarray, ok = taskp["command"].([]interface{})
- if !ok {
- return errors.New("Could not cast commands to array")
+ var tmpdir, outdir string
+ tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
+ if err != nil {
+ return TempFail{err}
}
- for _, c := range commandsarray {
- var cstr string
- cstr, ok = c.(string)
- if !ok {
- return errors.New("Could not cast command argument to string")
- }
- commandline = append(commandline, cstr)
- }
- cmd := exec.Command(commandline[0], commandline[1:]...)
+ replacements := map[string]string{
+ "$(task.tmpdir)": tmpdir,
+ "$(task.outdir)": outdir,
+ "$(task.keep)": keepmount}
- var outdir string
- outdir, err = setupDirectories(tmpdir)
- if err != nil {
- return TempFail{err}
+ log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
+ log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
+ log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
+
+ // Set up subprocess
+ for k, v := range taskp.Command {
+ taskp.Command[k] = substitute(v, replacements)
}
+ cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
+
cmd.Dir = outdir
- err = setupCommand(cmd, taskp, keepmount, outdir)
+ var stdin, stdout, stderr string
+ stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
if err != nil {
return err
}
- setupSignals(cmd)
-
// Run subprocess and wait for it to complete
- log.Printf("Running %v", cmd.Args)
+ if stdin != "" {
+ stdin = " < " + stdin
+ }
+ if stdout != "" {
+ stdout = " > " + stdout
+ }
+ if stderr != "" {
+ stderr = " 2> " + stderr
+ }
+ log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
- err = cmd.Run()
+ var caughtSignal os.Signal
+ sigChan := setupSignals(cmd)
+ err = cmd.Start()
if err != nil {
+ signal.Stop(sigChan)
return TempFail{err}
}
- const success = 1
- const permfail = 2
- const tempfail = 2
- var status int
+ finishedSignalNotify := make(chan struct{})
+ go func(sig <-chan os.Signal) {
+ for sig := range sig {
+ caughtSignal = sig
+ cmd.Process.Signal(caughtSignal)
+ }
+ close(finishedSignalNotify)
+ }(sigChan)
+
+ err = cmd.Wait()
+ signal.Stop(sigChan)
+
+ close(sigChan)
+ <-finishedSignalNotify
+
+ if caughtSignal != nil {
+ log.Printf("Caught signal %v", caughtSignal)
+ return PermFail{}
+ }
+
+ if err != nil {
+ // Run() returns ExitError on non-zero exit code, but we handle
+ // that down below. So only return if it's not ExitError.
+ if _, ok := err.(*exec.ExitError); !ok {
+ return TempFail{err}
+ }
+ }
+
+ var success bool
exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
- if inCodes(exitCode, taskp["task.successCodes"]) {
- status = success
- } else if inCodes(exitCode, taskp["task.permanentFailCodes"]) {
- status = permfail
- } else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) {
- os.Exit(TASK_TEMPFAIL)
- } else if cmd.ProcessState.Success() {
- status = success
+ log.Printf("Completed with exit code %v", exitCode)
+
+ if inCodes(exitCode, taskp.PermanentFailCodes) {
+ success = false
+ } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
+ return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
+ } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
+ success = true
} else {
- status = permfail
+ success = false
}
// Upload output directory
- // TODO
+ var manifest string
+ if taskp.KeepTmpOutput {
+ manifest, err = getKeepTmp(outdir)
+ } else {
+ manifest, err = WriteTree(kc, outdir)
+ }
+ if err != nil {
+ return TempFail{err}
+ }
// Set status
- err = api.Call("PUT", "job_tasks", taskUuid, "",
- arvadosclient.Dict{
- "job_task": arvadosclient.Dict{
- "output": "",
- "success": status == success,
- "progress": 1.0}},
+ err = api.Update("job_tasks", taskUUID,
+ map[string]interface{}{
+ "job_task": Task{
+ Output: manifest,
+ Success: success,
+ Progress: 1}},
nil)
if err != nil {
return TempFail{err}
}
- if status == success {
+ if success {
return nil
} else {
return PermFail{}
}
func main() {
- syscall.Umask(0077)
-
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Fatal(err)
}
- jobUuid := os.Getenv("JOB_UUID")
- taskUuid := os.Getenv("TASK_UUID")
+ jobUUID := os.Getenv("JOB_UUID")
+ taskUUID := os.Getenv("TASK_UUID")
tmpdir := os.Getenv("TASK_WORK")
keepmount := os.Getenv("TASK_KEEPMOUNT")
- jobStruct := getRecord(api, "jobs", jobUuid)
- taskStruct := getRecord(api, "job_tasks", taskUuid)
+ var jobStruct Job
+ var taskStruct Task
+
+ err = api.Get("jobs", jobUUID, nil, &jobStruct)
+ if err != nil {
+ log.Fatal(err)
+ }
+ err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ var kc IKeepClient
+ kc, err = keepclient.MakeKeepClient(api)
+ if err != nil {
+ log.Fatal(err)
+ }
- err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+ syscall.Umask(0022)
+ err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
if err == nil {
os.Exit(0)