Merge branch '1.4-dev' of git.curoverse.com:arvados into 1.4-dev (tag: 1.4.2)
author    Ward Vandewege <wvandewege@veritasgenetics.com>
          Thu, 21 Nov 2019 21:39:20 +0000 (16:39 -0500)
committer Ward Vandewege <wvandewege@veritasgenetics.com>
          Thu, 21 Nov 2019 21:39:20 +0000 (16:39 -0500)
29 files changed:
apps/workbench/Gemfile
apps/workbench/Gemfile.lock
apps/workbench/app/models/arvados_resource_list.rb
apps/workbench/app/views/users/_show_admin.html.erb
apps/workbench/test/unit/arvados_resource_list_test.rb
doc/api/methods/containers.html.textile.liquid
sdk/cli/arvados-cli.gemspec
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/done.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_site.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/keep.py
sdk/python/arvados/retry.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_keep_client.py
sdk/ruby/lib/arvados/collection.rb
sdk/ruby/test/test_collection.rb
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/models/container.rb
services/api/test/unit/container_test.rb
services/crunch-run/copier.go

diff --git a/apps/workbench/Gemfile b/apps/workbench/Gemfile
index ce2a1377d72a3f9f6bbbc3cf92243921eff137cf..1ef84619a6abbf29e50db30f012b7ca581c11446 100644
@@ -5,7 +5,7 @@
 source 'https://rubygems.org'
 
 gem 'rails', '~> 5.0.0'
-gem 'arvados', '>= 0.1.20150511150219'
+gem 'arvados', git: 'https://github.com/curoverse/arvados.git', glob: 'sdk/ruby/arvados.gemspec'
 
 gem 'activerecord-nulldb-adapter', git: 'https://github.com/curoverse/nulldb'
 gem 'multi_json'
@@ -114,3 +114,7 @@ gem 'logstash-event'
 gem 'safe_yaml'
 
 gem 'npm-rails'
+
+# arvados-google-api-client and googleauth (and thus arvados) gems
+# depend on signet, but signet 0.12 is incompatible with ruby 2.3.
+gem 'signet', '< 0.12'
diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index 548da1dc049bf32a2abc886e2c451f2b5b1927ab..79d4f1712d62fa30e4ddf37873059de547ea3514 100644
@@ -1,3 +1,17 @@
+GIT
+  remote: https://github.com/curoverse/arvados.git
+  revision: dd9f2403f43bcb93da5908ddde57d8c0491bb4c2
+  glob: sdk/ruby/arvados.gemspec
+  specs:
+    arvados (1.4.1.20191019025325)
+      activesupport (>= 3)
+      andand (~> 1.3, >= 1.3.3)
+      arvados-google-api-client (>= 0.7, < 0.8.9)
+      faraday (< 0.16)
+      i18n (~> 0)
+      json (>= 1.7.7, < 3)
+      jwt (>= 0.1.5, < 2)
+
 GIT
   remote: https://github.com/curoverse/nulldb
   revision: d8e0073b665acdd2537c5eb15178a60f02f4b413
@@ -58,14 +72,7 @@ GEM
     andand (1.3.3)
     angularjs-rails (1.3.15)
     arel (7.1.4)
-    arvados (1.3.1.20190320201707)
-      activesupport (>= 3)
-      andand (~> 1.3, >= 1.3.3)
-      arvados-google-api-client (>= 0.7, < 0.8.9)
-      i18n (~> 0)
-      json (>= 1.7.7, < 3)
-      jwt (>= 0.1.5, < 2)
-    arvados-google-api-client (0.8.7.2)
+    arvados-google-api-client (0.8.7.3)
       activesupport (>= 3.2, < 5.1)
       addressable (~> 2.3)
       autoparse (~> 0.3)
@@ -127,7 +134,7 @@ GEM
     flamegraph (0.9.5)
     globalid (0.4.2)
       activesupport (>= 4.2.0)
-    googleauth (0.8.1)
+    googleauth (0.9.0)
       faraday (~> 0.12)
       jwt (>= 1.4, < 3.0)
       memoist (~> 0.16)
@@ -178,8 +185,8 @@ GEM
       metaclass (~> 0.0.1)
     morrisjs-rails (0.5.1.2)
       railties (> 3.1, < 6)
-    multi_json (1.13.1)
-    multipart-post (2.0.0)
+    multi_json (1.14.1)
+    multipart-post (2.1.1)
     net-scp (2.0.0)
       net-ssh (>= 2.6.5, < 6.0.0)
     net-sftp (2.1.2)
@@ -320,7 +327,7 @@ DEPENDENCIES
   activerecord-nulldb-adapter!
   andand
   angularjs-rails (~> 1.3.8)
-  arvados (>= 0.1.20150511150219)
+  arvados!
   bootstrap-sass (~> 3.4.1)
   bootstrap-tab-history-rails
   bootstrap-x-editable-rails
@@ -359,6 +366,7 @@ DEPENDENCIES
   sass
   sassc-rails
   selenium-webdriver (~> 3)
+  signet (< 0.12)
   simplecov (~> 0.7)
   simplecov-rcov
   sshkey
diff --git a/apps/workbench/app/models/arvados_resource_list.rb b/apps/workbench/app/models/arvados_resource_list.rb
index 9ba61eaba08ef1d9e008233cf4f06e6c9f8ac97f..1a50a3fb3269b9d961b16f12eea88c0db581b6a7 100644
@@ -136,11 +136,16 @@ class ArvadosResourceList
     if not @results.nil?
       @results.each &block
     else
+      results = []
       self.each_page do |items|
         items.each do |i|
+          results << i
           block.call i
         end
       end
+      # Cache results only if all were retrieved (block didn't raise
+      # an exception).
+      @results = results
     end
     self
   end
diff --git a/apps/workbench/app/views/users/_show_admin.html.erb b/apps/workbench/app/views/users/_show_admin.html.erb
index 89156aaf820238c3394d5e7d2c7975d03f9009f8..ddff79be01c3b02c4a6ff6c7c95f28f129fb1711 100644
@@ -76,7 +76,7 @@ SPDX-License-Identifier: AGPL-3.0 %>
                     disabled: (group.owner_uuid == @object.uuid),
                     data: {
                       permission_head: group.uuid,
-                      permission_uuid: permitted_group_perms[group.uuid]}) %>
+                      permission_uuid: permitted_group_perms[group.uuid] || 'x'}) %>
                 <small>user&rarr;group</small>
               </label>
               <label class="checkbox-inline" data-toggle-permission="true" data-permission-head="<%= @object.uuid %>" data-permission-name="can_read">
@@ -87,7 +87,7 @@ SPDX-License-Identifier: AGPL-3.0 %>
                     disabled: (group.owner_uuid == @object.uuid),
                     data: {
                       permission_tail: group.uuid,
-                      permission_uuid: member_group_perms[group.uuid]}) %>
+                      permission_uuid: member_group_perms[group.uuid] || 'x'}) %>
                 <small>group&rarr;user</small>
               </label>
               <label class="checkbox-inline">
diff --git a/apps/workbench/test/unit/arvados_resource_list_test.rb b/apps/workbench/test/unit/arvados_resource_list_test.rb
index e9eb2f8ef6a8a1ab6792665c10570e0b317129cd..270b96203b249276be213c7a14b7346abf4487ca 100644
@@ -103,4 +103,20 @@ class ResourceListTest < ActiveSupport::TestCase
     assert_nil c.items_available
     refute_empty c.results
   end
+
+  test 'cache results across each(&block) calls' do
+    use_token :admin
+    c = Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').with_count('none')
+    c.each do |x|
+      x.description = 'foo'
+    end
+    found = 0
+    c.each do |x|
+      found += 1
+      # We should get the same objects we modified in the loop above
+      # -- not new objects built from another set of API responses.
+      assert_equal 'foo', x.description
+    end
+    assert_equal 201, found
+  end
 end
diff --git a/doc/api/methods/containers.html.textile.liquid b/doc/api/methods/containers.html.textile.liquid
index d59c66edc3cbf5f492dbb0c4befac668e209c980..5ec95cee62ab18a7f8b69ac42382a4f42e7ac1cd 100644
@@ -110,7 +110,9 @@ table(table table-bordered table-condensed).
 
 h3. delete
 
-Delete an existing Container.
+Delete a Container.
+
+This API requires admin privileges. In normal operation, it should not be used at all. API clients like Workbench might not work correctly when a container request references a container that has been deleted.
 
 Arguments:
 
diff --git a/sdk/cli/arvados-cli.gemspec b/sdk/cli/arvados-cli.gemspec
index f21c3e4217de4f8eb41cca0dac70aaea4c13d0c5..efb3cae9d1ab47614153593e27686e6eca9bebfe 100644
@@ -30,7 +30,7 @@ Gem::Specification.new do |s|
   s.executables << "arv-crunch-job"
   s.executables << "arv-tag"
   s.required_ruby_version = '>= 2.1.0'
-  s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
+  s.add_runtime_dependency 'arvados', '>= 1.4.1.20190320201707'
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
   s.add_runtime_dependency 'arvados-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 95711762c9a421a94c3581b165d9dbd6522a99e6..56f91597bae1538d56cadc3c835ae42a3323f216 100644
@@ -36,7 +36,7 @@ from .perf import Perf
 from ._version import __version__
 from .executor import ArvCwlExecutor
 
-# These arn't used directly in this file but
+# These aren't used directly in this file but
 # other code expects to import them from here
 from .arvcontainer import ArvadosContainer
 from .arvjob import ArvadosJob
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index f8f1f30f633bd2f7fa9600ae77736ad2634ac3b4..ad11aa490fe4e9b9996560c8cf430619df608359 100644
@@ -418,7 +418,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
-            "use_existing": self.enable_reuse,
+            "use_existing": False, # Never reuse the runner container - see #15497.
             "properties": {}
         }
 
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index 9b26ad7064207e8e76e4f819604833ab7b7a1dd7..806819afe29299277b9d22ccf9d121c7fd4041ce 100644
@@ -70,7 +70,7 @@ def logtail(logcollection, logfunc, header, maxlen=25):
             logname = log[:-4]
             logt = deque([], maxlen)
             mergelogs[logname] = logt
-            with logcollection.open(log) as f:
+            with logcollection.open(log, encoding="utf-8") as f:
                 for l in f:
                     if containersapi:
                         g = timestamp_re.match(l)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index f1118695023d017d5507103d4143245e9a0d2a63..3cca9b2cc3b2eae0eae417e1574f54ee8f5691a9 100644
@@ -312,7 +312,7 @@ def stubs(func):
                 'vcpus': 1,
                 'ram': (1024+256)*1024*1024
             },
-            'use_existing': True,
+            'use_existing': False,
             'properties': {},
             'secret_mounts': {}
         }
@@ -855,7 +855,7 @@ class TestSubmit(unittest.TestCase):
                 'vcpus': 1,
                 'ram': 1342177280
             },
-            'use_existing': True,
+            'use_existing': False,
             'properties': {},
             'secret_mounts': {}
         }
@@ -972,7 +972,7 @@ class TestSubmit(unittest.TestCase):
                 'vcpus': 1,
                 'ram': 1342177280
             },
-            'use_existing': True,
+            'use_existing': False,
             'properties': {
                 "template_uuid": "962eh-7fd4e-gkbzl62qqtfig37"
             },
@@ -1383,7 +1383,7 @@ class TestSubmit(unittest.TestCase):
                 }
             },
             "state": "Committed",
-            "use_existing": True
+            "use_existing": False
         }
 
         stubs.api.container_requests().create.assert_called_with(
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index 3058a7609c6665dfd22c2ee6bb205685c97afe7d..d06aba3695adc37f3d74057de2568778bcd9f9c9 100644
@@ -58,6 +58,9 @@ type FileSystem interface {
        // while locking multiple inodes.
        locker() sync.Locker
 
+       // throttle for limiting concurrent background writers
+       throttle() *throttle
+
        // create a new node with nil parent.
        newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
 
@@ -86,7 +89,19 @@ type FileSystem interface {
        Remove(name string) error
        RemoveAll(name string) error
        Rename(oldname, newname string) error
+
+       // Write buffered data from memory to storage, returning when
+       // all updates have been saved to persistent storage.
        Sync() error
+
+       // Write buffered data from memory to storage, but don't wait
+       // for all writes to finish before returning. If shortBlocks
+       // is true, flush everything; otherwise, if there's less than
+       // a full block of buffered data at the end of a stream, leave
+       // it buffered in memory in case more data can be appended. If
+       // path is "", flush all dirs/streams; otherwise, flush only
+       // the specified dir/stream.
+       Flush(path string, shortBlocks bool) error
 }
 
 type inode interface {
@@ -288,12 +303,17 @@ type fileSystem struct {
        root inode
        fsBackend
        mutex sync.Mutex
+       thr   *throttle
 }
 
 func (fs *fileSystem) rootnode() inode {
        return fs.root
 }
 
+func (fs *fileSystem) throttle() *throttle {
+       return fs.thr
+}
+
 func (fs *fileSystem) locker() sync.Locker {
        return &fs.mutex
 }
@@ -560,6 +580,11 @@ func (fs *fileSystem) Sync() error {
        return ErrInvalidOperation
 }
 
+func (fs *fileSystem) Flush(string, bool) error {
+       log.Printf("TODO: flush fileSystem")
+       return ErrInvalidOperation
+}
+
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
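
The comments above describe the new Flush(path, shortBlocks) method added to the FileSystem interface alongside Sync(). As a minimal sketch (not part of this commit, assuming the git.curoverse.com import path used by this tree), a writer producing many small files could bound buffered memory like this:

// Sketch only -- illustrates the intended use of Flush(path, shortBlocks).
package example

import (
	"fmt"
	"os"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

// writeMany writes one file per directory, flushing full blocks as it
// goes so buffered data stays bounded, then flushes short blocks at the end.
func writeMany(fs arvados.CollectionFileSystem, data []byte, nDirs int) error {
	for i := 0; i < nDirs; i++ {
		dir := fmt.Sprintf("dir%d", i)
		if err := fs.Mkdir(dir, 0755); err != nil {
			return err
		}
		f, err := fs.OpenFile(dir+"/file", os.O_WRONLY|os.O_CREATE, 0)
		if err != nil {
			return err
		}
		if _, err := f.Write(data); err != nil {
			f.Close()
			return err
		}
		f.Close()
		// Flush full blocks under dir; a short tail block stays
		// buffered in case more data is appended later.
		if err := fs.Flush(dir, false); err != nil {
			return err
		}
	}
	// Final pass: flush everything, including short blocks.
	return fs.Flush("", true)
}

Flush(dir, false) writes only full blocks for the named dir, non-recursively; Flush("", true) flushes everything, matching the behavior exercised in the fs_collection_test.go changes below.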
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 6644f4cfb8e93ef7d601e667cee21a9dbce5d39b..578d73f5a0a32c19987d15cccdbf38f0d7f363e8 100644
@@ -21,8 +21,7 @@ import (
 
 var (
        maxBlockSize      = 1 << 26
-       concurrentWriters = 4 // max goroutines writing to Keep during sync()
-       writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+       concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
 )
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
@@ -38,6 +37,9 @@ type CollectionFileSystem interface {
 
        // Total data bytes in all files.
        Size() int64
+
+       // Memory consumed by buffered file data.
+       memorySize() int64
 }
 
 type collectionFileSystem struct {
@@ -57,6 +59,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
                uuid: c.UUID,
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: client, keepClient: kc},
+                       thr:       newThrottle(concurrentWriters),
                },
        }
        root := &dirnode{
@@ -138,10 +141,48 @@ func (fs *collectionFileSystem) Sync() error {
        return nil
 }
 
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+       node, err := rlookup(fs.fileSystem.root, path)
+       if err != nil {
+               return err
+       }
+       dn, ok := node.(*dirnode)
+       if !ok {
+               return ErrNotADirectory
+       }
+       dn.Lock()
+       defer dn.Unlock()
+       names := dn.sortedNames()
+       if path != "" {
+               // Caller only wants to flush the specified dir,
+               // non-recursively.  Drop subdirs from the list of
+               // names.
+               var filenames []string
+               for _, name := range names {
+                       if _, ok := dn.inodes[name].(*filenode); ok {
+                               filenames = append(filenames, name)
+                       }
+               }
+               names = filenames
+       }
+       for _, name := range names {
+               child := dn.inodes[name]
+               child.Lock()
+               defer child.Unlock()
+       }
+       return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
+}
+
+func (fs *collectionFileSystem) memorySize() int64 {
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       return fs.fileSystem.root.(*dirnode).memorySize()
+}
+
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -233,7 +274,6 @@ type filenode struct {
        memsize  int64 // bytes in memSegments
        sync.RWMutex
        nullnode
-       throttle *throttle
 }
 
 // caller must have lock
@@ -496,12 +536,8 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: share code with (*dirnode)sync()
+       // TODO: share code with (*dirnode)flush()
        // TODO: pack/flush small blocks too, when fragmented
-       if fn.throttle == nil {
-               // TODO: share a throttle with filesystem
-               fn.throttle = newThrottle(writeAheadBlocks)
-       }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
                if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
@@ -517,14 +553,14 @@ func (fn *filenode) pruneMemSegments() {
                // progress, block here until one finishes, rather
                // than pile up an unlimited number of buffered writes
                // and network flush operations.
-               fn.throttle.Acquire()
+               fn.fs.throttle().Acquire()
                go func() {
                        defer close(done)
                        locator, _, err := fn.FS().PutB(buf)
-                       fn.throttle.Release()
+                       fn.fs.throttle().Release()
                        fn.Lock()
                        defer fn.Unlock()
-                       if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+                       if seg.flushing != done {
                                // A new seg.buf has been allocated.
                                return
                        }
@@ -551,8 +587,8 @@ func (fn *filenode) pruneMemSegments() {
        }
 }
 
-// Block until all pending pruneMemSegments work is finished. Caller
-// must NOT have lock.
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
 func (fn *filenode) waitPrune() {
        var pending []<-chan struct{}
        fn.Lock()
@@ -608,51 +644,141 @@ type fnSegmentRef struct {
 // storedSegments that reference the relevant portions of the new
 // block.
 //
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine to do the writes, reacquire the filenodes' locks, and
+// swap out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
 // Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
        if len(refs) == 0 {
                return nil
        }
-       throttle.Acquire()
-       defer throttle.Release()
        if err := ctx.Err(); err != nil {
                return err
        }
-       block := make([]byte, 0, maxBlockSize)
+       done := make(chan struct{})
+       var block []byte
+       segs := make([]*memSegment, 0, len(refs))
+       offsets := make([]int, 0, len(refs)) // location of segment's data within block
        for _, ref := range refs {
-               block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
-       }
-       locator, _, err := dn.fs.PutB(block)
-       if err != nil {
-               return err
+               seg := ref.fn.segments[ref.idx].(*memSegment)
+               if seg.flushing != nil && !sync {
+                       // Let the other flushing goroutine finish. If
+                       // it fails, we'll try again next time.
+                       return nil
+               } else {
+                       // In sync mode, we proceed regardless of
+                       // whether another flush is in progress: It
+                       // can't finish before we do, because we hold
+                       // fn's lock until we finish our own writes.
+               }
+               seg.flushing = done
+               offsets = append(offsets, len(block))
+               if len(refs) == 1 {
+                       block = seg.buf
+               } else if block == nil {
+                       block = append(make([]byte, 0, bufsize), seg.buf...)
+               } else {
+                       block = append(block, seg.buf...)
+               }
+               segs = append(segs, seg)
        }
-       off := 0
-       for _, ref := range refs {
-               data := ref.fn.segments[ref.idx].(*memSegment).buf
-               ref.fn.segments[ref.idx] = storedSegment{
-                       kc:      dn.fs,
-                       locator: locator,
-                       size:    len(block),
-                       offset:  off,
-                       length:  len(data),
+       dn.fs.throttle().Acquire()
+       errs := make(chan error, 1)
+       go func() {
+               defer close(done)
+               defer close(errs)
+               locked := map[*filenode]bool{}
+               locator, _, err := dn.fs.PutB(block)
+               dn.fs.throttle().Release()
+               {
+                       if !sync {
+                               for _, name := range dn.sortedNames() {
+                                       if fn, ok := dn.inodes[name].(*filenode); ok {
+                                               fn.Lock()
+                                               defer fn.Unlock()
+                                               locked[fn] = true
+                                       }
+                               }
+                       }
+                       defer func() {
+                               for _, seg := range segs {
+                                       if seg.flushing == done {
+                                               seg.flushing = nil
+                                       }
+                               }
+                       }()
+               }
+               if err != nil {
+                       errs <- err
+                       return
                }
-               off += len(data)
-               ref.fn.memsize -= int64(len(data))
+               for idx, ref := range refs {
+                       if !sync {
+                               // In async mode, fn's lock was
+                               // released while we were waiting for
+                               // PutB(); lots of things might have
+                               // changed.
+                               if len(ref.fn.segments) <= ref.idx {
+                                       // file segments have
+                                       // rearranged or changed in
+                                       // some way
+                                       continue
+                               } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+                                       // segment has been replaced
+                                       continue
+                               } else if seg.flushing != done {
+                                       // seg.buf has been replaced
+                                       continue
+                               } else if !locked[ref.fn] {
+                                       // file was renamed, moved, or
+                                       // deleted since we called
+                                       // PutB
+                                       continue
+                               }
+                       }
+                       data := ref.fn.segments[ref.idx].(*memSegment).buf
+                       ref.fn.segments[ref.idx] = storedSegment{
+                               kc:      dn.fs,
+                               locator: locator,
+                               size:    len(block),
+                               offset:  offsets[idx],
+                               length:  len(data),
+                       }
+                       ref.fn.memsize -= int64(len(data))
+               }
+       }()
+       if sync {
+               return <-errs
+       } else {
+               return nil
        }
-       return nil
 }
 
-// sync flushes in-memory data and remote block references (for the
+type flushOpts struct {
+       sync        bool
+       shortBlocks bool
+}
+
+// flush in-memory data and remote-cluster block references (for the
 // children with the given names, which must be children of dn) to
-// local persistent storage. Caller must have write lock on dn and the
-// named children.
-func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
-       goCommit := func(refs []fnSegmentRef) {
+       goCommit := func(refs []fnSegmentRef, bufsize int) {
                cg.Go(func() error {
-                       return dn.commitBlock(cg.Context(), throttle, refs)
+                       return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
                })
        }
 
@@ -660,47 +786,87 @@ func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string)
        var pendingLen int = 0
        localLocator := map[string]string{}
        for _, name := range names {
-               fn, ok := dn.inodes[name].(*filenode)
-               if !ok {
-                       continue
-               }
-               for idx, seg := range fn.segments {
-                       switch seg := seg.(type) {
-                       case storedSegment:
-                               loc, ok := localLocator[seg.locator]
-                               if !ok {
-                                       var err error
-                                       loc, err = dn.fs.LocalLocator(seg.locator)
-                                       if err != nil {
-                                               return err
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       grandchildNames := node.sortedNames()
+                       for _, grandchildName := range grandchildNames {
+                               grandchild := node.inodes[grandchildName]
+                               grandchild.Lock()
+                               defer grandchild.Unlock()
+                       }
+                       cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
+               case *filenode:
+                       for idx, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case storedSegment:
+                                       loc, ok := localLocator[seg.locator]
+                                       if !ok {
+                                               var err error
+                                               loc, err = dn.fs.LocalLocator(seg.locator)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               localLocator[seg.locator] = loc
                                        }
-                                       localLocator[seg.locator] = loc
-                               }
-                               seg.locator = loc
-                               fn.segments[idx] = seg
-                       case *memSegment:
-                               if seg.Len() > maxBlockSize/2 {
-                                       goCommit([]fnSegmentRef{{fn, idx}})
-                                       continue
-                               }
-                               if pendingLen+seg.Len() > maxBlockSize {
-                                       goCommit(pending)
-                                       pending = nil
-                                       pendingLen = 0
+                                       seg.locator = loc
+                                       node.segments[idx] = seg
+                               case *memSegment:
+                                       if seg.Len() > maxBlockSize/2 {
+                                               goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
+                                               continue
+                                       }
+                                       if pendingLen+seg.Len() > maxBlockSize {
+                                               goCommit(pending, pendingLen)
+                                               pending = nil
+                                               pendingLen = 0
+                                       }
+                                       pending = append(pending, fnSegmentRef{node, idx})
+                                       pendingLen += seg.Len()
+                               default:
+                                       panic(fmt.Sprintf("can't sync segment type %T", seg))
                                }
-                               pending = append(pending, fnSegmentRef{fn, idx})
-                               pendingLen += seg.Len()
-                       default:
-                               panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
                }
        }
-       goCommit(pending)
+       if opts.shortBlocks {
+               goCommit(pending, pendingLen)
+       }
        return cg.Wait()
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+func (dn *dirnode) memorySize() (size int64) {
+       for _, name := range dn.sortedNames() {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
+               case *dirnode:
+                       size += node.memorySize()
+               case *filenode:
+                       for _, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case *memSegment:
+                                       size += int64(seg.Len())
+                               }
+                       }
+               }
+       }
+       return
+}
+
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+       return names
+}
+
+// caller must have write lock.
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
@@ -715,11 +881,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
                return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
        }
 
-       names := make([]string, 0, len(dn.inodes))
-       for name := range dn.inodes {
-               names = append(names, name)
-       }
-       sort.Strings(names)
+       names := dn.sortedNames()
 
        // Wait for children to finish any pending write operations
        // before locking them.
@@ -751,7 +913,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
        for i, name := range dirnames {
                i, name := i, name
                cg.Go(func() error {
-                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
                        subdirs[i] = txt
                        return err
                })
@@ -767,7 +929,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
                var fileparts []filepart
                var blocks []string
-               if err := dn.sync(cg.Context(), throttle, names); err != nil {
+               if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
                        return err
                }
                for _, name := range filenames {
@@ -800,7 +962,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
                                default:
                                        // This can't happen: we
                                        // haven't unlocked since
-                                       // calling sync().
+                                       // calling flush(sync=true).
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
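
In the rewritten fs_collection.go above, pruneMemSegments, commitBlock, and flush all share the filesystem-wide throttle (fs.throttle()) instead of a per-filenode writeAheadBlocks limit, so at most concurrentWriters background Keep writes run at once. A rough sketch of that pattern, using a hypothetical buffered-channel throttle (the package's real throttle type may differ in detail):

// Sketch only -- the concurrency-limiting pattern used by the
// background writers above, expressed with a buffered channel.
package example

type throttle struct{ ticket chan struct{} }

func newThrottle(n int) *throttle { return &throttle{ticket: make(chan struct{}, n)} }
func (t *throttle) Acquire()      { t.ticket <- struct{}{} }
func (t *throttle) Release()      { <-t.ticket }

// putInBackground starts put() in a goroutine, but blocks first if the
// maximum number of writes is already in flight.
func putInBackground(thr *throttle, put func() error) <-chan error {
	errs := make(chan error, 1)
	thr.Acquire()
	go func() {
		defer close(errs)
		defer thr.Release()
		if err := put(); err != nil {
			errs <- err
		}
	}()
	return errs
}

A caller that needs the result synchronously can simply receive from the returned channel, which mirrors commitBlock's sync=true path.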
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 2ae2bd8924e23b583a267091cc6b9985e52d3422..e5cea0639bb4213d7f118eeef0c5c3c628648c0f 100644
@@ -536,7 +536,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
        }
 
        maxBlockSize = 8
-       defer func() { maxBlockSize = 2 << 26 }()
+       defer func() { maxBlockSize = 1 << 26 }()
 
        var wg sync.WaitGroup
        for n := 0; n < 128; n++ {
@@ -1040,12 +1040,12 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
        c.Check(err, check.ErrorMatches, `invalid flag.*`)
 }
 
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
-       defer func(wab, mbs int) {
-               writeAheadBlocks = wab
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
+       defer func(cw, mbs int) {
+               concurrentWriters = cw
                maxBlockSize = mbs
-       }(writeAheadBlocks, maxBlockSize)
-       writeAheadBlocks = 2
+       }(concurrentWriters, maxBlockSize)
+       concurrentWriters = 2
        maxBlockSize = 1024
 
        proceed := make(chan struct{})
@@ -1070,7 +1070,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                default:
                        time.Sleep(time.Millisecond)
                }
-               c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
        }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
@@ -1106,6 +1106,181 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
        c.Check(currentMemExtents(), check.HasLen, 0)
 }
 
+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Write four 512KiB files into each of 256 top-level dirs (total
+// 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
+// exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
+// 2MiB).
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       s.kc.onPut = func([]byte) {
+               // discard flushed data -- otherwise the stub will use
+               // unlimited memory
+               time.Sleep(time.Millisecond)
+               s.kc.Lock()
+               defer s.kc.Unlock()
+               s.kc.blocks = map[string][]byte{}
+       }
+       for i := 0; i < 256; i++ {
+               buf := bytes.NewBuffer(make([]byte, 524288))
+               fmt.Fprintf(buf, "test file in dir%d", i)
+
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 2; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = io.Copy(f, buf)
+                       c.Assert(err, check.IsNil)
+               }
+
+               if i%8 == 0 {
+                       fs.Flush("", true)
+               }
+
+               size := fs.memorySize()
+               if !c.Check(size <= 1<<24, check.Equals, true) {
+                       c.Logf("at dir%d fs.memorySize()=%d", i, size)
+                       return
+               }
+       }
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       var flushed int64
+       s.kc.onPut = func(p []byte) {
+               atomic.AddInt64(&flushed, int64(len(p)))
+       }
+
+       nDirs := int64(8)
+       megabyte := make([]byte, 1<<20)
+       for i := int64(0); i < nDirs; i++ {
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 67; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = f.Write(megabyte)
+                       c.Assert(err, check.IsNil)
+               }
+       }
+       c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+       c.Check(flushed, check.Equals, int64(0))
+
+       waitForFlush := func(expectUnflushed, expectFlushed int64) {
+               for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+               }
+               c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+               c.Check(flushed, check.Equals, expectFlushed)
+       }
+
+       // Nothing flushed yet
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flushing a non-empty dir "/" is non-recursive and there are
+       // no top-level files, so this has no effect
+       fs.Flush("/", false)
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flush the full block in dir0
+       fs.Flush("dir0", false)
+       waitForFlush((nDirs*67-64)<<20, 64<<20)
+
+       err = fs.Flush("dir-does-not-exist", false)
+       c.Check(err, check.NotNil)
+
+       // Flush full blocks in all dirs
+       fs.Flush("", false)
+       waitForFlush(nDirs*3<<20, nDirs*64<<20)
+
+       // Flush non-full blocks, too
+       fs.Flush("", true)
+       waitForFlush(0, nDirs*67<<20)
+}
+
+// Even when writing lots of files/dirs from different goroutines, as
+// long as Flush(dir,false) is called after writing each file,
+// unflushed data should be limited to one full block per
+// concurrentWriter, plus one nearly-full block at the end of each
+// dir/stream.
+func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
+       nDirs := int64(8)
+       maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
+
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       release := make(chan struct{})
+       timeout := make(chan struct{})
+       time.AfterFunc(10*time.Second, func() { close(timeout) })
+       var putCount, concurrency int64
+       var unflushed int64
+       s.kc.onPut = func(p []byte) {
+               defer atomic.AddInt64(&unflushed, -int64(len(p)))
+               cur := atomic.AddInt64(&concurrency, 1)
+               defer atomic.AddInt64(&concurrency, -1)
+               pc := atomic.AddInt64(&putCount, 1)
+               if pc < int64(concurrentWriters) {
+                       // Block until we reach concurrentWriters, to
+                       // make sure we're really accepting concurrent
+                       // writes.
+                       select {
+                       case <-release:
+                       case <-timeout:
+                               c.Error("timeout")
+                       }
+               } else if pc == int64(concurrentWriters) {
+                       // Unblock the first N-1 PUT reqs.
+                       close(release)
+               }
+               c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
+               c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
+       }
+
+       var owg sync.WaitGroup
+       megabyte := make([]byte, 1<<20)
+       for i := int64(0); i < nDirs; i++ {
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               owg.Add(1)
+               go func() {
+                       defer owg.Done()
+                       defer fs.Flush(dir, true)
+                       var iwg sync.WaitGroup
+                       defer iwg.Wait()
+                       for j := 0; j < 67; j++ {
+                               iwg.Add(1)
+                               go func(j int) {
+                                       defer iwg.Done()
+                                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                                       c.Assert(err, check.IsNil)
+                                       defer f.Close()
+                                       n, err := f.Write(megabyte)
+                                       c.Assert(err, check.IsNil)
+                                       atomic.AddInt64(&unflushed, int64(n))
+                                       fs.Flush(dir, false)
+                               }(j)
+                       }
+               }()
+       }
+       owg.Wait()
+       fs.Flush("", true)
+}
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
        for _, txt := range []string{
                "\n",
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 82114e2ea9ed54ac89cc94dd387d73711efae561..4264be4fa600be7abb05351f506feb00dccac6cc 100644
@@ -21,6 +21,7 @@ type CustomFileSystem interface {
 type customFileSystem struct {
        fileSystem
        root *vdirnode
+       thr  *throttle
 
        staleThreshold time.Time
        staleLock      sync.Mutex
@@ -33,6 +34,7 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: c, keepClient: kc},
                        root:      root,
+                       thr:       newThrottle(concurrentWriters),
                },
        }
        root.inode = &treenode{
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 37666eb8e8b8f7e2d8f4cbbdf76ff7bda56b003b..6893b94bf78b7b16a1da1a802fdabe419563eb0d 100644
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -500,6 +500,7 @@ class _BlockManager(object):
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
+        self.num_retries = num_retries
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -554,9 +555,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries)
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -725,9 +726,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries)
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index cf1a36f9fdfbbfdf739fe75027d00eaa782df4f2..26902931582244142054d69bc2e52fe596927de3 100644
@@ -1410,7 +1410,7 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries)
         return self._block_manager
 
     def _remember_api_response(self, response):
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index afe75b31056c5c7d872256875a74265012de3b91..5436702b96ecc05571244128f28ebb6c3f2a17d6 100644
@@ -823,7 +823,9 @@ class ArvPutUploadJob(object):
                                           update_collection):
             try:
                 self._remote_collection = arvados.collection.Collection(
-                    update_collection, api_client=self._api_client)
+                    update_collection,
+                    api_client=self._api_client,
+                    num_retries=self.num_retries)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -866,7 +868,8 @@ class ArvPutUploadJob(object):
                 self._state['manifest'],
                 replication_desired=self.replication_desired,
                 put_threads=self.put_threads,
-                api_client=self._api_client)
+                api_client=self._api_client,
+                num_retries=self.num_retries)
 
     def _cached_manifest_valid(self):
         """
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4354ced67d3b4a3b678be3cacaa575f58f4f6d3f..86a28f54c402c8d44aba1d8511faab18e5e8b44a 100644
@@ -1117,7 +1117,7 @@ class KeepClient(object):
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors, label="service")
+                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None, request_id=None):
@@ -1196,8 +1196,8 @@ class KeepClient(object):
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, writer_pool.done()), service_errors, label="service")
+                "failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 3f62ab779f81fa43537a8223b7348bed52dc3a7c..ea4095930fc78f7cbbb26c49f45a8fa66fbb4081 100644
@@ -64,6 +64,7 @@ class RetryLoop(object):
         self.max_wait = max_wait
         self.next_start_time = 0
         self.results = deque(maxlen=save_results)
+        self._attempts = 0
         self._running = None
         self._success = None
 
@@ -101,6 +102,7 @@ class RetryLoop(object):
                 "recorded a loop result after the loop finished")
         self.results.append(result)
         self._success = self.check_result(result)
+        self._attempts += 1
 
     def success(self):
         """Return the loop's end state.
@@ -118,6 +120,19 @@ class RetryLoop(object):
             raise arvados.errors.AssertionError(
                 "queried loop results before any were recorded")
 
+    def attempts(self):
+        """Return the number of attempts that have been made.
+
+        Includes successes and failures."""
+        return self._attempts
+
+    def attempts_str(self):
+        """Human-readable attempts(): 'N attempts' or '1 attempt'"""
+        if self._attempts == 1:
+            return '1 attempt'
+        else:
+            return '{} attempts'.format(self._attempts)
+
 
 def check_http_response_success(status_code):
     """Convert an HTTP status code to a loop control flag.
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index a760255dd6da8e01470dacafa6bcfafeddc50058..086fa542a2b28b2112f92eb49e21934247cf2710 100644
@@ -865,7 +865,7 @@ class BlockManagerTest(unittest.TestCase):
     def test_bufferblock_commit_pending(self):
         # Test for bug #7225
         mockkeep = mock.MagicMock()
-        mockkeep.put.side_effect = lambda x: time.sleep(1)
+        mockkeep.put.side_effect = lambda *args, **kwargs: time.sleep(1)
         with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
             bufferblock = blockmanager.alloc_bufferblock()
             bufferblock.append("foo")
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index d6b3a2a12dc6f4d4d8bb997c25f43cc21e8cda62..f6bcdc7f97c169f63e4565d3fda0f6c30ada7b54 100644
@@ -489,15 +489,16 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def check_errors_from_last_retry(self, verb, exc_class):
         api_client = self.mock_keep_services(count=2)
         req_mock = tutil.mock_keep_responses(
-            "retry error reporting test", 500, 500, 403, 403)
+            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
         with req_mock, tutil.skip_sleep, \
                 self.assertRaises(exc_class) as err_check:
             keep_client = arvados.KeepClient(api_client=api_client)
             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                        num_retries=3)
-        self.assertEqual([403, 403], [
+        self.assertEqual([502, 502], [
                 getattr(error, 'status_code', None)
                 for error in err_check.exception.request_errors().values()])
+        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
 
     def test_get_error_reflects_last_retry(self):
         self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
@@ -1096,7 +1097,9 @@ class KeepClientRetryTestMixin(object):
     def check_exception(self, error_class=None, *args, **kwargs):
         if error_class is None:
             error_class = self.DEFAULT_EXCEPTION
-        self.assertRaises(error_class, self.run_method, *args, **kwargs)
+        with self.assertRaises(error_class) as err:
+            self.run_method(*args, **kwargs)
+        return err
 
     def test_immediate_success(self):
         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
@@ -1120,7 +1123,8 @@ class KeepClientRetryTestMixin(object):
 
     def test_error_after_retries_exhausted(self):
         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
-            self.check_exception(num_retries=1)
+            err = self.check_exception(num_retries=1)
+        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')
 
     def test_num_retries_instance_fallback(self):
         self.client_kwargs['num_retries'] = 3
diff --git a/sdk/ruby/lib/arvados/collection.rb b/sdk/ruby/lib/arvados/collection.rb
index f236ce83a30a47e51c8e8499abb7b5aecfe29c56..e29deba6c9de314f1deb9c14fd6e723c8e085ac9 100644
@@ -207,7 +207,7 @@ module Arv
         loop do
           ii = (lo + hi) / 2
           range = @ranges[ii]
-          if range.include?(target)
+          if range.include?(target) && (target < range.end || ii == hi-1)
             return ii
           elsif ii == lo
             raise RangeError.new("%i not in segment" % target)
@@ -481,14 +481,13 @@ module Arv
 
       def initialize(name)
         @name = name
-        @loc_ranges = {}
+        @loc_ranges = []
         @loc_range_start = 0
         @file_specs = []
       end
 
       def add_file(coll_file)
         coll_file.each_segment do |segment|
-          extend_locator_ranges(segment.locators)
           extend_file_specs(coll_file.name, segment)
         end
       end
@@ -498,48 +497,51 @@ module Arv
           ""
         else
           "%s %s %s\n" % [escape_name(@name),
-                          @loc_ranges.keys.join(" "),
+                          @loc_ranges.collect(&:locator).join(" "),
                           @file_specs.join(" ")]
         end
       end
 
       private
 
-      def extend_locator_ranges(locators)
-        locators.
-            select { |loc_s| not @loc_ranges.include?(loc_s) }.
-            each do |loc_s|
-          @loc_ranges[loc_s] = LocatorRange.new(loc_s, @loc_range_start)
-          @loc_range_start = @loc_ranges[loc_s].end
+      def extend_file_specs(filename, segment)
+        found_overlap = false
+        # Find the longest prefix of segment.locators that's a suffix
+        # of the existing @loc_ranges. If we find one, drop those
+        # locators (they'll be added back below, when we're handling
+        # the normal/no-overlap case).
+        (1..segment.locators.length).each do |overlap|
+          if @loc_ranges.length >= overlap && @loc_ranges[-overlap..-1].collect(&:locator) == segment.locators[0..overlap-1]
+            (1..overlap).each do
+              discarded = @loc_ranges.pop
+              @loc_range_start -= (discarded.end - discarded.begin)
+            end
+            found_overlap = true
+            break
+          end
         end
-      end
 
-      def extend_file_specs(filename, segment)
-        # Given a filename and a LocatorSegment, add the smallest
-        # possible array of file spec strings to @file_specs that
-        # builds the file from available locators.
-        filename = escape_name(filename)
-        start_pos = segment.start_pos
-        length = segment.length
-        start_loc = segment.locators.first
-        prev_loc = start_loc
-        # Build a list of file specs by iterating through the segment's
-        # locators and preparing a file spec for each contiguous range.
-        segment.locators[1..-1].each do |loc_s|
-          range = @loc_ranges[loc_s]
-          if range.begin != @loc_ranges[prev_loc].end
-            range_start, range_length =
-              start_and_length_at(start_loc, prev_loc, start_pos, length)
-            @file_specs << "#{range_start}:#{range_length}:#{filename}"
-            start_pos = 0
-            length -= range_length
-            start_loc = loc_s
+        # If there was no overlap at the end of our existing
+        # @loc_ranges, check whether the full set of segment.locators
+        # appears earlier in @loc_ranges. If so, use those instead of
+        # appending the same locators again.
+        if !found_overlap && segment.locators.length < @loc_ranges.length
+          segment_start = 0
+          (0..@loc_ranges.length-1).each do |ri|
+            if @loc_ranges[ri..ri+segment.locators.length-1].collect(&:locator) == segment.locators
+              @file_specs << "#{segment.start_pos + @loc_ranges[ri].begin}:#{segment.length}:#{escape_name(filename)}"
+              return
+            end
           end
-          prev_loc = loc_s
         end
-        range_start, range_length =
-          start_and_length_at(start_loc, prev_loc, start_pos, length)
-        @file_specs << "#{range_start}:#{range_length}:#{filename}"
+
+        segment_start = @loc_range_start
+        segment.locators.each do |loc_s|
+          r = LocatorRange.new(loc_s, @loc_range_start)
+          @loc_ranges << r
+          @loc_range_start = r.end
+        end
+        @file_specs << "#{segment.start_pos + segment_start}:#{segment.length}:#{escape_name(filename)}"
       end
 
       def escape_name(name)
@@ -547,12 +549,6 @@ module Arv
           s.each_byte.map { |c| "\\%03o" % c }.join("")
         end
       end
-
-      def start_and_length_at(start_key, end_key, start_pos, length)
-        range_begin = @loc_ranges[start_key].begin + start_pos
-        range_length = [@loc_ranges[end_key].end - range_begin, length].min
-        [range_begin, range_length]
-      end
     end
   end
 end
diff --git a/sdk/ruby/test/test_collection.rb b/sdk/ruby/test/test_collection.rb
index 288fd263fa8bdbe69cff943446dd2c0f3db9bc04..8b747c365211b742d303a2d2894129253d830d37 100644
@@ -15,6 +15,10 @@ class CollectionTest < Minitest::Test
      "./s1 #{TWO_BY_TWO_BLOCKS.last} 0:5:f1 5:4:f3\n"]
   TWO_BY_TWO_MANIFEST_S = TWO_BY_TWO_MANIFEST_A.join("")
 
+  def abcde_blocks
+    ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+9", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+9", "cccccccccccccccccccccccccccccccc+9", "dddddddddddddddddddddddddddddddd+9", "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee+9"]
+  end
+
   ### .new
 
   def test_empty_construction
@@ -32,6 +36,29 @@ class CollectionTest < Minitest::Test
     end
   end
 
+  def test_range_edge_cases
+    [
+      ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1\n",
+      ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2\n",
+      ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file1\n",
+      ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2 0:0:file1\n",
+      ". 0cc175b9c0f1b6a831c399e269772661+1 0:0:file1 1:0:file2 1:0:file1\n",
+    ].each do |txt|
+      coll = Arv::Collection.new(txt)
+      coll.normalize
+      assert_match(/ 0:0:file1/, coll.manifest_text)
+    end
+    [
+      ". d41d8cd98f00b204e9800998ecf8427e+0 1:0:file1\n",
+      ". 0cc175b9c0f1b6a831c399e269772661+1 0:0:file1 2:0:file2 1:0:file1\n",
+    ].each do |txt|
+      assert_raises(RangeError) do
+        coll = Arv::Collection.new(txt)
+        coll.normalize
+      end
+    end
+  end
+
   def test_non_manifest_construction_error
     ["word", ". abc def", ". #{random_block} 0:", ". / !"].each do |m_text|
       assert_raises(ArgumentError,
@@ -145,12 +172,12 @@ class CollectionTest < Minitest::Test
     test_normalization_file_spans_two_whole_blocks("2:3:f1 2:3:f1", 1)
   end
 
-  def test_normalization_dedups_locators
+  def test_normalization_handles_duplicate_locator
     blocks = random_blocks(2, 5)
     coll = Arv::Collection.new(". %s %s 1:8:f1 11:8:f1\n" %
                                [blocks.join(" "), blocks.reverse.join(" ")])
     coll.normalize
-    assert_equal(". #{blocks.join(' ')} 1:8:f1 6:4:f1 0:4:f1\n",
+    assert_equal(". #{blocks.join(' ')} #{blocks[0]} 1:8:f1 6:8:f1\n",
                  coll.manifest_text)
   end
 
@@ -395,6 +422,24 @@ class CollectionTest < Minitest::Test
                  dst_coll.manifest_text)
   end
 
+  def test_copy_with_repeated_blocks
+    blocks = abcde_blocks
+    src_coll = Arv::Collection.new(". #{blocks[0]} #{blocks[1]} #{blocks[2]} #{blocks[0]} #{blocks[1]} #{blocks[2]} #{blocks[3]} #{blocks[4]} 27:27:f1\n")
+    dst_coll = Arv::Collection.new()
+    dst_coll.cp_r("f1", "./", src_coll)
+    assert_equal(". #{blocks[0]} #{blocks[1]} #{blocks[2]} 0:27:f1\n", dst_coll.manifest_text, "mangled by cp_r")
+  end
+
+  def test_copy_with_repeated_split_blocks
+    blocks = abcde_blocks
+    src_coll = Arv::Collection.new(". #{blocks[0]} #{blocks[1]} #{blocks[2]} #{blocks[0]} #{blocks[1]} #{blocks[2]} #{blocks[3]} #{blocks[4]} 20:27:f1\n")
+    dst_coll = Arv::Collection.new()
+    src_coll.normalize
+    assert_equal(". #{blocks[2]} #{blocks[0]} #{blocks[1]} #{blocks[2]} 2:27:f1\n", src_coll.manifest_text, "mangled by normalize()")
+    dst_coll.cp_r("f1", "./", src_coll)
+    assert_equal(". #{blocks[2]} #{blocks[0]} #{blocks[1]} #{blocks[2]} 2:27:f1\n", dst_coll.manifest_text, "mangled by cp_r")
+  end
+
   def test_copy_empty_source_path_raises_ArgumentError(src="", dst="./s1")
     coll = Arv::Collection.new(SIMPLEST_MANIFEST)
     assert_raises(ArgumentError) do
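
The file specs in the repeated-block copy tests above can be checked with a little stream arithmetic: every abcde block is 9 bytes, and "start:length:name" selects bytes [start, start+length) of the concatenated stream. The helper below (block_ranges is hypothetical, not part of the SDK) maps a spec onto [block index, offset in block, byte count] triples:

    # Hypothetical helper for checking the expected manifests by hand.
    def block_ranges(start, length, block_sizes)
      ranges = []
      pos = 0
      block_sizes.each_with_index do |size, i|
        lo = [start, pos].max
        hi = [start + length, pos + size].min
        ranges << [i, lo - pos, hi - lo] if hi > lo
        pos += size
      end
      ranges
    end

    # ". b0 b1 b2 b0 b1 b2 b3 b4 27:27:f1" is eight 9-byte blocks:
    p block_ranges(27, 27, [9] * 8)
    # => [[3, 0, 9], [4, 0, 9], [5, 0, 9]]  (the second copies of b0, b1, b2)
    p block_ranges(20, 27, [9] * 8)
    # => [[2, 2, 7], [3, 0, 9], [4, 0, 9], [5, 0, 2]]

The first range shows why cp_r can emit ". b0 b1 b2 0:27:f1"; the second shows why the split case still needs block 2 both before and after blocks 0 and 1, matching the "2:27:f1" expectation.
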
index 804d2a479d3d4701489c8d1bed812c7a6873c252..f166505bc07ea82703fb8673c919119003cc20c2 100644 (file)
@@ -25,9 +25,6 @@ group :test, :development do
   gem 'byebug'
 end
 
-# We need this dependency because of crunchv1
-gem 'arvados-cli'
-
 gem 'pg', '~> 1.0'
 
 gem 'multi_json'
@@ -58,7 +55,12 @@ gem 'faye-websocket'
 
 gem 'themes_for_rails', git: 'https://github.com/curoverse/themes_for_rails'
 
-gem 'arvados', '>= 1.3.1.20190301212059'
+# We need arvados-cli because of crunchv1. Note: bundler can't handle
+# two gems with the same "git" url but different "glob" values, hence
+# the use of a wildcard here instead of literal paths
+# (sdk/cli/arvados-cli.gemspec and sdk/ruby/arvados.gemspec).
+gem 'arvados-cli', git: 'https://github.com/curoverse/arvados.git', glob: 'sdk/*/*.gemspec'
+gem 'arvados', git: 'https://github.com/curoverse/arvados.git', glob: 'sdk/*/*.gemspec'
 gem 'httpclient'
 
 gem 'sshkey'
@@ -73,6 +75,10 @@ gem 'rails-controller-testing'
 
 gem 'sass-rails'
 
+# arvados-google-api-client and googleauth depend on signet, but
+# signet 0.12 is incompatible with ruby 2.3.
+gem 'signet', '< 0.12'
+
 # Install any plugin gems
 Dir.glob(File.join(File.dirname(__FILE__), 'lib', '**', "Gemfile")) do |f|
     eval(IO.read(f), binding)
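
The same single-git-source pattern generalizes to any repository that ships more than one gem: give every such gem the same git URL and the same wildcard glob, so bundler sees one git source whose glob matches each gemspec it needs. A generic Gemfile illustration (example.com and the mytool gem names are placeholders):

    # Not workable, per the note above: same git URL, two different globs.
    #   gem 'mytool-cli', git: 'https://example.com/mytool.git', glob: 'sdk/cli/mytool-cli.gemspec'
    #   gem 'mytool',     git: 'https://example.com/mytool.git', glob: 'sdk/ruby/mytool.gemspec'
    #
    # Workable: one wildcard glob shared by both gems.
    gem 'mytool-cli', git: 'https://example.com/mytool.git', glob: 'sdk/*/*.gemspec'
    gem 'mytool',     git: 'https://example.com/mytool.git', glob: 'sdk/*/*.gemspec'
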
index 078b2b7f418d1e94ca2b4ab424be702cdad1197b..88fcdcfed02ca2e61a44e8c55d64c5d1507cd71a 100644 (file)
@@ -1,3 +1,27 @@
+GIT
+  remote: https://github.com/curoverse/arvados.git
+  revision: dd9f2403f43bcb93da5908ddde57d8c0491bb4c2
+  glob: sdk/*/*.gemspec
+  specs:
+    arvados (1.4.1.20191019025325)
+      activesupport (>= 3)
+      andand (~> 1.3, >= 1.3.3)
+      arvados-google-api-client (>= 0.7, < 0.8.9)
+      faraday (< 0.16)
+      i18n (~> 0)
+      json (>= 1.7.7, < 3)
+      jwt (>= 0.1.5, < 2)
+    arvados-cli (1.4.1.20191017145711)
+      activesupport (>= 3.2.13, < 5.1)
+      andand (~> 1.3, >= 1.3.3)
+      arvados (>= 1.4.1.20190320201707)
+      arvados-google-api-client (~> 0.6, >= 0.6.3, < 0.8.9)
+      curb (~> 0.8)
+      faraday (< 0.16)
+      json (>= 1.7.7, < 3)
+      oj (~> 3.0)
+      optimist (~> 3.0)
+
 GIT
   remote: https://github.com/curoverse/themes_for_rails
   revision: ddf6e592b3b6493ea0c2de7b5d3faa120ed35be0
@@ -49,27 +73,11 @@ GEM
       activemodel (>= 3.0.0)
       activesupport (>= 3.0.0)
       rack (>= 1.1.0)
-    addressable (2.6.0)
-      public_suffix (>= 2.0.2, < 4.0)
+    addressable (2.7.0)
+      public_suffix (>= 2.0.2, < 5.0)
     andand (1.3.3)
     arel (7.1.4)
-    arvados (1.3.1.20190320201707)
-      activesupport (>= 3)
-      andand (~> 1.3, >= 1.3.3)
-      arvados-google-api-client (>= 0.7, < 0.8.9)
-      i18n (~> 0)
-      json (>= 1.7.7, < 3)
-      jwt (>= 0.1.5, < 2)
-    arvados-cli (1.3.1.20190320201707)
-      activesupport (>= 3.2.13, < 5.1)
-      andand (~> 1.3, >= 1.3.3)
-      arvados (~> 1.3.0, >= 1.3.0)
-      arvados-google-api-client (~> 0.6, >= 0.6.3, < 0.8.9)
-      curb (~> 0.8)
-      json (>= 1.7.7, < 3)
-      oj (~> 3.0)
-      optimist (~> 3.0)
-    arvados-google-api-client (0.8.7.2)
+    arvados-google-api-client (0.8.7.3)
       activesupport (>= 3.2, < 5.1)
       addressable (~> 2.3)
       autoparse (~> 0.3)
@@ -94,7 +102,7 @@ GEM
       net-ssh-gateway (>= 1.1.0)
     concurrent-ruby (1.1.5)
     crass (1.0.4)
-    curb (0.9.9)
+    curb (0.9.10)
     database_cleaner (1.7.0)
     erubis (2.7.0)
     eventmachine (1.2.7)
@@ -113,7 +121,7 @@ GEM
     ffi (1.9.25)
     globalid (0.4.2)
       activesupport (>= 4.2.0)
-    googleauth (0.8.0)
+    googleauth (0.9.0)
       faraday (~> 0.12)
       jwt (>= 1.4, < 3.0)
       memoist (~> 0.16)
@@ -153,9 +161,9 @@ GEM
     minitest (5.10.3)
     mocha (1.8.0)
       metaclass (~> 0.0.1)
-    multi_json (1.13.1)
+    multi_json (1.14.1)
     multi_xml (0.6.0)
-    multipart-post (2.0.0)
+    multipart-post (2.1.1)
     net-scp (2.0.0)
       net-ssh (>= 2.6.5, < 6.0.0)
     net-sftp (2.1.2)
@@ -172,7 +180,7 @@ GEM
       multi_json (~> 1.3)
       multi_xml (~> 0.5)
       rack (>= 1.2, < 3)
-    oj (3.7.11)
+    oj (3.9.2)
     omniauth (1.4.3)
       hashie (>= 1.2, < 4)
       rack (>= 1.6.2, < 3)
@@ -180,13 +188,13 @@ GEM
       oauth2 (~> 1.1)
       omniauth (~> 1.2)
     optimist (3.0.0)
-    os (1.0.0)
+    os (1.0.1)
     passenger (6.0.2)
       rack
       rake (>= 0.8.1)
     pg (1.1.4)
     power_assert (1.1.4)
-    public_suffix (3.0.3)
+    public_suffix (4.0.1)
     rack (2.0.7)
     rack-test (0.6.3)
       rack (>= 1.0)
@@ -288,8 +296,8 @@ PLATFORMS
 DEPENDENCIES
   acts_as_api
   andand
-  arvados (>= 1.3.1.20190301212059)
-  arvados-cli
+  arvados!
+  arvados-cli!
   byebug
   database_cleaner
   factory_bot_rails
@@ -316,6 +324,7 @@ DEPENDENCIES
   rvm-capistrano
   safe_yaml
   sass-rails
+  signet (< 0.12)
   simplecov (~> 0.7.1)
   simplecov-rcov
   sshkey
index 2bbdd0a07f45508a3515e8384fb9bca7e05a6817..97f2cf1c745896800a0c2d44a37d2a5e7c6ef366 100644 (file)
@@ -426,6 +426,10 @@ class Container < ArvadosModel
     current_user.andand.is_admin
   end
 
+  def permission_to_destroy
+    current_user.andand.is_admin
+  end
+
   def ensure_owner_uuid_is_permitted
     # validate_change ensures owner_uuid can't be changed at all --
     # except during create, which requires admin privileges. Checking
index 88fd5feb6ad27c3c55dd5531c0a2566422239f41..5f17efc4452c3ac24e5e53f9d532da1ce3b9d673 100644 (file)
@@ -980,6 +980,15 @@ class ContainerTest < ActiveSupport::TestCase
     end
   end
 
+  test "user cannot delete" do
+    set_user_from_auth :active
+    c, _ = minimal_new
+    assert_raises ArvadosModel::PermissionDeniedError do
+      c.destroy
+    end
+    assert Container.find_by_uuid(c.uuid)
+  end
+
   [
     {state: Container::Complete, exit_code: 0, output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'},
     {state: Container::Cancelled},
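
The new hook has the same shape as the existing admin-only permission check shown above, and ArvadosModel is assumed to consult it from its destroy path, raising PermissionDeniedError when it returns false. A stripped-down, plain-Ruby sketch of that pattern (SketchModel is illustrative, not the actual ArvadosModel wiring):

    class PermissionDeniedError < StandardError; end

    class SketchModel
      def initialize(user)
        @current_user = user
      end

      def permission_to_destroy
        @current_user && @current_user[:is_admin]
      end

      def destroy
        raise PermissionDeniedError unless permission_to_destroy
        # ... delete the row here ...
        true
      end
    end

    SketchModel.new(is_admin: false).destroy rescue puts "denied for non-admin"
    puts "allowed for admin" if SketchModel.new(is_admin: true).destroy
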
index 3f529f6313b9ee55483efaa21367572bd54207a4..f6a64a6217f1f9c80c8e90a3756f5238fd796f06 100644 (file)
@@ -15,6 +15,7 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
@@ -82,33 +83,49 @@ func (cp *copier) Copy() (string, error) {
                        return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
                }
        }
+       var unflushed int64
+       var lastparentdir string
        for _, f := range cp.files {
-               err = cp.copyFile(fs, f)
+               // If a dir has just had its last file added, do a
+               // full Flush. Otherwise, do a partial Flush (write
+               // full-size blocks, but leave the last short block
+               // open so f's data can be packed with it).
+               dir, _ := filepath.Split(f.dst)
+               if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
+                       if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+                               return "", fmt.Errorf("error flushing output collection file data: %v", err)
+                       }
+                       unflushed = 0
+               }
+               lastparentdir = dir
+
+               n, err := cp.copyFile(fs, f)
                if err != nil {
                        return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
                }
+               unflushed += n
        }
        return fs.MarshalManifest(".")
 }
 
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
        cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
        dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
-               return err
+               return 0, err
        }
        src, err := os.Open(f.src)
        if err != nil {
                dst.Close()
-               return err
+               return 0, err
        }
        defer src.Close()
-       _, err = io.Copy(dst, src)
+       n, err := io.Copy(dst, src)
        if err != nil {
                dst.Close()
-               return err
+               return n, err
        }
-       return dst.Close()
+       return n, dst.Close()
 }
 
 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
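
The flush policy added to Copy() is easier to see in isolation: when the walk moves to a new parent directory, flush the finished directory completely; otherwise, once more than one block's worth of data is buffered, flush only the full-size blocks and keep the short tail open so the next file in the same directory can be packed into it. A Ruby sketch of that decision (plan_flushes, the [path, size] pairs, and the local BLOCKSIZE constant standing in for keepclient.BLOCKSIZE are illustrative, not the crunch-run API):

    BLOCKSIZE = 64 * 1024 * 1024  # Keep's 64 MiB block size

    def plan_flushes(files)  # files: [[dst, size], ...] grouped by directory
      actions = []
      unflushed = 0
      last_dir = nil
      files.each do |dst, size|
        dir = File.dirname(dst)
        if (last_dir && dir != last_dir) || unflushed > BLOCKSIZE
          # Full flush when the directory is finished; partial flush
          # (full blocks only) when the buffer has outgrown one block.
          actions << [:flush, last_dir, dir != last_dir]
          unflushed = 0
        end
        actions << [:copy, dst, size]
        unflushed += size
        last_dir = dir
      end
      actions
    end

    p plan_flushes([["out/a", 10], ["out/b", 70 * 1024 * 1024],
                    ["out/c", 1], ["logs/d", 5]])

Packing the trailing short block this way avoids writing a separate under-sized block for every small file in a directory full of them.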