Merge branch '9005-keep-http-client'
author Tom Clegg <tom@curoverse.com>
Wed, 31 May 2017 20:13:50 +0000 (16:13 -0400)
committer Tom Clegg <tom@curoverse.com>
Wed, 31 May 2017 20:13:50 +0000 (16:13 -0400)
refs #9005

apps/workbench/app/views/container_requests/_show_recent.html.erb
apps/workbench/app/views/container_requests/_show_recent_rows.html.erb
apps/workbench/app/views/projects/_show_dashboard.html.erb
sdk/cwl/setup.py
sdk/python/arvados/arvfile.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
services/api/lib/crunch_dispatch.rb
services/api/test/unit/crunch_dispatch_test.rb
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/tests/test_jobqueue.py

diff --git a/apps/workbench/app/views/container_requests/_show_recent.html.erb b/apps/workbench/app/views/container_requests/_show_recent.html.erb
index 6a4c8b18ecc97e30d1c931da18ccdf6020925d14..7ee20db7c5e22298cbfb8016adedb86b7565850f 100644 (file)
@@ -3,8 +3,8 @@
 <table class="table table-condensed table-fixedlayout arv-recent-container-requests">
   <colgroup>
     <col width="10%" />
-    <col width="15%" />
-    <col width="25%" />
+    <col width="20%" />
+    <col width="20%" />
     <col width="15%" />
     <col width="15%" />
     <col width="15%" />
@@ -15,7 +15,7 @@
       <th>
         Status
       </th><th>
-        Container request
+        Name
       </th><th>
         Description
       </th><th>
diff --git a/apps/workbench/app/views/container_requests/_show_recent_rows.html.erb b/apps/workbench/app/views/container_requests/_show_recent_rows.html.erb
index d11bf35c21bc7c61f9da22cd28f3d69ef7449fff..6caf6d563002d5461320dc8cf45515ad0c79f1a9 100644 (file)
@@ -20,7 +20,7 @@
     <td>
       <span class="label label-<%= wu.state_bootstrap_class %>"><%= wu.state_label %></span>
     </td><td>
-      <%= link_to_if_arvados_object obj, friendly_name: true %>
+      <%= link_to_if_arvados_object obj, friendly_name: true, link_text: if !obj.name.empty? then obj.name else obj.uuid end %>
     </td><td>
       <%= obj.description || '' %>
     </td><td>
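
The new link_text: option above falls back to the request's UUID when its name is blank. A minimal sketch of the same fallback rule in Python (hypothetical obj with name and uuid attributes, not the Workbench helper):

    def link_text(obj):
        # Prefer the human-readable name; fall back to the UUID when empty.
        return obj.name if obj.name else obj.uuid
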
diff --git a/apps/workbench/app/views/projects/_show_dashboard.html.erb b/apps/workbench/app/views/projects/_show_dashboard.html.erb
index bcfeb6a466e544926dc4a8c7555dcbcd0cdd4ab7..f472b1e75afefd711b9457ea8223dca20a95ee44 100644 (file)
@@ -1,10 +1,32 @@
 <%
   recent_procs = recent_processes(12)
 
+  # preload container_uuids of any container requests
+  recent_crs = recent_procs.map {|p| p if p.is_a?(ContainerRequest)}.compact.uniq
+  recent_cr_containers = recent_crs.map {|cr| cr.container_uuid}.compact.uniq
+  preload_objects_for_dataclass(Container, recent_cr_containers) if recent_cr_containers.andand.any?
+
+  # fetch children of all the active crs in one call, if there are any
+  active_crs = recent_crs.each {|cr| cr if (cr.priority > 0 and cr.state != 'Final' and cr.container_uuid)}
+  active_cr_uuids = active_crs.map(&:uuid)
+  active_cr_containers = active_crs.map {|cr| cr.container_uuid}.compact.uniq
+  cr_children = {}
+  if active_cr_containers.any?
+    active_cr_containers.each { |c| cr_children[c] = []}
+    cols = ContainerRequest.columns.map(&:name) - %w(id updated_at mounts)
+    reqs = ContainerRequest.select(cols).where(requesting_container_uuid: active_cr_containers).results
+    reqs.each {|cr| cr_children[cr.requesting_container_uuid] << cr} if reqs
+  end
+
   wus = {}
   outputs = []
   recent_procs.each do |p|
-    wu = p.work_unit
+    if p.uuid.in?(active_cr_uuids)
+      wu = p.work_unit(nil, child_objects=cr_children[p.container_uuid])
+    else
+      wu = p.work_unit
+    end
+
     wus[p] = wu
     outputs << wu.outputs
   end
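
The dashboard change above avoids one API round trip per active container request by fetching all of their child requests in a single filtered query and grouping them by requesting_container_uuid. A sketch of the same batching idea with the Arvados Python SDK (assumes an api client from arvados.api(); illustrative, not the Workbench code):

    def children_by_container(api, container_uuids):
        # One list call with an "in" filter replaces a query per container.
        children = {uuid: [] for uuid in container_uuids}
        resp = api.container_requests().list(
            filters=[["requesting_container_uuid", "in", container_uuids]],
            limit=1000).execute()
        for cr in resp["items"]:
            children[cr["requesting_container_uuid"]].append(cr)
        return children
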
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index df988f69918e337f8a2a1e91346bbf35dc8a1878..6633314042624d932ca5989e126ff19becf434f3 100644 (file)
@@ -48,7 +48,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20170510165748',
+          'cwltool==1.0.20170525215327',
           'schema-salad==2.5.20170428142041',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 9ba43490e73a455dd1193590e4ec1043fef31f10..a1d87a498ef81304ad6266793292263f8ff4479c 100644 (file)
@@ -557,7 +557,6 @@ class _BlockManager(object):
                 else:
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
             finally:
@@ -673,20 +672,22 @@ class _BlockManager(object):
             return
 
         new_bb = self._alloc_bufferblock()
+        new_bb.owner = []
         files = []
         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
             bb = small_blocks.pop(0)
+            new_bb.owner.append(bb.owner)
             self._pending_write_size -= bb.size()
             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
             files.append((bb, new_bb.write_pointer - bb.size()))
 
-        self.commit_bufferblock(new_bb, sync=True)
+        self.commit_bufferblock(new_bb, sync=sync)
 
         for bb, new_bb_segment_offset in files:
             newsegs = bb.owner.segments()
             for s in newsegs:
                 if s.locator == bb.blockid:
-                    s.locator = new_bb.locator()
+                    s.locator = new_bb.blockid
                     s.segment_offset = new_bb_segment_offset+s.segment_offset
             bb.owner.set_segments(newsegs)
             self._delete_bufferblock(bb.blockid)
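
In the hunk above, repack_small_blocks now records every file whose data was copied into the combined block (new_bb.owner becomes a list), commits with the caller's sync flag instead of always blocking, and rewrites segment locators to the new block's blockid, since an asynchronous commit may not have a final Keep locator yet. A standalone sketch of that bookkeeping (simplified SmallBlock type, not the real _BlockManager API):

    from collections import namedtuple

    SmallBlock = namedtuple("SmallBlock", ["owner", "data"])

    def pack_small_blocks(small_blocks, block_size):
        new_data, owners, placements = bytearray(), [], []
        while small_blocks and len(new_data) + len(small_blocks[0].data) <= block_size:
            bb = small_blocks.pop(0)
            owners.append(bb.owner)                       # combined block gains many owners
            placements.append((bb.owner, len(new_data)))  # offset used to rewrite segments
            new_data.extend(bb.data)
        return bytes(new_data), owners, placements

    data, owners, placements = pack_small_blocks(
        [SmallBlock("f1", b"aa"), SmallBlock("f2", b"bbb")], 64)
    assert data == b"aabbb" and placements == [("f1", 0), ("f2", 2)]
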
@@ -796,7 +797,10 @@ class _BlockManager(object):
 
         for k,v in items:
             if v.state() != _BufferBlock.COMMITTED and v.owner:
-                v.owner.flush(sync=False)
+                # Ignore blocks with a list of owners, as if they're not in COMMITTED
+                # state, they're already being committed asynchronously.
+                if isinstance(v.owner, ArvadosFile):
+                    v.owner.flush(sync=False)
 
         with self.lock:
             if self._put_queue is not None:
@@ -813,7 +817,15 @@ class _BlockManager(object):
             # flush again with sync=True to remove committed bufferblocks from
             # the segments.
             if v.owner:
-                v.owner.flush(sync=True)
+                if isinstance(v.owner, ArvadosFile):
+                    v.owner.flush(sync=True)
+                elif isinstance(v.owner, list) and len(v.owner) > 0:
+                    # This bufferblock is referenced by many files as a result
+                    # of repacking small blocks, so don't delete it when flushing
+                    # its owners, just do it after flushing them all.
+                    for owner in v.owner:
+                        owner.flush(sync=True)
+                    self.delete_bufferblock(k)
 
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
@@ -1144,7 +1156,10 @@ class ArvadosFile(object):
                     to_delete.add(s.locator)
                     s.locator = bb.locator()
             for s in to_delete:
-               self.parent._my_block_manager().delete_bufferblock(s)
+                # Don't delete the bufferblock if it's owned by many files. It'll be
+                # deleted after all of its owners are flush()ed.
+                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
+                    self.parent._my_block_manager().delete_bufferblock(s)
 
         self.parent.notify(MOD, self.parent, self.name, (self, self))
 
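The remaining arvfile.py hunks dispatch on the owner's type: a plain ArvadosFile owner is flushed as before, a block produced by repacking (owner is a list) has each contributing file flushed first and is then deleted exactly once, and ArvadosFile.flush skips deleting a bufferblock it does not own exclusively. A toy sketch of that dispatch rule (hypothetical FileOwner stand-in, not the SDK classes):

    class FileOwner(object):
        def __init__(self, name):
            self.name = name
        def flush(self, sync=False):
            print("flushed %s (sync=%s)" % (self.name, sync))

    def flush_owner(owner, delete_block):
        if isinstance(owner, FileOwner):
            owner.flush(sync=True)
        elif isinstance(owner, list) and len(owner) > 0:
            for f in owner:            # shared block: flush every file first...
                f.flush(sync=True)
            delete_block()             # ...then drop the shared block once

    flush_owner([FileOwner("a"), FileOwner("b")], lambda: print("block deleted"))
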
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index a8009130551fe3b0ecf0d74aa3ebfe834eb59ad6..d241f73b0c8bbcbc9b37bb38421c0c485e95f7ae 100644 (file)
@@ -832,7 +832,7 @@ class BlockManagerTest(unittest.TestCase):
         mockkeep = mock.MagicMock()
         with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
             bufferblock = blockmanager.alloc_bufferblock()
-            bufferblock.owner = mock.MagicMock()
+            bufferblock.owner = mock.MagicMock(spec=arvados.arvfile.ArvadosFile)
             def flush(sync=None):
                 blockmanager.commit_bufferblock(bufferblock, sync)
             bufferblock.owner.flush.side_effect = flush
@@ -863,7 +863,7 @@ class BlockManagerTest(unittest.TestCase):
         mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
         with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
             bufferblock = blockmanager.alloc_bufferblock()
-            bufferblock.owner = mock.MagicMock()
+            bufferblock.owner = mock.MagicMock(spec=arvados.arvfile.ArvadosFile)
             def flush(sync=None):
                 blockmanager.commit_bufferblock(bufferblock, sync)
             bufferblock.owner.flush.side_effect = flush
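
Because the _BlockManager flush path above now checks isinstance(v.owner, ArvadosFile), a bare MagicMock owner would silently skip that branch; giving the mock a spec sets its __class__ so the isinstance test passes. A self-contained illustration (stand-in ArvadosFile class, not the SDK import):

    from unittest import mock

    class ArvadosFile(object):
        pass

    assert not isinstance(mock.MagicMock(), ArvadosFile)
    assert isinstance(mock.MagicMock(spec=ArvadosFile), ArvadosFile)
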
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index cfc3665f424e32455137af73055966c4238365c2..a992328d70778099f7693e6fdf76158db0cfb6a4 100644 (file)
@@ -1182,7 +1182,7 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
     def setUp(self):
         self.keep_put = getattr(arvados.keep.KeepClient, 'put')
 
-    def test_repacked_block_sumbmission_get_permission_token(self):
+    def test_repacked_block_submission_get_permission_token(self):
         '''
         Make sure that those blocks that are committed after repacking small ones,
         get their permission tokens assigned on the collection manifest.
diff --git a/services/api/lib/crunch_dispatch.rb b/services/api/lib/crunch_dispatch.rb
index 2ae99f01c59b7bef384e4e48011357a37f3363d3..3ef1803031e8ecfc9ad2da6202f6bd3547f5dd08 100644 (file)
@@ -892,7 +892,7 @@ class CrunchDispatch
 
   def check_orphaned_slurm_jobs
     act_as_system_user do
-      squeue_uuids = squeue_jobs.select{|uuid| uuid.match(HasUuid::UUID_REGEX)}.
+      squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}.
                                   select{|uuid| !@running.has_key?(uuid)}
 
       return if squeue_uuids.size == 0
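
HasUuid::UUID_REGEX matches any Arvados object UUID, so container UUIDs appearing in the squeue listing could be mistaken for orphaned jobs; the narrower pattern accepts only job UUIDs, whose type infix is 8i9sb. A quick sketch of the same filter (Python re; the Ruby change above is the real one), using the fixture UUIDs from the test below:

    import re

    JOB_UUID = re.compile(r"^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$")
    squeue = ["zzzzz-8i9sb-pshmckwoma9plh7", "zzzzz-dz642-o04e3r651turtdr"]
    orphan_candidates = [u for u in squeue if JOB_UUID.match(u)]
    assert orphan_candidates == ["zzzzz-8i9sb-pshmckwoma9plh7"]
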
diff --git a/services/api/test/unit/crunch_dispatch_test.rb b/services/api/test/unit/crunch_dispatch_test.rb
index d091847df2a20a19bbf08d55a5a9c8e511b8d7b9..6107205494d1a19159d108cdcb5a55df0e3807f3 100644 (file)
@@ -208,7 +208,7 @@ class CrunchDispatchTest < ActiveSupport::TestCase
     act_as_system_user do
       dispatch = CrunchDispatch.new
 
-      squeue_resp = IO.popen("echo zzzzz-8i9sb-pshmckwoma9plh7\necho thisisnotvalidjobuuid\necho zzzzz-8i9sb-4cf0abc123e809j\n")
+      squeue_resp = IO.popen("echo zzzzz-8i9sb-pshmckwoma9plh7\necho thisisnotvalidjobuuid\necho zzzzz-8i9sb-4cf0abc123e809j\necho zzzzz-dz642-o04e3r651turtdr\n")
       scancel_resp = IO.popen("true")
 
       IO.expects(:popen).
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index a35bd92863e533ca2abb7f4f18ca12f992e835a1..0340918e736549403d99469bad52f9f9c4a93154 100644 (file)
@@ -133,11 +133,11 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     def _send_request(self):
         # cpus, memory, temporary disk space, reason, job name
-        squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c %m %d %r %j"])
+        squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
         queuelist = []
         for out in squeue_out.splitlines():
-            cpu, ram, disk, reason, jobname = out.split(" ", 4)
-            if reason in ("Resources", "ReqNodeNotAvail"):
+            cpu, ram, disk, reason, jobname = out.split("|", 4)
+            if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
                 queuelist.append({
                     "uuid": jobname,
                     "runtime_constraints": {
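
squeue's %r reason field can itself contain spaces and commas (e.g. "ReqNodeNotAvail, UnavailableNodes:..."), which broke both the old space-delimited split and the exact-match reason test; a "|" separator plus a substring match keeps the parse unambiguous. A small sketch of the new parse (sample line mirrors the test fixture below, not live squeue output):

    line = "1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy"
    cpu, ram, disk, reason, jobname = line.split("|", 4)
    assert ("ReqNodeNotAvail" in reason) or ("Resources" in reason)
    assert jobname == "zzzzz-zzzzz-zzzzzzzzzzzzzzy"
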
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index fced67ce13442624b02e7fb21d1d8c16e13df946..554ff1eb79724d48dd8e584f490420bdcac91579 100644 (file)
@@ -150,8 +150,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list(self, mock_squeue):
-        mock_squeue.return_value = """1 1024 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2 1024 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
+2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -163,8 +163,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list_suffix(self, mock_squeue):
-        mock_squeue.return_value = """1 1024M 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1 2G 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
+1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(