6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
19 Command []string `json:"command"`
20 Env map[string]string `json:"task.env"`
21 Stdin string `json:"task.stdin"`
22 Stdout string `json:"task.stdout"`
23 Vwd map[string]string `json:"task.vwd"`
24 SuccessCodes []int `json:"task.successCodes"`
25 PermanentFailCodes []int `json:"task.permanentFailCodes"`
26 TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
30 Tasks []TaskDef `json:"tasks"`
34 Script_parameters Tasks `json:"script_parameters"`
38 Job_uuid string `json:"job_uuid"`
39 Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
40 Parameters TaskDef `json:"parameters"`
41 Sequence int `json:"sequence"`
42 Output string `json:"output"`
43 Success bool `json:"success"`
44 Progress float32 `json:"sequence"`
47 type IArvadosClient interface {
48 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
49 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
52 func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
53 tmpdir = crunchtmpdir + "/tmpdir"
54 err = os.Mkdir(tmpdir, 0700)
59 outdir = crunchtmpdir + "/outdir"
60 err = os.Mkdir(outdir, 0700)
65 return tmpdir, outdir, nil
68 func checkOutputFilename(outdir, fn string) error {
69 if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
70 return fmt.Errorf("Path must not start or end with '/'")
72 if strings.Index("../", fn) != -1 {
73 return fmt.Errorf("Path must not contain '../'")
76 sl := strings.LastIndex(fn, "/")
78 os.MkdirAll(outdir+"/"+fn[0:sl], 0777)
83 func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
85 for k, v := range taskp.Vwd {
86 v = substitute(v, replacements)
87 err = checkOutputFilename(outdir, k)
91 os.Symlink(v, outdir+"/"+k)
95 if taskp.Stdin != "" {
96 // Set up stdin redirection
97 stdin = substitute(taskp.Stdin, replacements)
98 cmd.Stdin, err = os.Open(stdin)
104 if taskp.Stdout != "" {
105 err = checkOutputFilename(outdir, taskp.Stdout)
109 // Set up stdout redirection
110 stdout = outdir + "/" + taskp.Stdout
111 cmd.Stdout, err = os.Create(stdout)
116 cmd.Stdout = os.Stdout
119 cmd.Stderr = os.Stderr
121 if taskp.Env != nil {
122 // Set up subprocess environment
123 cmd.Env = os.Environ()
124 for k, v := range taskp.Env {
125 v = substitute(v, replacements)
126 cmd.Env = append(cmd.Env, k+"="+v)
129 return stdin, stdout, nil
132 // Set up signal handlers. Go sends signal notifications to a "signal
134 func setupSignals(cmd *exec.Cmd) chan os.Signal {
135 sigChan := make(chan os.Signal, 1)
136 signal.Notify(sigChan, syscall.SIGTERM)
137 signal.Notify(sigChan, syscall.SIGINT)
138 signal.Notify(sigChan, syscall.SIGQUIT)
142 func inCodes(code int, codes []int) bool {
144 for _, c := range codes {
153 const TASK_TEMPFAIL = 111
155 type TempFail struct{ error }
156 type PermFail struct{}
158 func (s PermFail) Error() string {
162 func substitute(inp string, subst map[string]string) string {
163 for k, v := range subst {
164 inp = strings.Replace(inp, k, v, -1)
169 func runner(api IArvadosClient,
171 jobUuid, taskUuid, crunchtmpdir, keepmount string,
172 jobStruct Job, taskStruct Task) error {
175 taskp := taskStruct.Parameters
177 // If this is task 0 and there are multiple tasks, dispatch subtasks
179 if taskStruct.Sequence == 0 {
180 if len(jobStruct.Script_parameters.Tasks) == 1 {
181 taskp = jobStruct.Script_parameters.Tasks[0]
183 for _, task := range jobStruct.Script_parameters.Tasks {
184 err := api.Create("job_tasks",
185 map[string]interface{}{
186 "job_task": Task{Job_uuid: jobUuid,
187 Created_by_job_task_uuid: taskUuid,
195 err = api.Update("job_tasks", taskUuid,
196 map[string]interface{}{
206 var tmpdir, outdir string
207 tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
212 replacements := map[string]string{
213 "$(task.tmpdir)": tmpdir,
214 "$(task.outdir)": outdir,
215 "$(task.keep)": keepmount}
217 log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
218 log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
219 log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
222 for k, v := range taskp.Command {
223 taskp.Command[k] = substitute(v, replacements)
226 cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
230 var stdin, stdout string
231 stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
236 // Run subprocess and wait for it to complete
238 stdin = " < " + stdin
241 stdout = " > " + stdout
243 log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
245 var caughtSignal os.Signal
246 sigChan := setupSignals(cmd)
254 finishedSignalNotify := make(chan struct{})
255 go func(sig <-chan os.Signal) {
256 for sig := range sig {
258 cmd.Process.Signal(caughtSignal)
260 close(finishedSignalNotify)
267 <-finishedSignalNotify
269 if caughtSignal != nil {
270 log.Printf("Caught signal %v", caughtSignal)
275 // Run() returns ExitError on non-zero exit code, but we handle
276 // that down below. So only return if it's not ExitError.
277 if _, ok := err.(*exec.ExitError); !ok {
284 exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
286 log.Printf("Completed with exit code %v", exitCode)
288 if inCodes(exitCode, taskp.PermanentFailCodes) {
290 } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
291 return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
292 } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
298 // Upload output directory
299 manifest, err := WriteTree(kc, outdir)
305 err = api.Update("job_tasks", taskUuid,
306 map[string]interface{}{
324 api, err := arvadosclient.MakeArvadosClient()
329 // Container may not have certificates installed, so need to look for
330 // /etc/arvados/ca-certificates.crt in addition to normal system certs.
331 var certFiles = []string{
332 "/etc/ssl/certs/ca-certificates.crt", // Debian
333 "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat
334 "/etc/arvados/ca-certificates.crt",
337 certs := x509.NewCertPool()
338 for _, file := range certFiles {
339 data, err := ioutil.ReadFile(file)
341 log.Printf("Using TLS certificates at %v", file)
342 certs.AppendCertsFromPEM(data)
345 api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
347 jobUuid := os.Getenv("JOB_UUID")
348 taskUuid := os.Getenv("TASK_UUID")
349 tmpdir := os.Getenv("TASK_WORK")
350 keepmount := os.Getenv("TASK_KEEPMOUNT")
355 err = api.Get("jobs", jobUuid, nil, &jobStruct)
359 err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
365 kc, err = keepclient.MakeKeepClient(&api)
371 err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
375 } else if _, ok := err.(TempFail); ok {
377 os.Exit(TASK_TEMPFAIL)
378 } else if _, ok := err.(PermFail); ok {