package main
import (
+ "crypto/x509"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io/ioutil"
"log"
+ "net/http"
"os"
"os/exec"
"os/signal"
+ "path"
"strings"
"syscall"
)
// TaskDef describes one command invocation as given in a job's script
// parameters. The fields are exported because encoding/json ignores
// unexported struct fields when decoding and encoding.
type TaskDef struct {
	// Command is the argument vector to run; element 0 is the program and
	// every element undergoes placeholder substitution before execution.
	Command []string `json:"command"`
	// Env lists extra environment variables that are appended to the
	// inherited environment (values undergo placeholder substitution).
	Env map[string]string `json:"task.env"`
	// Stdin, when non-empty, is the path (after placeholder substitution)
	// opened and attached to the command's standard input.
	Stdin string `json:"task.stdin"`
	// Stdout, when non-empty, is the file name created under the task output
	// directory to receive the command's standard output.
	Stdout string `json:"task.stdout"`
	// Vwd maps names (validated with checkOutputFilename against the output
	// directory) to values that undergo placeholder substitution.
	// NOTE(review): presumably "virtual working directory" - confirm.
	Vwd map[string]string `json:"task.vwd"`
	// SuccessCodes are exit codes that mark the task as successful.
	SuccessCodes []int `json:"task.successCodes"`
	// PermanentFailCodes are exit codes that mark the task as failed.
	PermanentFailCodes []int `json:"task.permanentFailCodes"`
	// TemporaryFailCodes are exit codes reported as a temporary failure.
	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
}
type Tasks struct {
- tasks []TaskDef `json:"script_parameters"`
+ Tasks []TaskDef `json:"tasks"`
}
type Job struct {
- script_parameters Tasks `json:"script_parameters"`
+ Script_parameters Tasks `json:"script_parameters"`
}
type Task struct {
- job_uuid string `json:"job_uuid"`
- created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
- parameters TaskDef `json:"parameters"`
- sequence int `json:"sequence"`
- output string `json:"output"`
- success bool `json:"success"`
- progress float32 `json:"sequence"`
+ Job_uuid string `json:"job_uuid"`
+ Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
+ Parameters TaskDef `json:"parameters"`
+ Sequence int `json:"sequence"`
+ Output string `json:"output"`
+ Success bool `json:"success"`
+ Progress float32 `json:"sequence"`
}
type IArvadosClient interface {
}
func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
- if taskp.vwd != nil {
- for k, v := range taskp.vwd {
+ if taskp.Vwd != nil {
+ for k, v := range taskp.Vwd {
v = substitute(v, replacements)
err = checkOutputFilename(outdir, k)
if err != nil {
}
}
- if taskp.stdin != "" {
+ if taskp.Stdin != "" {
// Set up stdin redirection
- stdin = substitute(taskp.stdin, replacements)
+ stdin = substitute(taskp.Stdin, replacements)
cmd.Stdin, err = os.Open(stdin)
if err != nil {
return "", "", err
}
}
- if taskp.stdout != "" {
- err = checkOutputFilename(outdir, taskp.stdout)
+ if taskp.Stdout != "" {
+ err = checkOutputFilename(outdir, taskp.Stdout)
if err != nil {
return "", "", err
}
// Set up stdout redirection
- stdout = outdir + "/" + taskp.stdout
+ stdout = outdir + "/" + taskp.Stdout
cmd.Stdout, err = os.Create(stdout)
if err != nil {
return "", "", err
cmd.Stdout = os.Stdout
}
- if taskp.env != nil {
+ cmd.Stderr = os.Stderr
+
+ if taskp.Env != nil {
// Set up subprocess environment
cmd.Env = os.Environ()
- for k, v := range taskp.env {
+ for k, v := range taskp.Env {
v = substitute(v, replacements)
cmd.Env = append(cmd.Env, k+"="+v)
}
return stdin, stdout, nil
}
+// Set up signal handlers. Go sends signal notifications to a "signal
+// channel".
func setupSignals(cmd *exec.Cmd) chan os.Signal {
- // Set up signal handlers
- // Forward SIGINT, SIGTERM and SIGQUIT to inner process
sigChan := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- catch := <-sig
- cmd.Process.Signal(catch)
- }(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
signal.Notify(sigChan, syscall.SIGQUIT)
jobStruct Job, taskStruct Task) error {
var err error
- taskp := taskStruct.parameters
+ taskp := taskStruct.Parameters
// If this is task 0 and there are multiple tasks, dispatch subtasks
// and exit.
- if taskStruct.sequence == 0 {
- if len(jobStruct.script_parameters.tasks) == 1 {
- taskp = jobStruct.script_parameters.tasks[0]
+ if taskStruct.Sequence == 0 {
+ if len(jobStruct.Script_parameters.Tasks) == 1 {
+ taskp = jobStruct.Script_parameters.Tasks[0]
} else {
- for _, task := range jobStruct.script_parameters.tasks {
+ for _, task := range jobStruct.Script_parameters.Tasks {
err := api.Create("job_tasks",
map[string]interface{}{
- "job_task": Task{job_uuid: jobUuid,
- created_by_job_task_uuid: taskUuid,
- sequence: 1,
- parameters: task}},
+ "job_task": Task{Job_uuid: jobUuid,
+ Created_by_job_task_uuid: taskUuid,
+ Sequence: 1,
+ Parameters: task}},
nil)
if err != nil {
return TempFail{err}
err = api.Update("job_tasks", taskUuid,
map[string]interface{}{
"job_task": Task{
- output: "",
- success: true,
- progress: 1.0}},
+ Output: "",
+ Success: true,
+ Progress: 1.0}},
nil)
return nil
}
"$(task.outdir)": outdir,
"$(task.keep)": keepmount}
+ log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
+ log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
+ log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
+
// Set up subprocess
- for k, v := range taskp.command {
- taskp.command[k] = substitute(v, replacements)
+ for k, v := range taskp.Command {
+ taskp.Command[k] = substitute(v, replacements)
}
- cmd := exec.Command(taskp.command[0], taskp.command[1:]...)
+ cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
cmd.Dir = outdir
}
log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+ var caughtSignal os.Signal
+ sigChan := setupSignals(cmd)
+
err = cmd.Start()
+ if err != nil {
+ signal.Stop(sigChan)
+ return TempFail{err}
+ }
+
+ finishedSignalNotify := make(chan struct{})
+ go func(sig <-chan os.Signal) {
+ for sig := range sig {
+ caughtSignal = sig
+ cmd.Process.Signal(caughtSignal)
+ }
+ close(finishedSignalNotify)
+ }(sigChan)
- signals := setupSignals(cmd)
err = cmd.Wait()
- signal.Stop(signals)
+ signal.Stop(sigChan)
+
+ close(sigChan)
+ <-finishedSignalNotify
+
+ if caughtSignal != nil {
+ log.Printf("Caught signal %v", caughtSignal)
+ return PermFail{}
+ }
if err != nil {
// Run() returns ExitError on non-zero exit code, but we handle
log.Printf("Completed with exit code %v", exitCode)
- if inCodes(exitCode, taskp.permanentFailCodes) {
+ if inCodes(exitCode, taskp.PermanentFailCodes) {
success = false
- } else if inCodes(exitCode, taskp.temporaryFailCodes) {
+ } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
- } else if inCodes(exitCode, taskp.successCodes) || cmd.ProcessState.Success() {
+ } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
success = true
} else {
success = false
err = api.Update("job_tasks", taskUuid,
map[string]interface{}{
"job_task": Task{
- output: manifest,
- success: success,
- progress: 1}},
+ Output: manifest,
+ Success: success,
+ Progress: 1}},
nil)
if err != nil {
return TempFail{err}
log.Fatal(err)
}
+ // Container may not have certificates installed, so need to look for
+ // /etc/arvados/ca-certificates.crt in addition to normal system certs.
+ var certFiles = []string{
+ "/etc/ssl/certs/ca-certificates.crt", // Debian
+ "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat
+ "/etc/arvados/ca-certificates.crt",
+ }
+
+ certs := x509.NewCertPool()
+ for _, file := range certFiles {
+ data, err := ioutil.ReadFile(file)
+ if err == nil {
+ log.Printf("Using TLS certificates at %v", file)
+ certs.AppendCertsFromPEM(data)
+ }
+ }
+ api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
+
jobUuid := os.Getenv("JOB_UUID")
taskUuid := os.Getenv("TASK_UUID")
tmpdir := os.Getenv("TASK_WORK")
var kc IKeepClient
kc, err = keepclient.MakeKeepClient(&api)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ syscall.Umask(0022)
err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
if err == nil {