Merge branch 'master' into 7492-keepproxy-upstream-errors
authorradhika <radhika@curoverse.com>
Tue, 27 Oct 2015 18:11:13 +0000 (14:11 -0400)
committerradhika <radhika@curoverse.com>
Tue, 27 Oct 2015 18:11:13 +0000 (14:11 -0400)
12 files changed:
doc/_config.yml
doc/install/configure-azure-blob-storage.html.textile.liquid [new file with mode: 0644]
doc/install/install-keepstore.html.textile.liquid
sdk/go/crunchrunner/crunchrunner.go [new file with mode: 0644]
sdk/go/crunchrunner/crunchrunner_test.go [new file with mode: 0644]
sdk/go/crunchrunner/upload.go [new file with mode: 0644]
sdk/go/crunchrunner/upload_test.go [new file with mode: 0644]
sdk/python/arvados/__init__.py
sdk/python/tests/arvados_testutil.py
sdk/python/tests/test_api.py
sdk/python/tests/test_retry_job_helpers.py [new file with mode: 0644]
services/arv-git-httpd/git_handler_test.go

index 1bdd2ab4461c244b4ac2a2ee5eb725694c67ad62..75cb997d78b0fe0d80d869023185a11dbfa0bfbd 100644 (file)
@@ -153,6 +153,7 @@ navbar:
       - install/install-shell-server.html.textile.liquid
       - install/create-standard-objects.html.textile.liquid
       - install/install-keepstore.html.textile.liquid
+      - install/configure-azure-blob-storage.html.textile.liquid
       - install/install-keepproxy.html.textile.liquid
       - install/install-crunch-dispatch.html.textile.liquid
       - install/install-compute-node.html.textile.liquid
diff --git a/doc/install/configure-azure-blob-storage.html.textile.liquid b/doc/install/configure-azure-blob-storage.html.textile.liquid
new file mode 100644 (file)
index 0000000..e365d74
--- /dev/null
@@ -0,0 +1,62 @@
+---
+layout: default
+navsection: installguide
+title: Configure Azure Blob storage
+...
+
+As an alternative to local and network-attached POSIX filesystems, Keepstore can store data in an Azure Storage container.
+
+h2. Create a container
+
+Normally, all keepstore services are configured to share a single Azure Storage container.
+
+Using the Azure web portal or command line tool, create or choose a storage account with a suitable redundancy profile and availability region. Use the storage account keys to create a new container.
+
+<notextile>
+<pre><code>~$ <span class="userinput">azure config mode arm</span>
+~$ <span class="userinput">azure login</span>
+~$ <span class="userinput">azure group create exampleGroupName eastus</span>
+~$ <span class="userinput">azure storage account create --type LRS --location eastus --resource-group exampleGroupName exampleStorageAccountName</span>
+~$ <span class="userinput">azure storage account keys list --resource-group exampleGroupName exampleStorageAccountName</span>
+info:    Executing command storage account keys list
++ Getting storage account keys
+data:    Primary: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
+data:    Secondary: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy==
+info:    storage account keys list command OK
+~$ <span class="userinput">AZURE_STORAGE_ACCOUNT="exampleStorageAccountName" \
+AZURE_STORAGE_ACCESS_KEY="zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==" \
+azure storage container create exampleContainerName</span>
+</code></pre>
+</notextile>
+
+h2. Configure keepstore
+
+Copy the primary storage account key to a file where it will be accessible to keepstore at startup time.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sh -c 'cat &gt;/etc/sv/keepstore/exampleStorageAccountName.key &lt;&lt;EOF'
+zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
+EOF</span>
+~$ <span class="userinput">sudo chmod 0400 /etc/sv/keepstore/exampleStorageAccountName.key</span>
+</code></pre>
+</notextile>
+
+In your keepstore startup script, instead of specifying a local storage using @-volume /path@ or discovering mount points automatically, use @-azure-*@ arguments to specify the storage container:
+
+<notextile>
+<pre><code>#!/bin/sh
+
+exec 2&gt;&amp;1
+exec keepstore \
+ -azure-storage-account-key-file <span class="userinput">/etc/sv/keepstore/exampleStorageAccountName.key</span> \
+ -azure-storage-account-name <span class="userinput">exampleStorageAccountName</span> \
+ -azure-storage-container-volume <span class="userinput">exampleContainerName</span>
+</code></pre>
+</notextile>
+
+Start (or restart) keepstore, and check its log file to confirm it is using the new configuration.
+
+<notextile>
+<pre><code>2015/10/26 21:06:24 Using volume azure-storage-container:"exampleContainerName" (writable=true)
+</code></pre>
+</notextile>
index 4cb46e13801e78a95b51c26cce0f3696cf8b3f78..efeff65b83dce3c774bffec219821a57f78e6175 100644 (file)
@@ -37,6 +37,10 @@ Verify that Keepstore is functional:
 <pre><code>~$ <span class="userinput">keepstore -h</span>
 2015/05/08 13:41:16 keepstore starting, pid 2565
 Usage of ./keepstore:
+  -azure-storage-account-key-file="": File containing the account key used for subsequent --azure-storage-container-volume arguments.
+  -azure-storage-account-name="": Azure storage account name used for subsequent --azure-storage-container-volume arguments.
+  -azure-storage-container-volume=[]: Use the given container as a storage volume. Can be given multiple times.
+  -azure-storage-replication=3: Replication level to report to clients when data is stored in an Azure container.
   -blob-signature-ttl=1209600: Lifetime of blob permission signatures. See services/api/config/application.default.yml.
   -blob-signing-key-file="": File containing the secret key for generating and verifying blob permission signatures.
   -data-manager-token-file="": File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
