5824: Merge branch 'master' into 5824-keep-web-workbench
author Tom Clegg <tom@curoverse.com>
Wed, 11 Nov 2015 17:11:46 +0000 (12:11 -0500)
committer Tom Clegg <tom@curoverse.com>
Wed, 11 Nov 2015 17:11:46 +0000 (12:11 -0500)
Conflicts:
services/keepproxy/keepproxy_test.go

Changed files:
sdk/cli/bin/arv-run-pipeline-instance
sdk/cli/bin/crunch-job
sdk/go/arvadosclient/arvadosclient.go
sdk/go/arvadosclient/arvadosclient_test.go
services/fuse/arvados_fuse/fusedir.py
services/fuse/bin/arv-mount
services/fuse/tests/test_mount.py

index d9e00dc22bf7d69ab91aaf2b9efce1616e1842ec..3e72658d476020bd6bf2cf8e58b5b1afb7a6b65a 100755 (executable)
@@ -149,6 +149,10 @@ p = Trollop::Parser.new do
       "Description for the pipeline instance.",
       :short => :none,
       :type => :string)
+  opt(:project_uuid,    # Intended as owner_uuid of a newly created pipeline instance.
+      "UUID of the project for the pipeline instance.",
+      short: :none,    # NOTE(review): neighbouring opts use the :short => hash-rocket style.
+      type: :string)
   stop_on [:'--']
 end
 $options = Trollop::with_standard_exception_handling p do
@@ -440,18 +444,23 @@ class WhRunPipelineInstance
         end
       end
     else
-      description = $options[:description]
-      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
-      @instance = PipelineInstance.
-        create(components: @components,
-               properties: {
-                 run_options: {
-                   enable_job_reuse: !@options[:no_reuse]
-                 }
-               },
-               pipeline_template_uuid: @template[:uuid],
-               description: description,
-               state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
+      description = $options[:description] ||
+                    ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))  # fallback when no description option was given
+      instance_body = {  # attributes passed to PipelineInstance.create
+        components: @components,
+        properties: {
+          run_options: {
+            enable_job_reuse: !@options[:no_reuse]
+          }
+        },
+        pipeline_template_uuid: @template[:uuid],
+        description: description,
+        state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
+      }
+      if @options[:project_uuid]  # NOTE(review): reads @options while description/submit read $options; confirm both hold the parsed --project-uuid
+        instance_body[:owner_uuid] = @options[:project_uuid]
+      end
+      @instance = PipelineInstance.create(instance_body)
     end
     self
   end
