<table class="table table-condensed table-fixedlayout arv-recent-container-requests">
<colgroup>
<col width="10%" />
- <col width="15%" />
- <col width="25%" />
+ <col width="20%" />
+ <col width="20%" />
<col width="15%" />
<col width="15%" />
<col width="15%" />
<th>
Status
</th><th>
- Container request
+ Name
</th><th>
Description
</th><th>
<td>
<span class="label label-<%= wu.state_bootstrap_class %>"><%= wu.state_label %></span>
</td><td>
- <%= link_to_if_arvados_object obj, friendly_name: true %>
+ <%= link_to_if_arvados_object obj, friendly_name: true, link_text: obj.name.presence || obj.uuid %>
</td><td>
<%= obj.description || '' %>
</td><td>
<%
recent_procs = recent_processes(12)
+ # preload the Container objects referenced by any recent container requests
+ recent_crs = recent_procs.select {|p| p.is_a?(ContainerRequest)}.uniq
+ recent_cr_containers = recent_crs.map {|cr| cr.container_uuid}.compact.uniq
+ preload_objects_for_dataclass(Container, recent_cr_containers) if recent_cr_containers.andand.any?
+
+ # fetch children of all the active crs in one call, if there are any
+ active_crs = recent_crs.select {|cr| cr.priority > 0 and cr.state != 'Final' and cr.container_uuid}
+ active_cr_uuids = active_crs.map(&:uuid)
+ active_cr_containers = active_crs.map {|cr| cr.container_uuid}.compact.uniq
+ cr_children = {}
+ if active_cr_containers.any?
+ active_cr_containers.each { |c| cr_children[c] = []}
+ cols = ContainerRequest.columns.map(&:name) - %w(id updated_at mounts)
+ reqs = ContainerRequest.select(cols).where(requesting_container_uuid: active_cr_containers).results
+ reqs.each {|cr| cr_children[cr.requesting_container_uuid] << cr} if reqs
+ end
+
wus = {}
outputs = []
recent_procs.each do |p|
- wu = p.work_unit
+ if p.uuid.in?(active_cr_uuids)
+ wu = p.work_unit(nil, cr_children[p.container_uuid])
+ else
+ wu = p.work_unit
+ end
+
wus[p] = wu
outputs << wu.outputs
end
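# The hunk above avoids an N+1 query pattern: children of all active
# container requests are fetched with a single filtered list call and
# grouped by requesting_container_uuid before building work units. A
# minimal Python sketch of the same grouping idea (fetch_requests and
# the record shape are hypothetical stand-ins, not the Arvados API):

from collections import defaultdict

def group_children(child_requests):
    # Index a flat result set by parent container UUID so each work
    # unit can be built without issuing another query per request.
    children = defaultdict(list)
    for req in child_requests:
        children[req["requesting_container_uuid"]].append(req)
    return children

# reqs = fetch_requests(requesting_container_uuid=active_cr_containers)  # hypothetical
# cr_children = group_children(reqs)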
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170510165748',
+ 'cwltool==1.0.20170525215327',
'schema-salad==2.5.20170428142041',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
else:
loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
finally:
return
new_bb = self._alloc_bufferblock()
+ new_bb.owner = []
files = []
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
+ new_bb.owner.append(bb.owner)
self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
files.append((bb, new_bb.write_pointer - bb.size()))
- self.commit_bufferblock(new_bb, sync=True)
+ self.commit_bufferblock(new_bb, sync=sync)
for bb, new_bb_segment_offset in files:
newsegs = bb.owner.segments()
for s in newsegs:
if s.locator == bb.blockid:
- s.locator = new_bb.locator()
+ s.locator = new_bb.blockid
s.segment_offset = new_bb_segment_offset+s.segment_offset
bb.owner.set_segments(newsegs)
self._delete_bufferblock(bb.blockid)
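# Repacking concatenates several small buffer blocks into one: each
# original block's bytes begin at a recorded offset inside the merged
# block, and the owners' segments are then rewritten to point at the
# merged block with that offset added to segment_offset, as the loop
# above does. A minimal sketch of the offset bookkeeping, assuming
# plain bytes values rather than _BufferBlock objects:

def repack_offsets(small_blocks):
    merged = bytearray()
    offsets = []  # offset where each block's data begins in the merged block
    for data in small_blocks:
        offsets.append(len(merged))
        merged.extend(data)
    return bytes(merged), offsets

# repack_offsets([b"aa", b"bbb", b"c"]) -> (b"aabbbc", [0, 2, 5])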
for k,v in items:
if v.state() != _BufferBlock.COMMITTED and v.owner:
- v.owner.flush(sync=False)
+ # Ignore blocks with a list of owners: if they're not in COMMITTED
+ # state, they're already being committed asynchronously.
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=False)
with self.lock:
if self._put_queue is not None:
# flush again with sync=True to remove committed bufferblocks from
# the segments.
if v.owner:
- v.owner.flush(sync=True)
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=True)
+ elif isinstance(v.owner, list) and len(v.owner) > 0:
+ # This bufferblock is referenced by many files as a result
+ # of repacking small blocks, so don't delete it when flushing
+ # its owners; delete it only after flushing them all.
+ for owner in v.owner:
+ owner.flush(sync=True)
+ self.delete_bufferblock(k)
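# With this change a bufferblock's owner is either a single
# ArvadosFile or, after small-block repacking, a list of files, so the
# flush path dispatches on the owner's type. A minimal sketch of that
# dispatch, assuming each owner exposes flush(sync=...) as in the diff:

def flush_owners(owner, sync):
    if isinstance(owner, list):
        # Shared block produced by repacking: flush every referencing file.
        for f in owner:
            f.flush(sync=sync)
    elif owner is not None:
        owner.flush(sync=sync)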
def block_prefetch(self, locator):
"""Initiate a background download of a block.
to_delete.add(s.locator)
s.locator = bb.locator()
for s in to_delete:
- self.parent._my_block_manager().delete_bufferblock(s)
+ # Don't delete the bufferblock if it's owned by many files. It'll be
+ # deleted after all of its owners are flush()ed.
+ if self.parent._my_block_manager().get_bufferblock(s).owner is self:
+ self.parent._my_block_manager().delete_bufferblock(s)
self.parent.notify(MOD, self.parent, self.name, (self, self))
mockkeep = mock.MagicMock()
with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
bufferblock = blockmanager.alloc_bufferblock()
- bufferblock.owner = mock.MagicMock()
+ bufferblock.owner = mock.MagicMock(spec=arvados.arvfile.ArvadosFile)
def flush(sync=None):
blockmanager.commit_bufferblock(bufferblock, sync)
bufferblock.owner.flush.side_effect = flush
mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
bufferblock = blockmanager.alloc_bufferblock()
- bufferblock.owner = mock.MagicMock()
+ bufferblock.owner = mock.MagicMock(spec=arvados.arvfile.ArvadosFile)
def flush(sync=None):
blockmanager.commit_bufferblock(bufferblock, sync)
bufferblock.owner.flush.side_effect = flush
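# The spec= argument added above matters because the new flush logic
# dispatches on isinstance(owner, ArvadosFile): a bare MagicMock fails
# that check, while a specced mock passes it. A small self-contained
# demonstration (stand-in class, not the real arvfile module):

from unittest import mock

class ArvadosFile:  # stand-in for arvados.arvfile.ArvadosFile
    pass

assert not isinstance(mock.MagicMock(), ArvadosFile)
assert isinstance(mock.MagicMock(spec=ArvadosFile), ArvadosFile)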
def setUp(self):
self.keep_put = getattr(arvados.keep.KeepClient, 'put')
- def test_repacked_block_sumbmission_get_permission_token(self):
+ def test_repacked_block_submission_get_permission_token(self):
'''
Make sure that blocks committed after repacking small ones get their
permission tokens assigned in the collection manifest.
def check_orphaned_slurm_jobs
act_as_system_user do
- squeue_uuids = squeue_jobs.select{|uuid| uuid.match(HasUuid::UUID_REGEX)}.
+ squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}.
select{|uuid| !@running.has_key?(uuid)}
return if squeue_uuids.size == 0
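# The stricter pattern accepts only job UUIDs: Arvados UUIDs are
# <5-char cluster id>-<5-char type infix>-<15-char id>, and jobs use
# the "8i9sb" infix while containers use "dz642" (see the test below).
# A quick Python check of the regex against the test's sample output:

import re

JOB_UUID = re.compile(r"^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$")

assert JOB_UUID.match("zzzzz-8i9sb-pshmckwoma9plh7")
assert not JOB_UUID.match("zzzzz-dz642-o04e3r651turtdr")  # container UUID
assert not JOB_UUID.match("thisisnotvalidjobuuid")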
act_as_system_user do
dispatch = CrunchDispatch.new
- squeue_resp = IO.popen("echo zzzzz-8i9sb-pshmckwoma9plh7\necho thisisnotvalidjobuuid\necho zzzzz-8i9sb-4cf0abc123e809j\n")
+ squeue_resp = IO.popen("echo zzzzz-8i9sb-pshmckwoma9plh7\necho thisisnotvalidjobuuid\necho zzzzz-8i9sb-4cf0abc123e809j\necho zzzzz-dz642-o04e3r651turtdr\n")
scancel_resp = IO.popen("true")
IO.expects(:popen).
def _send_request(self):
# cpus, memory, temporary disk space, reason, job name
- squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c %m %d %r %j"])
+ squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
queuelist = []
for out in squeue_out.splitlines():
- cpu, ram, disk, reason, jobname = out.split(" ", 4)
- if reason in ("Resources", "ReqNodeNotAvail"):
+ cpu, ram, disk, reason, jobname = out.split("|", 4)
+ if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
queuelist.append({
"uuid": jobname,
"runtime_constraints": {
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1 1024 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2 1024 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
+2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@mock.patch("subprocess.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
- mock_squeue.return_value = """1 1024M 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1 2G 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
+1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(