import (
"encoding/json"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io"
"io/ioutil"
"log"
"os/signal"
"strings"
"syscall"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
type TaskDef struct {
}
type Job struct {
- Script_parameters Tasks `json:"script_parameters"`
+ ScriptParameters Tasks `json:"script_parameters"`
}
type Task struct {
- Job_uuid string `json:"job_uuid"`
- Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
- Parameters TaskDef `json:"parameters"`
- Sequence int `json:"sequence"`
- Output string `json:"output"`
- Success bool `json:"success"`
- Progress float32 `json:"sequence"`
+ JobUUID string `json:"job_uuid"`
+ CreatedByJobTaskUUID string `json:"created_by_job_task_uuid"`
+ Parameters TaskDef `json:"parameters"`
+ Sequence int `json:"sequence"`
+ Output string `json:"output"`
+ Success bool `json:"success"`
+ Progress float32 `json:"sequence"`
}
type IArvadosClient interface {
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
-func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
+func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
tmpdir = crunchtmpdir + "/tmpdir"
err = os.Mkdir(tmpdir, 0700)
if err != nil {
func runner(api IArvadosClient,
kc IKeepClient,
- jobUuid, taskUuid, crunchtmpdir, keepmount string,
+ jobUUID, taskUUID, crunchtmpdir, keepmount string,
jobStruct Job, taskStruct Task) error {
var err error
// If this is task 0 and there are multiple tasks, dispatch subtasks
// and exit.
if taskStruct.Sequence == 0 {
- if len(jobStruct.Script_parameters.Tasks) == 1 {
- taskp = jobStruct.Script_parameters.Tasks[0]
+ if len(jobStruct.ScriptParameters.Tasks) == 1 {
+ taskp = jobStruct.ScriptParameters.Tasks[0]
} else {
- for _, task := range jobStruct.Script_parameters.Tasks {
+ for _, task := range jobStruct.ScriptParameters.Tasks {
err := api.Create("job_tasks",
map[string]interface{}{
- "job_task": Task{Job_uuid: jobUuid,
- Created_by_job_task_uuid: taskUuid,
- Sequence: 1,
- Parameters: task}},
+ "job_task": Task{
+ JobUUID: jobUUID,
+ CreatedByJobTaskUUID: taskUUID,
+ Sequence: 1,
+ Parameters: task}},
nil)
if err != nil {
return TempFail{err}
}
}
- err = api.Update("job_tasks", taskUuid,
+ err = api.Update("job_tasks", taskUUID,
map[string]interface{}{
"job_task": map[string]interface{}{
"output": "",
}
var tmpdir, outdir string
- tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
+ tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
if err != nil {
return TempFail{err}
}
}
// Set status
- err = api.Update("job_tasks", taskUuid,
+ err = api.Update("job_tasks", taskUUID,
map[string]interface{}{
"job_task": Task{
Output: manifest,
log.Fatal(err)
}
- jobUuid := os.Getenv("JOB_UUID")
- taskUuid := os.Getenv("TASK_UUID")
+ jobUUID := os.Getenv("JOB_UUID")
+ taskUUID := os.Getenv("TASK_UUID")
tmpdir := os.Getenv("TASK_WORK")
keepmount := os.Getenv("TASK_KEEPMOUNT")
var jobStruct Job
var taskStruct Task
- err = api.Get("jobs", jobUuid, nil, &jobStruct)
+ err = api.Get("jobs", jobUUID, nil, &jobStruct)
if err != nil {
log.Fatal(err)
}
- err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+ err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
if err != nil {
log.Fatal(err)
}
}
syscall.Umask(0022)
- err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+ err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
if err == nil {
os.Exit(0)
package main
import (
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- . "gopkg.in/check.v1"
"io"
"io/ioutil"
"log"
"syscall"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ . "gopkg.in/check.v1"
)
// Gocheck boilerplate
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"}}}}},
Task{Sequence: 0})
c.Check(err, IsNil)
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{
+ Job{ScriptParameters: Tasks{[]TaskDef{
{Command: []string{"echo", "bar"}},
{Command: []string{"echo", "foo"}}}}},
Task{Parameters: TaskDef{
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"cat"},
Stdout: "output.txt",
Stdin: tmpfile.Name()}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $BAR"},
Stdout: "output.txt",
Env: map[string]string{"BAR": "foo"}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"foo\n",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $BAR"},
Stdout: "output.txt",
Env: map[string]string{"BAR": "$(task.keep)"}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $PATH"},
Stdout: "output.txt",
Env: map[string]string{"PATH": "foo"}}}}},
func (s *TestSuite) TestScheduleSubtask(c *C) {
api := SubtaskTestClient{c, []Task{
- {Job_uuid: "zzzz-8i9sb-111111111111111",
- Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
- Sequence: 1,
+ {JobUUID: "zzzz-8i9sb-111111111111111",
+ CreatedByJobTaskUUID: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
Parameters: TaskDef{
Command: []string{"echo", "bar"}}},
- {Job_uuid: "zzzz-8i9sb-111111111111111",
- Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
- Sequence: 1,
+ {JobUUID: "zzzz-8i9sb-111111111111111",
+ CreatedByJobTaskUUID: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
Parameters: TaskDef{
Command: []string{"echo", "foo"}}}},
0}
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{
+ Job{ScriptParameters: Tasks{[]TaskDef{
{Command: []string{"echo", "bar"}},
{Command: []string{"echo", "foo"}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
Task{Sequence: 0})
c.Check(err, FitsTypeOf, PermFail{})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"},
SuccessCodes: []int{0, 1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 0"},
PermanentFailCodes: []int{0, 1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"},
TemporaryFailCodes: []int{1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"ls", "output.txt"},
Vwd: map[string]string{
"output.txt": tmpfile.Name()}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
keepmount,
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"cat"},
Stdout: "output.txt",
Stdin: "$(task.keep)/file1.txt"}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
keepmount,
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"cat", "$(task.keep)/file1.txt"},
Stdout: "output.txt"}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"sleep", "4"}}}}},
Task{Sequence: 0})
c.Check(err, FitsTypeOf, PermFail{})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"},
Stdout: "s ub:dir/:e vi\nl"}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"},
KeepTmpOutput: true}}}},
Task{Sequence: 0})