index c2ea186ef5114f17a3198c1aee872103d5c0dc95..e2a4e264c3697e916ca8c88f98cf3a88d80e0e9a 100755 (executable)
@@ -893,7 +893,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
-    $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+    $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";    # --by-pdh: the task's Keep mount resolves portable data hashes only, not UUIDs
     if ($docker_hash)
     {
       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
index 18e1074bf6f6c801c21593bc57586a902807f8b4..b67eaa59a6749fb7e9b1a3da5ad2617344fc799d 100644 (file)
@@ -32,6 +32,8 @@ var ErrInvalidArgument = errors.New("Invalid argument")
 // such failures by always using a new or recently active socket.
 var MaxIdleConnectionDuration = 30 * time.Second

+var RetryDelay = 2 * time.Second // pause between CallRaw retry attempts; tests set it to 0
+
 // Indicates an error that was returned by the API server.
 type APIServerError struct {
        // Address of server returning error, of the form "host:port".
@@ -65,6 +67,9 @@ type Dict map[string]interface{}

 // Information about how to contact the Arvados server
 type ArvadosClient struct {
+       // URL scheme for API requests, e.g. "https"; CallRaw uses "https" when empty
+       Scheme string
+
        // Arvados API server, form "host:port"
        ApiServer string

@@ -85,6 +90,9 @@ type ArvadosClient struct {
        DiscoveryDoc Dict

        lastClosedIdlesAt time.Time
+
+       // Number of retries after the first attempt (CallRaw makes up to Retries+1 requests)
+       Retries int
 }

 // Create a new ArvadosClient, initialized with standard Arvados environment
@@ -96,12 +104,14 @@ func MakeArvadosClient() (ac ArvadosClient, err error) {
        external := matchTrue.MatchString(os.Getenv("ARVADOS_EXTERNAL_CLIENT"))

        ac = ArvadosClient{
+               Scheme:      "https",
                ApiServer:   os.Getenv("ARVADOS_API_HOST"),
                ApiToken:    os.Getenv("ARVADOS_API_TOKEN"),
                ApiInsecure: insecure,
                Client: &http.Client{Transport: &http.Transport{
                        TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
-               External: external}
+               External: external,
+               Retries:  2} // default: up to 3 attempts per CallRaw

        if ac.ApiServer == "" {
                return ac, MissingArvadosApiHost
@@ -118,10 +128,12 @@ func MakeArvadosClient() (ac ArvadosClient, err error) {
 // CallRaw is the same as Call() but returns a Reader that reads the
 // response body, instead of taking an output object.
 func (c ArvadosClient) CallRaw(method string, resourceType string, uuid string, action string, parameters Dict) (reader io.ReadCloser, err error) {
-       var req *http.Request
-
+       scheme := c.Scheme // an empty Scheme falls back to "https"
+       if scheme == "" {
+               scheme = "https"
+       }
        u := url.URL{
-               Scheme: "https",
+               Scheme: scheme,
                Host:   c.ApiServer}

        if resourceType != API_DISCOVERY_RESOURCE {
@@ -151,27 +163,15 @@ func (c ArvadosClient) CallRaw(method string, resourceType string, uuid string,
                }
        }

-       if method == "GET" || method == "HEAD" {
-               u.RawQuery = vals.Encode()
-               if req, err = http.NewRequest(method, u.String(), nil); err != nil {
-                       return nil, err
-               }
-       } else {
-               if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
-                       return nil, err
-               }
-               req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+       retryable := false // whether a failed Do() (transport error) may be retried
+       switch method {
+       case "GET", "HEAD", "PUT", "OPTIONS", "DELETE": // NOTE(review): DELETE was previously treated as unsafe to retry
+               retryable = true
        }

-       // Add api token header
-       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
-       if c.External {
-               req.Header.Add("X-External-Client", "1")
-       }
-
-       // POST and DELETE are not safe to retry automatically, so we minimize
-       // such failures by always using a new or recently active socket
-       if method == "POST" || method == "DELETE" {
+       // Non-retryable methods such as POST are not safe to retry automatically,
+       // so we minimize such failures by always using a new or recently active socket
+       if !retryable { // NOTE(review): c is a value receiver, so the lastClosedIdlesAt update below never reaches the caller's ArvadosClient
                if time.Since(c.lastClosedIdlesAt) > MaxIdleConnectionDuration {
                        c.lastClosedIdlesAt = time.Now()
                        c.Client.Transport.(*http.Transport).CloseIdleConnections()
@@ -179,17 +179,57 @@ func (c ArvadosClient) CallRaw(method string, resourceType string, uuid string,
        }

        // Make the request
+       var req *http.Request
        var resp *http.Response
-       if resp, err = c.Client.Do(req); err != nil {
-               return nil, err
-       }

-       if resp.StatusCode == http.StatusOK {
-               return resp.Body, nil
+       for attempt := 0; attempt <= c.Retries; attempt++ { // one initial attempt plus up to c.Retries retries
+               if method == "GET" || method == "HEAD" {
+                       u.RawQuery = vals.Encode()
+                       if req, err = http.NewRequest(method, u.String(), nil); err != nil {
+                               return nil, err
+                       }
+               } else {
+                       if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
+                               return nil, err
+                       }
+                       req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+               }
+
+               // Add api token header
+               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
+               if c.External {
+                       req.Header.Add("X-External-Client", "1")
+               }
+
+               resp, err = c.Client.Do(req)
+               if err != nil {
+                       if retryable {
+                               time.Sleep(RetryDelay) // NOTE(review): also sleeps after the final attempt
+                               continue
+                       } else {
+                               return nil, err
+                       }
+               }
+
+               if resp.StatusCode == http.StatusOK {
+                       return resp.Body, nil
+               }
+
+               defer resp.Body.Close() // NOTE(review): deferred inside the loop, so each failed body stays open until CallRaw returns
+
+               switch resp.StatusCode {
+               case 408, 409, 422, 423, 500, 502, 503, 504: // retried for every method, including non-retryable ones such as POST
+                       time.Sleep(RetryDelay) // NOTE(review): also sleeps after the final attempt
+                       continue
+               default:
+                       return nil, newAPIServerError(c.ApiServer, resp)
+               }
        }

-       defer resp.Body.Close()
-       return nil, newAPIServerError(c.ApiServer, resp)
+       if resp != nil { // NOTE(review): checks resp before err; net/http may return both on a redirect-check failure — confirm err should win
+               return nil, newAPIServerError(c.ApiServer, resp)
+       }
+       return nil, err
 }

 func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError {
index d1487235dcf6f71033e7fb8232bf6a8955d84d35..8e32efe4f987adccbe77b573a15ab27dbdfcc707 100644 (file)
@@ -1,8 +1,10 @@
 package arvadosclient
 
 import (
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        . "gopkg.in/check.v1"
+       "net"
        "net/http"
        "os"
        "testing"
@@ -16,6 +18,7 @@ func Test(t *testing.T) {

 var _ = Suite(&ServerRequiredSuite{})
 var _ = Suite(&UnitSuite{})
+var _ = Suite(&MockArvadosServerSuite{})

 // Tests that require the Keep server running
 type ServerRequiredSuite struct{}
@@ -23,6 +26,7 @@ type ServerRequiredSuite struct{}
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
        arvadostest.StartKeep(2, false)
+       RetryDelay = 0 // don't sleep between CallRaw retries in tests
 }

 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
@@ -223,3 +227,160 @@ func (s *UnitSuite) TestPDHMatch(c *C) {
        c.Assert(PDHMatch("+12345"), Equals, false)
        c.Assert(PDHMatch(""), Equals, false)
 }
+
+// Tests that run ArvadosClient against an in-process mock Arvados API server
+type MockArvadosServerSuite struct{}
+
+func (s *MockArvadosServerSuite) SetUpSuite(c *C) {
+       RetryDelay = 0 // don't sleep between CallRaw retries in tests
+}
+
+func (s *MockArvadosServerSuite) SetUpTest(c *C) {
+       arvadostest.ResetEnv()
+}
+
+type APIServer struct { // a fake API server: its listener and "host:port" address
+       listener net.Listener
+       url      string
+}
+
+func RunFakeArvadosServer(st http.Handler) (api APIServer, err error) { // serves st on a new TCP listener until api.listener is closed
+       api.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0}) // Port 0: let the OS pick a free port
+       if err != nil {
+               return
+       }
+       api.url = api.listener.Addr().String() // used as ArvadosClient.ApiServer
+       go http.Serve(api.listener, st)
+       return
+}
+
+type APIStub struct { // scripted server: the i-th request gets respStatus[i] and responseBody[i]
+       method        string // client call to make: "get", "create", "update" or "delete"
+       retryAttempts int    // number of scripted requests served so far
+       expected      int    // 200 = success, -1 = transport error, else the expected API error status
+       respStatus    []int  // -1 = answer with a redirect loop instead of a status
+       responseBody  []string
+}
+
+func (h *APIStub) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if req.URL.Path == "/redirect-loop" { // later redirect hops are not counted as attempts
+               http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+               return
+       }
+       if h.respStatus[h.retryAttempts] < 0 { // NOTE(review): panics (index out of range) if the client sends more requests than scripted
+               // Fail the client's Do() by starting a redirect loop
+               http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+       } else {
+               resp.WriteHeader(h.respStatus[h.retryAttempts])
+               resp.Write([]byte(h.responseBody[h.retryAttempts]))
+       }
+       h.retryAttempts++
+}
+
+func (s *MockArvadosServerSuite) TestWithRetries(c *C) { // each stub scripts the server's responses and the expected outcome of one client call
+       for _, stub := range []APIStub{
+               {
+                       "get", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
+               },
+               {
+                       "create", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
+               },
+               {
+                       "get", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "update", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "delete", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "delete", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "update", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 404, []int{404, 200}, []string{``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 401, []int{500, 401, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+
+               // Response code -1 simulates an HTTP/network error
+               // (i.e., Do() returns an error; there is no HTTP
+               // response status code).
+
+               // Succeed on second retry
+               {
+                       "get", 0, 200, []int{-1, -1, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               // "POST" is not safe to retry: fail after one error
+               {
+                       "create", 0, -1, []int{-1, 200}, []string{``, `{"ok":"ok"}`},
+               },
+       } {
+               api, err := RunFakeArvadosServer(&stub)
+               c.Check(err, IsNil) // NOTE(review): c.Assert would abort this case instead of continuing without a server
+
+               defer api.listener.Close() // NOTE(review): deferred inside the loop, so every fake server stays open until the test returns
+
+               arv := ArvadosClient{
+                       Scheme:      "http",
+                       ApiServer:   api.url,
+                       ApiToken:    "abc123",
+                       ApiInsecure: true,
+                       Client:      &http.Client{Transport: &http.Transport{}},
+                       Retries:     2}
+
+               getback := make(Dict)
+               switch stub.method {
+               case "get":
+                       err = arv.Get("collections", "zzzzz-4zz18-znfnqtbbv4spc3w", nil, &getback)
+               case "create":
+                       err = arv.Create("collections",
+                               Dict{"collection": Dict{"name": "testing"}},
+                               &getback)
+               case "update":
+                       err = arv.Update("collections", "zzzzz-4zz18-znfnqtbbv4spc3w",
+                               Dict{"collection": Dict{"name": "testing"}},
+                               &getback)
+               case "delete":
+                       err = arv.Delete("pipeline_templates", "zzzzz-4zz18-znfnqtbbv4spc3w", nil, &getback)
+               }
+
+               switch stub.expected {
+               case 200:
+                       c.Check(err, IsNil)
+                       c.Check(getback["ok"], Equals, "ok")
+               case -1:
+                       c.Check(err, NotNil)
+                       c.Check(err, ErrorMatches, `.*stopped after \d+ redirects`)
+               default:
+                       c.Check(err, NotNil)
+                       c.Check(err, ErrorMatches, fmt.Sprintf("arvados API server error: %d.*", stub.expected))
+                       c.Check(err.(APIServerError).HttpStatusCode, Equals, stub.expected) // NOTE(review): this type assertion panics if err is nil or another error type
+               }
+       }
+}
index 8ffca49601d795fc0622326e04eb219dd3dd0d8d..fdc93fb3beb37b7e4d4eda8400eb6f72ff82423a 100644 (file)
@@ -496,10 +496,11 @@ point the collection will actually be looked up on the server and the directory
 will appear if it exists.
 """.lstrip()
 
-    def __init__(self, parent_inode, inodes, api, num_retries):
+    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):  # pdh_only=True: look up portable data hashes only, never UUIDs
         super(MagicDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
+        self.pdh_only = pdh_only

     def __setattr__(self, name, value):
         super(MagicDirectory, self).__setattr__(name, value)
@@ -511,13 +512,13 @@ will appear if it exists.
             # If we're the root directory, add an identical by_id subdirectory.
             if self.inode == llfuse.ROOT_INODE:
                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
-                        self.inode, self.inodes, self.api, self.num_retries))
+                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))  # by_id inherits the pdh_only setting

     def __contains__(self, k):
         if k in self._entries:
             return True

-        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
+        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):  # accept k only if it is a PDH, or (when not pdh_only) a UUID
             return False

         try:
index 7f9c916c456f49e76744f78ba06b18af2eb1ed1b..d15553456a2238fcee7d32d926517ada43d05785 100755 (executable)
@@ -39,6 +39,8 @@ with "--".
                             help="""Mount subdirectories listed by tag.""")
     mount_mode.add_argument('--by-id', action='store_true',
                             help="""Mount subdirectories listed by portable data hash or uuid.""")
+    mount_mode.add_argument('--by-pdh', action='store_true',  # like --by-id, but UUIDs are not looked up
+                            help="""Mount subdirectories listed by portable data hash.""")
     mount_mode.add_argument('--project', type=str, help="""Mount a specific project.""")
     mount_mode.add_argument('--collection', type=str, help="""Mount only the specified collection.""")

@@ -103,9 +105,10 @@ with "--".
         now = time.time()
         dir_class = None
         dir_args = [llfuse.ROOT_INODE, operations.inodes, api, args.retries]
-        if args.by_id:
+        if args.by_id or args.by_pdh:
             # Set up the request handler with the 'magic directory' at the root
             dir_class = MagicDirectory
+            dir_args.append(args.by_pdh)  # passed to MagicDirectory as pdh_only
         elif args.by_tag:
             dir_class = TagsDirectory
         elif args.shared:
@@ -131,6 +134,7 @@ with "--".
             dir_args[0] = e.inode

             e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
+
             e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(*dir_args))

             dir_args.append(usr)
@@ -170,7 +174,8 @@ From here, the following directories are available:
         llfuse.init(operations, args.mountpoint, opts)

         # Subscribe to change events from API server
-        operations.listen_for_events(api)
+        if not args.by_pdh:  # NOTE(review): presumably skipped because PDH-addressed collections never change; confirm
+            operations.listen_for_events(api)

         t = threading.Thread(None, lambda: llfuse.main())
         t.start()
index ff8883714512c8a8f6b6a0196a19cb128204d317..cc8693bd34c40628f193afd40c386751d86d1c78 100644 (file)
@@ -1060,3 +1060,66 @@ class FuseUnitTest(unittest.TestCase):
         self.assertEqual("_", fuse.sanitize_filename(""))
         self.assertEqual("_", fuse.sanitize_filename("."))
         self.assertEqual("__", fuse.sanitize_filename(".."))
+
+
+class FuseMagicTestPDHOnly(MountTestBase):  # MagicDirectory lookups by PDH and by UUID, with and without pdh_only
+    def setUp(self, api=None):  # creates one collection containing thing1.txt = "data 1"
+        super(FuseMagicTestPDHOnly, self).setUp(api=api)
+
+        cw = arvados.CollectionWriter()
+
+        cw.start_new_file('thing1.txt')
+        cw.write("data 1")
+
+        self.testcollection = cw.finish()
+        self.test_manifest = cw.manifest_text()
+        created = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
+        self.testcollectionuuid = str(created['uuid'])
+
+    def verify_pdh_only(self, pdh_only=False, skip_pdh_only=False):  # skip_pdh_only: mount without passing pdh_only at all
+        if skip_pdh_only is True:
+            self.make_mount(fuse.MagicDirectory)    # in this case, the default by_id applies
+        else:
+            self.make_mount(fuse.MagicDirectory, pdh_only=pdh_only)
+
+        mount_ls = llfuse.listdir(self.mounttmp)
+        self.assertIn('README', mount_ls)
+        self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
+                             arvados.util.uuid_pattern.match(fn)
+                             for fn in mount_ls),
+                         "new FUSE MagicDirectory lists Collection")
+
+        # look up using pdh should succeed in all cases
+        self.assertDirContents(self.testcollection, ['thing1.txt'])
+        self.assertDirContents(os.path.join('by_id', self.testcollection),
+                               ['thing1.txt'])
+        mount_ls = llfuse.listdir(self.mounttmp)
+        self.assertIn('README', mount_ls)
+        self.assertIn(self.testcollection, mount_ls)
+        self.assertIn(self.testcollection,
+                      llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
+
+        files = {}
+        files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
+
+        for k, v in files.items():
+            with open(os.path.join(self.mounttmp, k)) as f:  # NOTE(review): k already starts with self.mounttmp, so this join only returns k if mounttmp is absolute
+                self.assertEqual(v, f.read())
+
+        # look up using uuid should fail when pdh_only is set
+        if pdh_only is True:
+            with self.assertRaises(OSError):
+                self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
+                               ['thing1.txt'])
+        else:
+            self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
+                               ['thing1.txt'])
+
+    def test_with_pdh_only_true(self):
+        self.verify_pdh_only(pdh_only=True)
+
+    def test_with_pdh_only_false(self):
+        self.verify_pdh_only(pdh_only=False)
+
+    def test_with_default_by_id(self):
+        self.verify_pdh_only(skip_pdh_only=True)