--- /dev/null
+package main
+
import (
	"fmt"
	"log"
	"os"
	"os/exec"
	"os/signal"
	"sort"
	"strings"
	"syscall"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
+
// TaskDef describes one task to run: its command line plus optional
// environment, stdin/stdout redirection, virtual working directory entries
// and exit-code classification. runner takes it either from the job's
// script_parameters task list or from a job_task's "parameters".
type TaskDef struct {
	// Command is the argv; Command[0] is the program. Each element undergoes
	// $(task.*) substitution before the process is started.
	Command []string `json:"command"`
	// Env entries are appended to the runner's own environment; values
	// undergo $(task.*) substitution.
	Env map[string]string `json:"task.env"`
	// Stdin, after substitution, is the path opened as the process's stdin.
	Stdin string `json:"task.stdin"`
	// Stdout is a file name relative to outdir that receives the process's
	// stdout (no substitution is applied).
	Stdout string `json:"task.stdout"`
	// Vwd maps names relative to outdir to symlink targets; targets undergo
	// substitution.
	Vwd map[string]string `json:"task.vwd"`
	// SuccessCodes are exit codes treated as success in addition to 0.
	SuccessCodes []int `json:"task.successCodes"`
	// PermanentFailCodes are exit codes treated as failure; runner checks
	// them before the other code lists.
	PermanentFailCodes []int `json:"task.permanentFailCodes"`
	// TemporaryFailCodes are exit codes reported as a TempFail.
	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
}
+
// Tasks is the script_parameters object of a job: the list of task
// definitions the job runs.
type Tasks struct {
	Tasks []TaskDef `json:"tasks"`
}
+
// Job is the subset of a job record that the runner reads (main fetches it
// with api.Get("jobs", ...)).
type Job struct {
	Script_parameters Tasks `json:"script_parameters"`
}
+
+type Task struct {
+ Job_uuid string `json:"job_uuid"`
+ Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
+ Parameters TaskDef `json:"parameters"`
+ Sequence int `json:"sequence"`
+ Output string `json:"output"`
+ Success bool `json:"success"`
+ Progress float32 `json:"sequence"`
+}
+
// IArvadosClient is the subset of the Arvados API client that runner uses.
// It lets tests substitute fakes for the real client.
type IArvadosClient interface {
	// Create creates a record of resourceType with the given parameters.
	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
	// Update modifies the record uuid of resourceType with the given parameters.
	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
+
// setupDirectories creates the "tmpdir" and "outdir" working directories
// (mode 0700) directly inside crunchtmpdir and returns their paths, in that
// order. taskUuid is currently unused. On failure, both returned paths are
// empty and the os.Mkdir error is returned; tmpdir is attempted first.
func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
	dirs := [2]string{crunchtmpdir + "/tmpdir", crunchtmpdir + "/outdir"}
	for _, d := range dirs {
		if mkErr := os.Mkdir(d, 0700); mkErr != nil {
			return "", "", mkErr
		}
	}
	return dirs[0], dirs[1], nil
}
+
// checkOutputFilename validates fn, a path relative to outdir naming a file
// the task will produce, and creates any parent directories it needs.
//
// fn must not start or end with '/', and no component of it may be ".."
// (which would let it escape outdir). On success, the directories leading
// up to fn's last component are created under outdir with os.MkdirAll.
// Returns a validation error or the MkdirAll error.
func checkOutputFilename(outdir, fn string) error {
	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
		return fmt.Errorf("Path must not start or end with '/'")
	}
	// Compare whole components so "..", "a/.." and "../x" are all rejected,
	// while names like "a..b" remain allowed.
	for _, part := range strings.Split(fn, "/") {
		if part == ".." {
			return fmt.Errorf("Path must not contain '../'")
		}
	}

	if sl := strings.LastIndex(fn, "/"); sl != -1 {
		if err := os.MkdirAll(outdir+"/"+fn[:sl], 0777); err != nil {
			return err
		}
	}
	return nil
}
+
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+ if taskp.Vwd != nil {
+ for k, v := range taskp.Vwd {
+ v = substitute(v, replacements)
+ err = checkOutputFilename(outdir, k)
+ if err != nil {
+ return "", "", err
+ }
+ os.Symlink(v, outdir+"/"+k)
+ }
+ }
+
+ if taskp.Stdin != "" {
+ // Set up stdin redirection
+ stdin = substitute(taskp.Stdin, replacements)
+ cmd.Stdin, err = os.Open(stdin)
+ if err != nil {
+ return "", "", err
+ }
+ }
+
+ if taskp.Stdout != "" {
+ err = checkOutputFilename(outdir, taskp.Stdout)
+ if err != nil {
+ return "", "", err
+ }
+ // Set up stdout redirection
+ stdout = outdir + "/" + taskp.Stdout
+ cmd.Stdout, err = os.Create(stdout)
+ if err != nil {
+ return "", "", err
+ }
+ } else {
+ cmd.Stdout = os.Stdout
+ }
+
+ if taskp.Env != nil {
+ // Set up subprocess environment
+ cmd.Env = os.Environ()
+ for k, v := range taskp.Env {
+ v = substitute(v, replacements)
+ cmd.Env = append(cmd.Env, k+"="+v)
+ }
+ }
+ return stdin, stdout, nil
+}
+
// setupSignals subscribes a new one-slot channel to SIGTERM, SIGINT and
// SIGQUIT and returns it. Forwarding the signals to cmd's process is up to
// the caller; cmd itself is not used here. Call signal.Stop on the channel
// when done.
func setupSignals(cmd *exec.Cmd) chan os.Signal {
	caught := make(chan os.Signal, 1)
	signal.Notify(caught, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	return caught
}
+
// inCodes reports whether code appears in codes. A nil or empty list
// contains nothing.
func inCodes(code int, codes []int) bool {
	for i := range codes {
		if codes[i] == code {
			return true
		}
	}
	return false
}
+
// TASK_TEMPFAIL is the exit status main uses when runner reports a TempFail.
const TASK_TEMPFAIL = 111

// TempFail wraps an error that should be reported as a temporary failure.
// Its Error method is the wrapped error's.
type TempFail struct{ error }

// PermFail marks a permanent task failure.
type PermFail struct{}

// Error implements the error interface.
func (PermFail) Error() string { return "PermFail" }
+
// substitute replaces every occurrence of each key of subst in inp with the
// key's value and returns the result.
//
// inp is scanned once, left to right. Text produced by a replacement is
// therefore never substituted again, and the result does not depend on map
// iteration order. When several keys match at the same position, the
// longest key wins; ties are broken lexically.
func substitute(inp string, subst map[string]string) string {
	if len(subst) == 0 {
		return inp
	}
	keys := make([]string, 0, len(subst))
	for k := range subst {
		keys = append(keys, k)
	}
	// strings.Replacer prefers earlier pairs when matches start at the same
	// position, so order longest first for deterministic longest-match.
	sort.Slice(keys, func(i, j int) bool {
		if len(keys[i]) != len(keys[j]) {
			return len(keys[i]) > len(keys[j])
		}
		return keys[i] < keys[j]
	})
	pairs := make([]string, 0, 2*len(keys))
	for _, k := range keys {
		pairs = append(pairs, k, subst[k])
	}
	return strings.NewReplacer(pairs...).Replace(inp)
}
+
+func runner(api IArvadosClient,
+ kc IKeepClient,
+ jobUuid, taskUuid, crunchtmpdir, keepmount string,
+ jobStruct Job, taskStruct Task) error {
+
+ var err error
+ taskp := taskStruct.Parameters
+
+ // If this is task 0 and there are multiple tasks, dispatch subtasks
+ // and exit.
+ if taskStruct.Sequence == 0 {
+ if len(jobStruct.Script_parameters.Tasks) == 1 {
+ taskp = jobStruct.Script_parameters.Tasks[0]
+ } else {
+ for _, task := range jobStruct.Script_parameters.Tasks {
+ err := api.Create("job_tasks",
+ map[string]interface{}{
+ "job_task": Task{Job_uuid: jobUuid,
+ Created_by_job_task_uuid: taskUuid,
+ Sequence: 1,
+ Parameters: task}},
+ nil)
+ if err != nil {
+ return TempFail{err}
+ }
+ }
+ err = api.Update("job_tasks", taskUuid,
+ map[string]interface{}{
+ "job_task": Task{
+ Output: "",
+ Success: true,
+ Progress: 1.0}},
+ nil)
+ return nil
+ }
+ }
+
+ var tmpdir, outdir string
+ tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+ if err != nil {
+ return TempFail{err}
+ }
+
+ replacements := map[string]string{
+ "$(task.tmpdir)": tmpdir,
+ "$(task.outdir)": outdir,
+ "$(task.keep)": keepmount}
+
+ // Set up subprocess
+ for k, v := range taskp.Command {
+ taskp.Command[k] = substitute(v, replacements)
+ }
+
+ cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
+
+ cmd.Dir = outdir
+
+ var stdin, stdout string
+ stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+ if err != nil {
+ return err
+ }
+
+ // Run subprocess and wait for it to complete
+ if stdin != "" {
+ stdin = " < " + stdin
+ }
+ if stdout != "" {
+ stdout = " > " + stdout
+ }
+ log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+
+ var caughtSignal os.Signal
+ {
+ sigChan := setupSignals(cmd)
+ defer signal.Stop(sigChan)
+
+ err = cmd.Start()
+ if err != nil {
+ return TempFail{err}
+ }
+
+ go func(sig <-chan os.Signal) {
+ for sig := range sig {
+ caughtSignal = sig
+ cmd.Process.Signal(caughtSignal)
+ }
+ }(sigChan)
+
+ err = cmd.Wait()
+ }
+
+ if caughtSignal != nil {
+ log.Printf("Caught signal %v", caughtSignal)
+ return PermFail{}
+ }
+
+ if err != nil {
+ // Run() returns ExitError on non-zero exit code, but we handle
+ // that down below. So only return if it's not ExitError.
+ if _, ok := err.(*exec.ExitError); !ok {
+ return TempFail{err}
+ }
+ }
+
+ var success bool
+
+ exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
+
+ log.Printf("Completed with exit code %v", exitCode)
+
+ if inCodes(exitCode, taskp.PermanentFailCodes) {
+ success = false
+ } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
+ return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
+ } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
+ success = true
+ } else {
+ success = false
+ }
+
+ // Upload output directory
+ manifest, err := WriteTree(kc, outdir)
+ if err != nil {
+ return TempFail{err}
+ }
+
+ // Set status
+ err = api.Update("job_tasks", taskUuid,
+ map[string]interface{}{
+ "job_task": Task{
+ Output: manifest,
+ Success: success,
+ Progress: 1}},
+ nil)
+ if err != nil {
+ return TempFail{err}
+ }
+
+ if success {
+ return nil
+ } else {
+ return PermFail{}
+ }
+}
+
+func main() {
+ api, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ jobUuid := os.Getenv("JOB_UUID")
+ taskUuid := os.Getenv("TASK_UUID")
+ tmpdir := os.Getenv("TASK_WORK")
+ keepmount := os.Getenv("TASK_KEEPMOUNT")
+
+ var jobStruct Job
+ var taskStruct Task
+
+ err = api.Get("jobs", jobUuid, nil, &jobStruct)
+ if err != nil {
+ log.Fatal(err)
+ }
+ err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ var kc IKeepClient
+ kc, err = keepclient.MakeKeepClient(&api)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ syscall.Umask(0022)
+ err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+
+ if err == nil {
+ os.Exit(0)
+ } else if _, ok := err.(TempFail); ok {
+ log.Print(err)
+ os.Exit(TASK_TEMPFAIL)
+ } else if _, ok := err.(PermFail); ok {
+ os.Exit(1)
+ } else {
+ log.Fatal(err)
+ }
+}
--- /dev/null
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ . "gopkg.in/check.v1"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "syscall"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+type TestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&TestSuite{})
+
+type ArvTestClient struct {
+ c *C
+ manifest string
+ success bool
+}
+
+func (t ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+ return nil
+}
+
+func (t ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ t.c.Check(resourceType, Equals, "job_tasks")
+ t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": Task{
+ Output: t.manifest,
+ Success: t.success,
+ Progress: 1}})
+ return nil
+}
+
+func (s *TestSuite) TestSimpleRun(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"echo", "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+}
+
+func checkOutput(c *C, tmpdir string) {
+ file, err := os.Open(tmpdir + "/outdir/output.txt")
+ c.Assert(err, IsNil)
+
+ data := make([]byte, 100)
+ var count int
+ err = nil
+ offset := 0
+ for err == nil {
+ count, err = file.Read(data[offset:])
+ offset += count
+ }
+ c.Assert(err, Equals, io.EOF)
+ c.Check(string(data[0:offset]), Equals, "foo\n")
+}
+
+func (s *TestSuite) TestSimpleRunSubtask(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{
+ TaskDef{Command: []string{"echo", "bar"}},
+ TaskDef{Command: []string{"echo", "foo"}}}}},
+ Task{Parameters: TaskDef{
+ Command: []string{"echo", "foo"},
+ Stdout: "output.txt"},
+ Sequence: 1})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestRedirect(c *C) {
+ tmpfile, _ := ioutil.TempFile("", "")
+ tmpfile.Write([]byte("foo\n"))
+ tmpfile.Close()
+ defer os.Remove(tmpfile.Name())
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"cat"},
+ Stdout: "output.txt",
+ Stdin: tmpfile.Name()}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnv(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "echo $BAR"},
+ Stdout: "output.txt",
+ Env: map[string]string{"BAR": "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvSubstitute(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "foo\n",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "echo $BAR"},
+ Stdout: "output.txt",
+ Env: map[string]string{"BAR": "$(task.keep)"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvReplace(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "echo $PATH"},
+ Stdout: "output.txt",
+ Env: map[string]string{"PATH": "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+type SubtaskTestClient struct {
+ c *C
+ parms []Task
+ i int
+}
+
+func (t *SubtaskTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+ t.c.Check(resourceType, Equals, "job_tasks")
+ t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": t.parms[t.i]})
+ t.i += 1
+ return nil
+}
+
+func (t SubtaskTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (s *TestSuite) TestScheduleSubtask(c *C) {
+
+ api := SubtaskTestClient{c, []Task{
+ Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+ Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
+ Parameters: TaskDef{
+ Command: []string{"echo", "bar"}}},
+ Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+ Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
+ Parameters: TaskDef{
+ Command: []string{"echo", "foo"}}}},
+ 0}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(&api, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{
+ TaskDef{Command: []string{"echo", "bar"}},
+ TaskDef{Command: []string{"echo", "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+
+}
+
+func (s *TestSuite) TestRunFail(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunSuccessCode(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", true}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 1"},
+ SuccessCodes: []int{0, 1}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+}
+
+func (s *TestSuite) TestRunFailCode(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 0"},
+ PermanentFailCodes: []int{0, 1}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunTempFailCode(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 1"},
+ TemporaryFailCodes: []int{1}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, TempFail{})
+}
+
+func (s *TestSuite) TestVwd(c *C) {
+ tmpfile, _ := ioutil.TempFile("", "")
+ tmpfile.Write([]byte("foo\n"))
+ tmpfile.Close()
+ defer os.Remove(tmpfile.Name())
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"ls", "output.txt"},
+ Vwd: map[string]string{
+ "output.txt": tmpfile.Name()}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionStdin(c *C) {
+ keepmount, _ := ioutil.TempDir("", "")
+ ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+ defer func() {
+ os.RemoveAll(keepmount)
+ }()
+
+ log.Print("Keepmount is ", keepmount)
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ log.Print("tmpdir is ", tmpdir)
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ keepmount,
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"cat"},
+ Stdout: "output.txt",
+ Stdin: "$(task.keep)/file1.txt"}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionCommandLine(c *C) {
+ keepmount, _ := ioutil.TempDir("", "")
+ ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+ defer func() {
+ os.RemoveAll(keepmount)
+ }()
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ keepmount,
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"cat", "$(task.keep)/file1.txt"},
+ Stdout: "output.txt"}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSignal(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ go func() {
+ time.Sleep(1 * time.Second)
+ self, _ := os.FindProcess(os.Getpid())
+ self.Signal(syscall.SIGINT)
+ }()
+
+ err := runner(ArvTestClient{c,
+ "", false},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"sleep", "4"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+
+}
+
+func (s *TestSuite) TestQuoting(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ "./s\\040ub:dir d3b07384d113edec49eaa6238ad5ff00+4 0:4::e\\040vil\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"echo", "foo"},
+ Stdout: "s ub:dir/:e vi\nl"}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+}
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "io"
+ "log"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+)
+
// Block is a buffer being filled for upload to Keep. ReadFrom allocates
// data with keepclient.BLOCKSIZE bytes; data[:offset] holds the bytes
// written so far.
type Block struct {
	data   []byte
	offset int64 // number of valid bytes at the start of data
}
+
// ManifestStreamWriter accumulates the files of one manifest stream (one
// directory).
//
// Data written to it is packed into Blocks, and each full block is sent on
// uploader. A goUpload goroutine stores the blocks through the
// ManifestWriter's Keep client and appends their locators to the embedded
// ManifestStream. When uploader is closed, it reports its upload errors on
// finish.
type ManifestStreamWriter struct {
	*ManifestWriter
	*manifest.ManifestStream
	offset int64 // total bytes of file data added to this stream so far
	*Block       // block currently being filled; nil right after a full block is sent
	uploader chan *Block
	finish   chan []error
}
+
// IKeepClient is the part of the Keep client used for uploads; tests supply
// fakes. PutHB stores buf under hash. goUpload uses the returned string as
// the block's locator in the manifest; the int result is not used by this
// package.
type IKeepClient interface {
	PutHB(hash string, buf []byte) (string, int, error)
}
+
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+ n, err := m.ReadFrom(bytes.NewReader(p))
+ return int(n), err
+}
+
+func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
+ var total int64
+ var count int
+
+ for err == nil {
+ if m.Block == nil {
+ m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+ }
+ count, err = r.Read(m.Block.data[m.Block.offset:])
+ total += int64(count)
+ m.Block.offset += int64(count)
+ if m.Block.offset == keepclient.BLOCKSIZE {
+ m.uploader <- m.Block
+ m.Block = nil
+ }
+ }
+
+ if err == io.EOF {
+ return total, nil
+ } else {
+ return total, err
+ }
+
+}
+
+func (m *ManifestStreamWriter) goUpload() {
+ var errors []error
+ uploader := m.uploader
+ finish := m.finish
+ for block := range uploader {
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ }
+ }
+ finish <- errors
+}
+
// ManifestWriter builds a manifest for the files under stripPrefix. It keeps
// one ManifestStreamWriter per directory, keyed by stream name ("." for the
// top level).
type ManifestWriter struct {
	IKeepClient                                  // used by every stream's upload goroutine
	stripPrefix string                           // root directory being uploaded
	Streams     map[string]*ManifestStreamWriter // stream name -> writer
}
+
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
+ if info.IsDir() {
+ return nil
+ }
+
+ var dir string
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+ }
+ if dir == "" {
+ dir = "."
+ }
+
+ fn := path[(len(path) - len(info.Name())):]
+
+ if m.Streams[dir] == nil {
+ m.Streams[dir] = &ManifestStreamWriter{
+ m,
+ &manifest.ManifestStream{StreamName: dir},
+ 0,
+ nil,
+ make(chan *Block),
+ make(chan []error)}
+ go m.Streams[dir].goUpload()
+ }
+
+ stream := m.Streams[dir]
+
+ fileStart := stream.offset
+
+ file, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+
+ log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+ var count int64
+ count, err = io.Copy(stream, file)
+ if err != nil {
+ return err
+ }
+
+ stream.offset += count
+
+ stream.ManifestStream.Files = append(stream.ManifestStream.Files,
+ fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+
+ return nil
+}
+
+func (m *ManifestWriter) Finish() error {
+ var errstring string
+ for _, stream := range m.Streams {
+ if stream.uploader == nil {
+ continue
+ }
+ if stream.Block != nil {
+ stream.uploader <- stream.Block
+ }
+ close(stream.uploader)
+ stream.uploader = nil
+
+ errors := <-stream.finish
+ close(stream.finish)
+ stream.finish = nil
+
+ for _, r := range errors {
+ errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+ }
+ }
+ if errstring != "" {
+ return errors.New(errstring)
+ } else {
+ return nil
+ }
+}
+
+func (m *ManifestWriter) ManifestText() string {
+ m.Finish()
+ var buf bytes.Buffer
+
+ dirs := make([]string, len(m.Streams))
+ i := 0
+ for k := range m.Streams {
+ dirs[i] = k
+ i++
+ }
+ sort.Strings(dirs)
+
+ for _, k := range dirs {
+ v := m.Streams[k]
+
+ if k == "." {
+ buf.WriteString(".")
+ } else {
+ k = strings.Replace(k, " ", "\\040", -1)
+ k = strings.Replace(k, "\n", "", -1)
+ buf.WriteString("./" + k)
+ }
+ for _, b := range v.Blocks {
+ buf.WriteString(" ")
+ buf.WriteString(b)
+ }
+ for _, f := range v.Files {
+ buf.WriteString(" ")
+ f = strings.Replace(f, " ", "\\040", -1)
+ f = strings.Replace(f, "\n", "", -1)
+ buf.WriteString(f)
+ }
+ buf.WriteString("\n")
+ }
+ return buf.String()
+}
+
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+ mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
+ err = filepath.Walk(root, mw.WalkFunc)
+
+ if err != nil {
+ return "", err
+ }
+
+ err = mw.Finish()
+ if err != nil {
+ return "", err
+ }
+
+ return mw.ManifestText(), nil
+}
--- /dev/null
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ . "gopkg.in/check.v1"
+ "io/ioutil"
+ "os"
+)
+
+type UploadTestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&UploadTestSuite{})
+
// KeepTestClient is a fake IKeepClient. PutHB stores nothing and returns
// the unsigned locator "<md5 of buf>+<len(buf)>" and len(buf); the supplied
// hash is ignored.
type KeepTestClient struct{}

// PutHB returns a locator computed from buf.
func (KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
	size := len(buf)
	locator := fmt.Sprintf("%x+%v", md5.Sum(buf), size)
	return locator, size, nil
}
+
+func (s *TestSuite) TestSimpleUpload(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/subdir", 0700)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ file, _ := os.Create(tmpdir + "/" + "file1.txt")
+ data := make([]byte, 1024*1024-1)
+ for i := range data {
+ data[i] = byte(i % 10)
+ }
+ for i := 0; i < 65; i++ {
+ file.Write(data)
+ }
+ file.Close()
+
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/subdir", 0700)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
+`)
+}
+
// KeepErrorTestClient is a fake IKeepClient whose PutHB always fails with
// the error "Failed!".
type KeepErrorTestClient struct{}

// PutHB always returns an error.
func (KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
	failure := errors.New("Failed!")
	return "", 0, failure
}
+
+func (s *TestSuite) TestUploadError(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepErrorTestClient{}, tmpdir)
+ c.Check(err, NotNil)
+ c.Check(str, Equals, "")
+}