@@ -54,23 +58,67 @@ Usage of ./keepstore:
 </code></pre>
 </notextile>
 
-If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server":install-api-server.html config/application.yml file.
+h3. Prepare storage volumes
 
-The @-max-buffers@ argument can be used to restrict keepstore's memory use. By default, keepstore will allocate no more than 128 blocks (8 GiB) worth of data buffers at a time. Normally this should be set as high as possible without risking swapping.
+{% include 'notebox_begin' %}
+This section uses a local filesystem as a backing store. If you are using Azure Storage, follow the setup instructions on the "Azure Blob Storage":configure-azure-blob-storage.html page instead.
+{% include 'notebox_end' %}
 
-Prepare one or more volumes for Keepstore to use. Simply create a /keep directory on all the partitions you would like Keepstore to use, and then start Keepstore. For example, using 2 tmpfs volumes:
+There are two ways to specify a set of local directories where keepstore should store its data files.
+# Implicitly, by creating a directory called @keep@ at the top level of each filesystem you intend to use, and omitting @-volume@ arguments.
+# Explicitly, by providing a @-volume@ argument for each directory.
+
+For example, if there are filesystems mounted at @/mnt@ and @/mnt2@:
 
 <notextile>
-<pre><code>~$ <span class="userinput">keepstore -blob-signing-key-file=./blob-signing-key</span>
+<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
+~$ <span class="userinput">keepstore</span>
 2015/05/08 13:44:26 keepstore starting, pid 2765
 2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
 2015/05/08 13:44:26 listening at :25107
 </code></pre>
 </notextile>
 
-It's recommended to run Keepstore under "runit":http://smarden.org/runit/ or something similar.
+Equivalently:
 
-Repeat this section for each Keepstore server you are setting up.
+<notextile>
+<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
+~$ <span class="userinput">keepstore -volume=/mnt/keep -volume=/mnt2/keep</span>
+2015/05/08 13:44:26 keepstore starting, pid 2765
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
+2015/05/08 13:44:26 listening at :25107
+</code></pre>
+</notextile>
+
+h3. Run keepstore as a supervised service
+
+We recommend running Keepstore under "runit":http://smarden.org/runit/ or something similar, using a run script like the following:
+
+<notextile>
+<pre><code>#!/bin/sh
+
+exec 2>&1
+exec GOGC=10 GOMAXPROCS=<span class="userinput">4</span> keepstore \
+ -enforce-permissions=true \
+ -blob-signing-key-file=<span class="userinput">/etc/keepstore/blob-signing.key</span> \
+ -max-buffers=<span class="userinput">100</span> \
+ -serialize=true \
+ -volume=<span class="userinput">/mnt/keep</span> \
+ -volume=<span class="userinput">/mnt2/keep</span>
+</code></pre>
+</notextile>
+
+The @GOMAXPROCS@ environment variable determines the maximum number of concurrent threads, and should normally be set to the number of CPU cores present.
+
+The @-max-buffers@ argument limits keepstore's memory usage. It should be set such that @max-buffers * 64MiB + 10%@ fits comfortably in memory. For example, @-max-buffers=100@ is suitable for a host with 8 GiB RAM.
+
+If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server's":install-api-server.html configuration file, @/etc/arvados/api/application.yml@.
+
+h3. Set up additional servers
+
+Repeat the above sections to prepare volumes and bring up supervised services on each Keepstore server you are setting up.
 
 h3. Tell the API server about the Keepstore servers
 
@@ -90,6 +138,3 @@ Make sure to update the @service_host@ value to match each of your Keepstore ser
 }
 EOF</span>
 </code></pre></notextile>
-
-
-
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
new file mode 100644 (file)
index 0000000..8e24e18
--- /dev/null
@@ -0,0 +1,356 @@
+package main
+
+import (
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "log"
+       "os"
+       "os/exec"
+       "os/signal"
+       "strings"
+       "syscall"
+)
+
+type TaskDef struct {
+       Command            []string          `json:"command"`
+       Env                map[string]string `json:"task.env"`
+       Stdin              string            `json:"task.stdin"`
+       Stdout             string            `json:"task.stdout"`
+       Vwd                map[string]string `json:"task.vwd"`
+       SuccessCodes       []int             `json:"task.successCodes"`
+       PermanentFailCodes []int             `json:"task.permanentFailCodes"`
+       TemporaryFailCodes []int             `json:"task.temporaryFailCodes"`
+}
+
+type Tasks struct {
+       Tasks []TaskDef `json:"tasks"`
+}
+
+type Job struct {
+       Script_parameters Tasks `json:"script_parameters"`
+}
+
+type Task struct {
+       Job_uuid                 string  `json:"job_uuid"`
+       Created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
+       Parameters               TaskDef `json:"parameters"`
+       Sequence                 int     `json:"sequence"`
+       Output                   string  `json:"output"`
+       Success                  bool    `json:"success"`
+       Progress                 float32 `json:"sequence"`
+}
+
+type IArvadosClient interface {
+       Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
+}
+
+func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
+       tmpdir = crunchtmpdir + "/tmpdir"
+       err = os.Mkdir(tmpdir, 0700)
+       if err != nil {
+               return "", "", err
+       }
+
+       outdir = crunchtmpdir + "/outdir"
+       err = os.Mkdir(outdir, 0700)
+       if err != nil {
+               return "", "", err
+       }
+
+       return tmpdir, outdir, nil
+}
+
+func checkOutputFilename(outdir, fn string) error {
+       if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
+               return fmt.Errorf("Path must not start or end with '/'")
+       }
+       if strings.Index("../", fn) != -1 {
+               return fmt.Errorf("Path must not contain '../'")
+       }
+
+       sl := strings.LastIndex(fn, "/")
+       if sl != -1 {
+               os.MkdirAll(outdir+"/"+fn[0:sl], 0777)
+       }
+       return nil
+}
+
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+       if taskp.Vwd != nil {
+               for k, v := range taskp.Vwd {
+                       v = substitute(v, replacements)
+                       err = checkOutputFilename(outdir, k)
+                       if err != nil {
+                               return "", "", err
+                       }
+                       os.Symlink(v, outdir+"/"+k)
+               }
+       }
+
+       if taskp.Stdin != "" {
+               // Set up stdin redirection
+               stdin = substitute(taskp.Stdin, replacements)
+               cmd.Stdin, err = os.Open(stdin)
+               if err != nil {
+                       return "", "", err
+               }
+       }
+
+       if taskp.Stdout != "" {
+               err = checkOutputFilename(outdir, taskp.Stdout)
+               if err != nil {
+                       return "", "", err
+               }
+               // Set up stdout redirection
+               stdout = outdir + "/" + taskp.Stdout
+               cmd.Stdout, err = os.Create(stdout)
+               if err != nil {
+                       return "", "", err
+               }
+       } else {
+               cmd.Stdout = os.Stdout
+       }
+
+       if taskp.Env != nil {
+               // Set up subprocess environment
+               cmd.Env = os.Environ()
+               for k, v := range taskp.Env {
+                       v = substitute(v, replacements)
+                       cmd.Env = append(cmd.Env, k+"="+v)
+               }
+       }
+       return stdin, stdout, nil
+}
+
+// Set up signal handlers.  Go sends signal notifications to a "signal
+// channel".
+func setupSignals(cmd *exec.Cmd) chan os.Signal {
+       sigChan := make(chan os.Signal, 1)
+       signal.Notify(sigChan, syscall.SIGTERM)
+       signal.Notify(sigChan, syscall.SIGINT)
+       signal.Notify(sigChan, syscall.SIGQUIT)
+       return sigChan
+}
+
+func inCodes(code int, codes []int) bool {
+       if codes != nil {
+               for _, c := range codes {
+                       if code == c {
+                               return true
+                       }
+               }
+       }
+       return false
+}
+
+const TASK_TEMPFAIL = 111
+
+type TempFail struct{ error }
+type PermFail struct{}
+
+func (s PermFail) Error() string {
+       return "PermFail"
+}
+
+func substitute(inp string, subst map[string]string) string {
+       for k, v := range subst {
+               inp = strings.Replace(inp, k, v, -1)
+       }
+       return inp
+}
+
+func runner(api IArvadosClient,
+       kc IKeepClient,
+       jobUuid, taskUuid, crunchtmpdir, keepmount string,
+       jobStruct Job, taskStruct Task) error {
+
+       var err error
+       taskp := taskStruct.Parameters
+
+       // If this is task 0 and there are multiple tasks, dispatch subtasks
+       // and exit.
+       if taskStruct.Sequence == 0 {
+               if len(jobStruct.Script_parameters.Tasks) == 1 {
+                       taskp = jobStruct.Script_parameters.Tasks[0]
+               } else {
+                       for _, task := range jobStruct.Script_parameters.Tasks {
+                               err := api.Create("job_tasks",
+                                       map[string]interface{}{
+                                               "job_task": Task{Job_uuid: jobUuid,
+                                                       Created_by_job_task_uuid: taskUuid,
+                                                       Sequence:                 1,
+                                                       Parameters:               task}},
+                                       nil)
+                               if err != nil {
+                                       return TempFail{err}
+                               }
+                       }
+                       err = api.Update("job_tasks", taskUuid,
+                               map[string]interface{}{
+                                       "job_task": Task{
+                                               Output:   "",
+                                               Success:  true,
+                                               Progress: 1.0}},
+                               nil)
+                       return nil
+               }
+       }
+
+       var tmpdir, outdir string
+       tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+       if err != nil {
+               return TempFail{err}
+       }
+
+       replacements := map[string]string{
+               "$(task.tmpdir)": tmpdir,
+               "$(task.outdir)": outdir,
+               "$(task.keep)":   keepmount}
+
+       // Set up subprocess
+       for k, v := range taskp.Command {
+               taskp.Command[k] = substitute(v, replacements)
+       }
+
+       cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
+
+       cmd.Dir = outdir
+
+       var stdin, stdout string
+       stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+       if err != nil {
+               return err
+       }
+
+       // Run subprocess and wait for it to complete
+       if stdin != "" {
+               stdin = " < " + stdin
+       }
+       if stdout != "" {
+               stdout = " > " + stdout
+       }
+       log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+
+       var caughtSignal os.Signal
+       sigChan := setupSignals(cmd)
+
+       err = cmd.Start()
+       if err != nil {
+               signal.Stop(sigChan)
+               return TempFail{err}
+       }
+
+       finishedSignalNotify := make(chan struct{})
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       caughtSignal = sig
+                       cmd.Process.Signal(caughtSignal)
+               }
+               close(finishedSignalNotify)
+       }(sigChan)
+
+       err = cmd.Wait()
+       signal.Stop(sigChan)
+
+       close(sigChan)
+       <-finishedSignalNotify
+
+       if caughtSignal != nil {
+               log.Printf("Caught signal %v", caughtSignal)
+               return PermFail{}
+       }
+
+       if err != nil {
+               // Run() returns ExitError on non-zero exit code, but we handle
+               // that down below.  So only return if it's not ExitError.
+               if _, ok := err.(*exec.ExitError); !ok {
+                       return TempFail{err}
+               }
+       }
+
+       var success bool
+
+       exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
+
+       log.Printf("Completed with exit code %v", exitCode)
+
+       if inCodes(exitCode, taskp.PermanentFailCodes) {
+               success = false
+       } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
+               return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
+       } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
+               success = true
+       } else {
+               success = false
+       }
+
+       // Upload output directory
+       manifest, err := WriteTree(kc, outdir)
+       if err != nil {
+               return TempFail{err}
+       }
+
+       // Set status
+       err = api.Update("job_tasks", taskUuid,
+               map[string]interface{}{
+                       "job_task": Task{
+                               Output:   manifest,
+                               Success:  success,
+                               Progress: 1}},
+               nil)
+       if err != nil {
+               return TempFail{err}
+       }
+
+       if success {
+               return nil
+       } else {
+               return PermFail{}
+       }
+}
+
+func main() {
+       api, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       jobUuid := os.Getenv("JOB_UUID")
+       taskUuid := os.Getenv("TASK_UUID")
+       tmpdir := os.Getenv("TASK_WORK")
+       keepmount := os.Getenv("TASK_KEEPMOUNT")
+
+       var jobStruct Job
+       var taskStruct Task
+
+       err = api.Get("jobs", jobUuid, nil, &jobStruct)
+       if err != nil {
+               log.Fatal(err)
+       }
+       err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       var kc IKeepClient
+       kc, err = keepclient.MakeKeepClient(&api)
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       syscall.Umask(0022)
+       err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+
+       if err == nil {
+               os.Exit(0)
+       } else if _, ok := err.(TempFail); ok {
+               log.Print(err)
+               os.Exit(TASK_TEMPFAIL)
+       } else if _, ok := err.(PermFail); ok {
+               os.Exit(1)
+       } else {
+               log.Fatal(err)
+       }
+}
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
new file mode 100644 (file)
index 0000000..52d5c1a
--- /dev/null
@@ -0,0 +1,445 @@
+package main
+
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       . "gopkg.in/check.v1"
+       "io"
+       "io/ioutil"
+       "log"
+       "os"
+       "syscall"
+       "testing"
+       "time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+type TestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&TestSuite{})
+
+type ArvTestClient struct {
+       c        *C
+       manifest string
+       success  bool
+}
+
+func (t ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+       return nil
+}
+
+func (t ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+       t.c.Check(resourceType, Equals, "job_tasks")
+       t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": Task{
+               Output:   t.manifest,
+               Success:  t.success,
+               Progress: 1}})
+       return nil
+}
+
+func (s *TestSuite) TestSimpleRun(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"echo", "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+}
+
+func checkOutput(c *C, tmpdir string) {
+       file, err := os.Open(tmpdir + "/outdir/output.txt")
+       c.Assert(err, IsNil)
+
+       data := make([]byte, 100)
+       var count int
+       err = nil
+       offset := 0
+       for err == nil {
+               count, err = file.Read(data[offset:])
+               offset += count
+       }
+       c.Assert(err, Equals, io.EOF)
+       c.Check(string(data[0:offset]), Equals, "foo\n")
+}
+
+func (s *TestSuite) TestSimpleRunSubtask(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{
+                       TaskDef{Command: []string{"echo", "bar"}},
+                       TaskDef{Command: []string{"echo", "foo"}}}}},
+               Task{Parameters: TaskDef{
+                       Command: []string{"echo", "foo"},
+                       Stdout:  "output.txt"},
+                       Sequence: 1})
+       c.Check(err, IsNil)
+
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestRedirect(c *C) {
+       tmpfile, _ := ioutil.TempFile("", "")
+       tmpfile.Write([]byte("foo\n"))
+       tmpfile.Close()
+       defer os.Remove(tmpfile.Name())
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"cat"},
+                       Stdout:  "output.txt",
+                       Stdin:   tmpfile.Name()}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnv(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "echo $BAR"},
+                       Stdout:  "output.txt",
+                       Env:     map[string]string{"BAR": "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvSubstitute(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "foo\n",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "echo $BAR"},
+                       Stdout:  "output.txt",
+                       Env:     map[string]string{"BAR": "$(task.keep)"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvReplace(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "echo $PATH"},
+                       Stdout:  "output.txt",
+                       Env:     map[string]string{"PATH": "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+type SubtaskTestClient struct {
+       c     *C
+       parms []Task
+       i     int
+}
+
+func (t *SubtaskTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+       t.c.Check(resourceType, Equals, "job_tasks")
+       t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": t.parms[t.i]})
+       t.i += 1
+       return nil
+}
+
+func (t SubtaskTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+       return nil
+}
+
+func (s *TestSuite) TestScheduleSubtask(c *C) {
+
+       api := SubtaskTestClient{c, []Task{
+               Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+                       Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+                       Sequence:                 1,
+                       Parameters: TaskDef{
+                               Command: []string{"echo", "bar"}}},
+               Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+                       Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+                       Sequence:                 1,
+                       Parameters: TaskDef{
+                               Command: []string{"echo", "foo"}}}},
+               0}
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(&api, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{
+                       TaskDef{Command: []string{"echo", "bar"}},
+                       TaskDef{Command: []string{"echo", "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+
+}
+
+func (s *TestSuite) TestRunFail(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunSuccessCode(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", true}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command:      []string{"/bin/sh", "-c", "exit 1"},
+                       SuccessCodes: []int{0, 1}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+}
+
+func (s *TestSuite) TestRunFailCode(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command:            []string{"/bin/sh", "-c", "exit 0"},
+                       PermanentFailCodes: []int{0, 1}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunTempFailCode(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command:            []string{"/bin/sh", "-c", "exit 1"},
+                       TemporaryFailCodes: []int{1}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, TempFail{})
+}
+
+func (s *TestSuite) TestVwd(c *C) {
+       tmpfile, _ := ioutil.TempFile("", "")
+       tmpfile.Write([]byte("foo\n"))
+       tmpfile.Close()
+       defer os.Remove(tmpfile.Name())
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"ls", "output.txt"},
+                       Vwd: map[string]string{
+                               "output.txt": tmpfile.Name()}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionStdin(c *C) {
+       keepmount, _ := ioutil.TempDir("", "")
+       ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+       defer func() {
+               os.RemoveAll(keepmount)
+       }()
+
+       log.Print("Keepmount is ", keepmount)
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       log.Print("tmpdir is ", tmpdir)
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               keepmount,
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"cat"},
+                       Stdout:  "output.txt",
+                       Stdin:   "$(task.keep)/file1.txt"}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionCommandLine(c *C) {
+       keepmount, _ := ioutil.TempDir("", "")
+       ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+       defer func() {
+               os.RemoveAll(keepmount)
+       }()
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               keepmount,
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"cat", "$(task.keep)/file1.txt"},
+                       Stdout:  "output.txt"}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSignal(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       go func() {
+               time.Sleep(1 * time.Second)
+               self, _ := os.FindProcess(os.Getpid())
+               self.Signal(syscall.SIGINT)
+       }()
+
+       err := runner(ArvTestClient{c,
+               "", false},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"sleep", "4"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, PermFail{})
+
+}
+
+func (s *TestSuite) TestQuoting(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               "./s\\040ub:dir d3b07384d113edec49eaa6238ad5ff00+4 0:4::e\\040vil\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"echo", "foo"},
+                       Stdout:  "s ub:dir/:e vi\nl"}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+}
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
new file mode 100644 (file)
index 0000000..4ced0ce
--- /dev/null
@@ -0,0 +1,217 @@
+package main
+
+import (
+       "bytes"
+       "crypto/md5"
+       "errors"
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "io"
+       "log"
+       "os"
+       "path/filepath"
+       "sort"
+       "strings"
+)
+
+type Block struct {
+       data   []byte
+       offset int64
+}
+
+type ManifestStreamWriter struct {
+       *ManifestWriter
+       *manifest.ManifestStream
+       offset int64
+       *Block
+       uploader chan *Block
+       finish   chan []error
+}
+
+type IKeepClient interface {
+       PutHB(hash string, buf []byte) (string, int, error)
+}
+
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+       n, err := m.ReadFrom(bytes.NewReader(p))
+       return int(n), err
+}
+
+func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
+       var total int64
+       var count int
+
+       for err == nil {
+               if m.Block == nil {
+                       m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+               }
+               count, err = r.Read(m.Block.data[m.Block.offset:])
+               total += int64(count)
+               m.Block.offset += int64(count)
+               if m.Block.offset == keepclient.BLOCKSIZE {
+                       m.uploader <- m.Block
+                       m.Block = nil
+               }
+       }
+
+       if err == io.EOF {
+               return total, nil
+       } else {
+               return total, err
+       }
+
+}
+
+func (m *ManifestStreamWriter) goUpload() {
+       var errors []error
+       uploader := m.uploader
+       finish := m.finish
+       for block := range uploader {
+               hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+               signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+               if err != nil {
+                       errors = append(errors, err)
+               } else {
+                       m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+               }
+       }
+       finish <- errors
+}
+
+type ManifestWriter struct {
+       IKeepClient
+       stripPrefix string
+       Streams     map[string]*ManifestStreamWriter
+}
+
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
+       if info.IsDir() {
+               return nil
+       }
+
+       var dir string
+       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       }
+       if dir == "" {
+               dir = "."
+       }
+
+       fn := path[(len(path) - len(info.Name())):]
+
+       if m.Streams[dir] == nil {
+               m.Streams[dir] = &ManifestStreamWriter{
+                       m,
+                       &manifest.ManifestStream{StreamName: dir},
+                       0,
+                       nil,
+                       make(chan *Block),
+                       make(chan []error)}
+               go m.Streams[dir].goUpload()
+       }
+
+       stream := m.Streams[dir]
+
+       fileStart := stream.offset
+
+       file, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+
+       log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+       var count int64
+       count, err = io.Copy(stream, file)
+       if err != nil {
+               return err
+       }
+
+       stream.offset += count
+
+       stream.ManifestStream.Files = append(stream.ManifestStream.Files,
+               fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+
+       return nil
+}
+
+func (m *ManifestWriter) Finish() error {
+       var errstring string
+       for _, stream := range m.Streams {
+               if stream.uploader == nil {
+                       continue
+               }
+               if stream.Block != nil {
+                       stream.uploader <- stream.Block
+               }
+               close(stream.uploader)
+               stream.uploader = nil
+
+               errors := <-stream.finish
+               close(stream.finish)
+               stream.finish = nil
+
+               for _, r := range errors {
+                       errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+               }
+       }
+       if errstring != "" {
+               return errors.New(errstring)
+       } else {
+               return nil
+       }
+}
+
+func (m *ManifestWriter) ManifestText() string {
+       m.Finish()
+       var buf bytes.Buffer
+
+       dirs := make([]string, len(m.Streams))
+       i := 0
+       for k := range m.Streams {
+               dirs[i] = k
+               i++
+       }
+       sort.Strings(dirs)
+
+       for _, k := range dirs {
+               v := m.Streams[k]
+
+               if k == "." {
+                       buf.WriteString(".")
+               } else {
+                       k = strings.Replace(k, " ", "\\040", -1)
+                       k = strings.Replace(k, "\n", "", -1)
+                       buf.WriteString("./" + k)
+               }
+               for _, b := range v.Blocks {
+                       buf.WriteString(" ")
+                       buf.WriteString(b)
+               }
+               for _, f := range v.Files {
+                       buf.WriteString(" ")
+                       f = strings.Replace(f, " ", "\\040", -1)
+                       f = strings.Replace(f, "\n", "", -1)
+                       buf.WriteString(f)
+               }
+               buf.WriteString("\n")
+       }
+       return buf.String()
+}
+
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+       mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
+       err = filepath.Walk(root, mw.WalkFunc)
+
+       if err != nil {
+               return "", err
+       }
+
+       err = mw.Finish()
+       if err != nil {
+               return "", err
+       }
+
+       return mw.ManifestText(), nil
+}
diff --git a/sdk/go/crunchrunner/upload_test.go b/sdk/go/crunchrunner/upload_test.go
new file mode 100644 (file)
index 0000000..a2bf0ac
--- /dev/null
@@ -0,0 +1,140 @@
+package main
+
+import (
+       "crypto/md5"
+       "errors"
+       "fmt"
+       . "gopkg.in/check.v1"
+       "io/ioutil"
+       "os"
+)
+
+type UploadTestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&UploadTestSuite{})
+
+type KeepTestClient struct {
+}
+
+func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+       return fmt.Sprintf("%x+%v", md5.Sum(buf), len(buf)), len(buf), nil
+}
+
+func (s *TestSuite) TestSimpleUpload(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       os.Mkdir(tmpdir+"/subdir", 0700)
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+       ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       file, _ := os.Create(tmpdir + "/" + "file1.txt")
+       data := make([]byte, 1024*1024-1)
+       for i := range data {
+               data[i] = byte(i % 10)
+       }
+       for i := 0; i < 65; i++ {
+               file.Write(data)
+       }
+       file.Close()
+
+       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       os.Mkdir(tmpdir+"/subdir", 0700)
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
+`)
+}
+
+type KeepErrorTestClient struct {
+}
+
+func (k KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+       return "", 0, errors.New("Failed!")
+}
+
+func (s *TestSuite) TestUploadError(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       str, err := WriteTree(KeepErrorTestClient{}, tmpdir)
+       c.Check(err, NotNil)
+       c.Check(str, Equals, "")
+}
index 1df64703a9577d7338f60f5dd944b1c72d854064..b74f828f4bd04a2a6321aa50e5f823cb3a2496ab 100644 (file)
@@ -23,6 +23,7 @@ from collection import CollectionReader, CollectionWriter, ResumableCollectionWr
 from keep import *
 from stream import *
 from arvfile import StreamFileReader
+from retry import RetryLoop
 import errors
 import util
 
@@ -37,36 +38,60 @@ logger.addHandler(log_handler)
 logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')
                 else logging.WARNING)
 
-def task_set_output(self,s):
-    api('v1').job_tasks().update(uuid=self['uuid'],
-                                 body={
-            'output':s,
-            'success':True,
-            'progress':1.0
-            }).execute()
+def task_set_output(self, s, num_retries=5):
+    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
+        try:
+            return api('v1').job_tasks().update(
+                uuid=self['uuid'],
+                body={
+                    'output':s,
+                    'success':True,
+                    'progress':1.0
+                }).execute()
+        except errors.ApiError as error:
+            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+                logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+            else:
+                raise
 
 _current_task = None
-def current_task():
+def current_task(num_retries=5):
     global _current_task
     if _current_task:
         return _current_task
-    t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
-    t = UserDict.UserDict(t)
-    t.set_output = types.MethodType(task_set_output, t)
-    t.tmpdir = os.environ['TASK_WORK']
-    _current_task = t
-    return t
+
+    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
+        try:
+            task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
+            task = UserDict.UserDict(task)
+            task.set_output = types.MethodType(task_set_output, task)
+            task.tmpdir = os.environ['TASK_WORK']
+            _current_task = task
+            return task
+        except errors.ApiError as error:
+            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+                logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+            else:
+                raise
 
 _current_job = None
-def current_job():
+def current_job(num_retries=5):
     global _current_job
     if _current_job:
         return _current_job
-    t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
-    t = UserDict.UserDict(t)
-    t.tmpdir = os.environ['JOB_WORK']
-    _current_job = t
-    return t
+
+    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
+        try:
+            job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
+            job = UserDict.UserDict(job)
+            job.tmpdir = os.environ['JOB_WORK']
+            _current_job = job
+            return job
+        except errors.ApiError as error:
+            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+            else:
+                raise
 
 def getjobparam(*args):
     return current_job()['script_parameters'].get(*args)
index 6e2a07888662172fd6f26b335a0206db4ef15e98..ea318c6de05c9624d8517d15e8c4071f287a4ab9 100644 (file)
@@ -43,6 +43,10 @@ def mock_responses(body, *codes, **headers):
     return mock.patch('httplib2.Http.request', side_effect=queue_with((
         (fake_httplib2_response(code, **headers), body) for code in codes)))
 
+def mock_api_responses(api_client, body, codes, headers={}):
+    return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
+        (fake_httplib2_response(code, **headers), body) for code in codes)))
+
 
 class FakeCurl:
     @classmethod
index 9d438e2e038ecca70225009d261e5229d61a81c3..6d1e9798cfbecf72cc23e30e33bc590274bcec6c 100644 (file)
@@ -6,60 +6,32 @@ import httplib2
 import json
 import mimetypes
 import os
-import run_test_server
+import socket
 import string
 import unittest
+
+import mock
+import run_test_server
+
 from apiclient import errors as apiclient_errors
 from apiclient import http as apiclient_http
 from arvados.api import OrderedJsonModel
-
 from arvados_testutil import fake_httplib2_response
 
 if not mimetypes.inited:
     mimetypes.init()
 
-class ArvadosApiClientTest(unittest.TestCase):
+class ArvadosApiTest(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
     ERROR_HEADERS = {'Content-Type': mimetypes.types_map['.json']}
 
-    @classmethod
-    def api_error_response(cls, code, *errors):
-        return (fake_httplib2_response(code, **cls.ERROR_HEADERS),
+    def api_error_response(self, code, *errors):
+        return (fake_httplib2_response(code, **self.ERROR_HEADERS),
                 json.dumps({'errors': errors,
                             'error_token': '1234567890+12345678'}))
 
-    @classmethod
-    def setUpClass(cls):
-        # The apiclient library has support for mocking requests for
-        # testing, but it doesn't extend to the discovery document
-        # itself. For now, bring up an API server that will serve
-        # a discovery document.
-        # FIXME: Figure out a better way to stub this out.
-        run_test_server.run()
-        mock_responses = {
-            'arvados.humans.delete': (
-                fake_httplib2_response(500, **cls.ERROR_HEADERS),
-                ""),
-            'arvados.humans.get': cls.api_error_response(
-                422, "Bad UUID format", "Bad output format"),
-            'arvados.humans.list': (None, json.dumps(
-                    {'items_available': 0, 'items': []})),
-            }
-        req_builder = apiclient_http.RequestMockBuilder(mock_responses)
-        cls.api = arvados.api('v1',
-                              host=os.environ['ARVADOS_API_HOST'],
-                              token='discovery-doc-only-no-token-needed',
-                              insecure=True,
-                              requestBuilder=req_builder)
-
-    def tearDown(cls):
-        run_test_server.reset()
-
     def test_new_api_objects_with_cache(self):
-        clients = [arvados.api('v1', cache=True,
-                               host=os.environ['ARVADOS_API_HOST'],
-                               token='discovery-doc-only-no-token-needed',
-                               insecure=True)
-                   for index in [0, 1]]
+        clients = [arvados.api('v1', cache=True) for index in [0, 1]]
         self.assertIsNot(*clients)
 
     def test_empty_list(self):
@@ -92,15 +64,28 @@ class ArvadosApiClientTest(unittest.TestCase):
                     new_item['created_at']))
 
     def test_exceptions_include_errors(self):
+        mock_responses = {
+            'arvados.humans.get': self.api_error_response(
+                422, "Bad UUID format", "Bad output format"),
+            }
+        req_builder = apiclient_http.RequestMockBuilder(mock_responses)
+        api = arvados.api('v1', requestBuilder=req_builder)
         with self.assertRaises(apiclient_errors.HttpError) as err_ctx:
-            self.api.humans().get(uuid='xyz-xyz-abcdef').execute()
+            api.humans().get(uuid='xyz-xyz-abcdef').execute()
         err_s = str(err_ctx.exception)
         for msg in ["Bad UUID format", "Bad output format"]:
             self.assertIn(msg, err_s)
 
     def test_exceptions_without_errors_have_basic_info(self):
+        mock_responses = {
+            'arvados.humans.delete': (
+                fake_httplib2_response(500, **self.ERROR_HEADERS),
+                "")
+            }
+        req_builder = apiclient_http.RequestMockBuilder(mock_responses)
+        api = arvados.api('v1', requestBuilder=req_builder)
         with self.assertRaises(apiclient_errors.HttpError) as err_ctx:
-            self.api.humans().delete(uuid='xyz-xyz-abcdef').execute()
+            api.humans().delete(uuid='xyz-xyz-abcdef').execute()
         self.assertIn("500", str(err_ctx.exception))
 
     def test_request_too_large(self):
@@ -117,14 +102,25 @@ class ArvadosApiClientTest(unittest.TestCase):
             }
         req_builder = apiclient_http.RequestMockBuilder(mock_responses)
         api = arvados.api('v1',
-                          host=os.environ['ARVADOS_API_HOST'],
-                          token='discovery-doc-only-no-token-needed',
-                          insecure=True,
-                          requestBuilder=req_builder,
-                          model=OrderedJsonModel())
+                          requestBuilder=req_builder, model=OrderedJsonModel())
         result = api.humans().get(uuid='test').execute()
         self.assertEqual(string.hexdigits, ''.join(result.keys()))
 
+    def test_socket_errors_retried(self):
+        api = arvados.api('v1')
+        self.assertTrue(hasattr(api._http, 'orig_http_request'),
+                        "test doesn't know how to intercept HTTP requests")
+        api._http.orig_http_request = mock.MagicMock()
+        mock_response = {'user': 'person'}
+        api._http.orig_http_request.side_effect = [
+            socket.error("mock error"),
+            (fake_httplib2_response(200), json.dumps(mock_response))
+            ]
+        actual_response = api.users().current().execute()
+        self.assertEqual(mock_response, actual_response)
+        self.assertGreater(api._http.orig_http_request.call_count, 1,
+                           "client got the right response without retrying")
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/sdk/python/tests/test_retry_job_helpers.py b/sdk/python/tests/test_retry_job_helpers.py
new file mode 100644 (file)
index 0000000..6e562a0
--- /dev/null
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+import mock
+import os
+import unittest
+import hashlib
+import run_test_server
+import json
+import arvados
+import arvados_testutil as tutil
+from apiclient import http as apiclient_http
+
+
+@tutil.skip_sleep
+class ApiClientRetryTestMixin(object):
+
+    TEST_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+    TEST_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
+
+    @classmethod
+    def setUpClass(cls):
+        run_test_server.run()
+
+    def setUp(self):
+        # Patch arvados.api() to return our mock API, so we can mock
+        # its http requests.
+        self.api_client = arvados.api('v1', cache=False)
+        self.api_patch = mock.patch('arvados.api', return_value=self.api_client)
+        self.api_patch.start()
+
+    def tearDown(self):
+        self.api_patch.stop()
+
+    def run_method(self):
+        raise NotImplementedError("test subclasses must define run_method")
+
+    def test_immediate_success(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [200]):
+            self.run_method()
+
+    def test_immediate_failure(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [400]), self.assertRaises(self.DEFAULT_EXCEPTION):
+            self.run_method()
+
+    def test_retry_then_success(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [500, 200]):
+            self.run_method()
+
+    def test_error_after_default_retries_exhausted(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [500, 500, 500, 500, 500, 500, 200]), self.assertRaises(self.DEFAULT_EXCEPTION):
+            self.run_method()
+
+    def test_no_retry_after_immediate_success(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [200, 400]):
+            self.run_method()
+
+
+class CurrentJobTestCase(ApiClientRetryTestMixin, unittest.TestCase):
+
+    DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+    def setUp(self):
+        super(CurrentJobTestCase, self).setUp()
+        os.environ['JOB_UUID'] = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+        os.environ['JOB_WORK'] = '.'
+
+    def tearDown(self):
+        del os.environ['JOB_UUID']
+        del os.environ['JOB_WORK']
+        arvados._current_job = None
+        super(CurrentJobTestCase, self).tearDown()
+
+    def run_method(self):
+        arvados.current_job()
+
+
+class CurrentTaskTestCase(ApiClientRetryTestMixin, unittest.TestCase):
+
+    DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+    def setUp(self):
+        super(CurrentTaskTestCase, self).setUp()
+        os.environ['TASK_UUID'] = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+        os.environ['TASK_WORK'] = '.'
+
+    def tearDown(self):
+        del os.environ['TASK_UUID']
+        del os.environ['TASK_WORK']
+        arvados._current_task = None
+        super(CurrentTaskTestCase, self).tearDown()
+
+    def run_method(self):
+        arvados.current_task()
+
+
+class TaskSetOutputTestCase(CurrentTaskTestCase, unittest.TestCase):
+
+    DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+    def tearDown(self):
+        super(TaskSetOutputTestCase, self).tearDown()
+        run_test_server.reset()
+
+    def run_method(self, locator=ApiClientRetryTestMixin.TEST_LOCATOR):
+        arvados.task_set_output({'uuid':self.TEST_UUID},s=locator)
index b3e49c59a0954ee886b4969880271a7ec5c2291c..35c2f4884f4f99e2894c5125776edfb0db32895c 100644 (file)
@@ -25,7 +25,7 @@ func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
        }
        h := newGitHandler()
        h.(*gitHandler).Path = "/bin/sh"
-       h.(*gitHandler).Args = []string{"-c", "echo HTTP/1.1 200 OK; echo Content-Type: text/plain; echo; env"}
+       h.(*gitHandler).Args = []string{"-c", "printf 'Content-Type: text/plain\r\n\r\n'; env"}
        os.Setenv("GITOLITE_HTTP_HOME", "/test/ghh")
        os.Setenv("GL_BYPASS_ACCESS_CHECKS", "yesplease")
 
@@ -40,14 +40,14 @@ func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
        c.Check(body, check.Matches, `(?ms).*^SERVER_ADDR=`+regexp.QuoteMeta(theConfig.Addr)+`$.*`)
 }
 
-func (s *GitHandlerSuite) TestCGIError(c *check.C) {
+func (s *GitHandlerSuite) TestCGIErrorOnSplitHostPortError(c *check.C) {
        u, err := url.Parse("git.zzzzz.arvadosapi.com/test")
        c.Check(err, check.Equals, nil)
        resp := httptest.NewRecorder()
        req := &http.Request{
                Method:     "GET",
                URL:        u,
-               RemoteAddr: "bogus",
+               RemoteAddr: "test.bad.address.missing.port",
        }
        h := newGitHandler()
        h.ServeHTTP(resp, req)