therubyracer
uglifier (>= 1.0.3)
wiselinks
-
-BUNDLED WITH
- 1.10.6
end
end
+ # star / unstar the current project
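+  # params['status'] == 'create' adds a 'star' link from the current
+  # user to @object (unless one already exists); any other value
+  # deletes any existing 'star' links. Responds with a JS partial that
+  # refreshes the star button and breadcrumbs.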
+ def star
+ links = Link.where(tail_uuid: current_user.uuid,
+ head_uuid: @object.uuid,
+ link_class: 'star')
+
+ if params['status'] == 'create'
+ # create 'star' link if one does not already exist
+ if !links.andand.any?
+ dst = Link.new(owner_uuid: current_user.uuid,
+ tail_uuid: current_user.uuid,
+ head_uuid: @object.uuid,
+ link_class: 'star',
+ name: @object.uuid)
+ dst.save!
+ end
+ else # delete any existing 'star' links
+ if links.andand.any?
+ links.each do |link|
+ link.destroy
+ end
+ end
+ end
+
+ respond_to do |format|
+ format.js
+ end
+ end
+
protected
def derive_unique_filename filename, manifest_files
# exception here than in a template.)
unless current_user.nil?
begin
- build_project_trees
+ build_my_wanted_projects_tree current_user
rescue ArvadosApiClient::ApiError
# Fall back to the default-setting code later.
end
end
+ @starred_projects ||= []
+ @my_wanted_projects_tree ||= []
@my_project_tree ||= []
@shared_project_tree ||= []
render_error(err_opts)
end
end
+ helper_method :is_starred
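+  # True if the current user has a 'star' link pointing at @object.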
+ def is_starred
+ links = Link.where(tail_uuid: current_user.uuid,
+ head_uuid: @object.uuid,
+ link_class: 'star')
+
+ return links.andand.any?
+ end
+
protected
helper_method :strip_token_from_path
{collections: c, owners: own}
end
+ helper_method :my_starred_projects
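+  # Return (and memoize) the list of projects the given user has
+  # starred, ordered by name.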
+ def my_starred_projects user
+    return @starred_projects if @starred_projects
+    links = Link.filter([['tail_uuid', '=', user.uuid],
+                         ['link_class', '=', 'star'],
+                         ['head_uuid', 'is_a', 'arvados#group']]).select(%w(head_uuid))
+    uuids = links.collect { |x| x.head_uuid }
+ starred_projects = Group.filter([['uuid', 'in', uuids]]).order('name')
+ @starred_projects = starred_projects.results
+ end
+
+  # If there are more than 200 projects readable by the user, build
+  # the tree using only the first 200+ projects owned by the user,
+  # from the top three levels.
+  # That is: get top-level projects under home, then subprojects of
+  # those projects, and so on until we hit the limit.
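+  # With the default page_size of 100, the loop below stops as soon as
+  # more than 200 projects have been collected, or after three levels,
+  # whichever comes first.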
+ def my_wanted_projects user, page_size=100
+ return @my_wanted_projects if @my_wanted_projects
+
+ from_top = []
+ uuids = [user.uuid]
+ depth = 0
+ @too_many_projects = false
+ @reached_level_limit = false
+ while from_top.size <= page_size*2
+ current_level = Group.filter([['group_class','=','project'],
+ ['owner_uuid', 'in', uuids]])
+ .order('name').limit(page_size*2)
+ break if current_level.results.size == 0
+ @too_many_projects = true if current_level.items_available > current_level.results.size
+ from_top.concat current_level.results
+ uuids = current_level.results.collect { |x| x.uuid }
+ depth += 1
+ if depth >= 3
+ @reached_level_limit = true
+ break
+ end
+ end
+ @my_wanted_projects = from_top
+ end
+
+ helper_method :my_wanted_projects_tree
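+  # Returns [tree, too_many_projects, reached_level_limit]: the
+  # flattened tree rows plus two flags the projects menu uses to note
+  # that some projects were omitted.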
+ def my_wanted_projects_tree user, page_size=100
+ build_my_wanted_projects_tree user, page_size
+ [@my_wanted_projects_tree, @too_many_projects, @reached_level_limit]
+ end
+
+ def build_my_wanted_projects_tree user, page_size=100
+ return @my_wanted_projects_tree if @my_wanted_projects_tree
+
+ parent_of = {user.uuid => 'me'}
+ my_wanted_projects(user, page_size).each do |ob|
+ parent_of[ob.uuid] = ob.owner_uuid
+ end
+ children_of = {false => [], 'me' => [user]}
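+    # A project whose owner is neither the user nor one of the fetched
+    # projects is parented under +false+ below, so it never appears in
+    # the tree rooted at 'me'.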
+ my_wanted_projects(user, page_size).each do |ob|
+ if ob.owner_uuid != user.uuid and
+ not parent_of.has_key? ob.owner_uuid
+ parent_of[ob.uuid] = false
+ end
+ children_of[parent_of[ob.uuid]] ||= []
+ children_of[parent_of[ob.uuid]] << ob
+ end
+ buildtree = lambda do |children_of, root_uuid=false|
+ tree = {}
+ children_of[root_uuid].andand.each do |ob|
+ tree[ob] = buildtree.call(children_of, ob.uuid)
+ end
+ tree
+ end
+ sorted_paths = lambda do |tree, depth=0|
+ paths = []
+ tree.keys.sort_by { |ob|
+ ob.is_a?(String) ? ob : ob.friendly_link_name
+ }.each do |ob|
+ paths << {object: ob, depth: depth}
+ paths += sorted_paths.call tree[ob], depth+1
+ end
+ paths
+ end
+ @my_wanted_projects_tree =
+ sorted_paths.call buildtree.call(children_of, 'me')
+ end
+
helper_method :my_project_tree
def my_project_tree
build_project_trees
+ <nav class="navbar navbar-default breadcrumbs" role="navigation">
+ <ul class="nav navbar-nav navbar-left">
+ <li>
+ <a href="/">
+ <i class="fa fa-lg fa-fw fa-dashboard"></i>
+ Dashboard
+ </a>
+ </li>
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="projects-menu">
+ Projects
+ <span class="caret"></span>
+ </a>
+ <ul class="dropdown-menu" style="min-width: 20em" role="menu">
+ <li role="menuitem">
+ <%= link_to(
+ url_for(
+ action: 'choose',
+ controller: 'search',
+ filters: [['uuid', 'is_a', 'arvados#group']].to_json,
+ title: 'Search',
+ action_name: 'Show',
+ action_href: url_for(controller: :actions, action: :show),
+ action_method: 'get',
+ action_data: {selection_param: 'uuid', success: 'redirect-to-created-object'}.to_json),
+ { remote: true, method: 'get', title: "Search" }) do %>
+ <i class="glyphicon fa-fw glyphicon-search"></i> Search all projects ...
+ <% end %>
+ </li>
+ <% if Rails.configuration.anonymous_user_token and Rails.configuration.enable_public_projects_page %>
+ <li role="menuitem"><a href="/projects/public" role="menuitem"><i class="fa fa-fw fa-list"></i> Browse public projects </a>
+ </li>
+ <% end %>
+ <li role="menuitem">
+ <%= link_to projects_path(options: {ensure_unique_name: true}), role: 'menu-item', method: :post do %>
+ <i class="fa fa-fw fa-plus"></i> Add a new project
+ <% end %>
+ </li>
+ <li role="presentation" class="divider"></li>
+ <%= render partial: "projects_tree_menu", locals: {
+ :project_link_to => Proc.new do |pnode, &block|
+ link_to(project_path(pnode[:object].uuid),
+ data: { 'object-uuid' => pnode[:object].uuid,
+ 'name' => 'name' },
+ &block)
+ end,
+ } %>
+ </ul>
+ </li>
+ <% if @name_link or @object %>
+ <li class="nav-separator">
+ <i class="fa fa-lg fa-angle-double-right"></i>
+ </li>
+ <li>
+ <%= link_to project_path(current_user.uuid) do %>
+ Home
+ <% end %>
+ </li>
+ <% project_breadcrumbs.each do |p| %>
+ <li class="nav-separator">
+ <i class="fa fa-lg fa-angle-double-right"></i>
+ </li>
+ <li>
+ <%= link_to(p.name, project_path(p.uuid), data: {object_uuid: p.uuid, name: 'name'}) %>
+ </li>
+ <% end %>
+ <% end %>
+ </ul>
+ </nav>
+<% starred_projects = my_starred_projects current_user %>
+<% if starred_projects.andand.any? %>
+ <li role="presentation" class="dropdown-header">
+ My favorite projects
+ </li>
+ <li>
+ <%= project_link_to.call({object: current_user, depth: 0}) do %>
+ <span style="padding-left: 0">Home</span>
+ <% end %>
+ </li>
+ <% (starred_projects).each do |pnode| %>
+ <li>
+      <%= project_link_to.call({object: pnode, depth: 0}) do %>
+        <span style="padding-left: 0"></span><%= pnode[:name] %>
+ <% end %>
+ </li>
+ <% end %>
+ <li role="presentation" class="divider"></li>
+<% end %>
+
<li role="presentation" class="dropdown-header">
My projects
</li>
<span style="padding-left: 0">Home</span>
<% end %>
</li>
-<% my_project_tree.each do |pnode| %>
+<% my_tree = my_wanted_projects_tree current_user %>
+<% my_tree[0].each do |pnode| %>
<% next if pnode[:object].class != Group %>
<li>
<%= project_link_to.call pnode do %>
<% end %>
</li>
<% end %>
+<% if my_tree[1] or my_tree[0].size > 200 %>
+<li role="presentation" class="dropdown-header">
+ Some projects have been omitted.
+</li>
+<% elsif my_tree[2] %>
+<li role="presentation" class="dropdown-header">
+ Showing top three levels of your projects.
+</li>
+<% end %>
--- /dev/null
+<% if current_user and is_starred %>
+  <%= link_to(star_path(status: 'delete', id: @object.uuid, action_method: 'get'), style: "color:#D00", class: "btn btn-xs star-unstar", title: "Remove from list of favorites", remote: true) do %>
+ <i class="fa fa-lg fa-star"></i>
+ <% end %>
+<% else %>
+  <%= link_to(star_path(status: 'create', id: @object.uuid, action_method: 'get'), class: "btn btn-xs star-unstar", title: "Add to list of favorites", remote: true) do %>
+ <i class="fa fa-lg fa-star-o"></i>
+ <% end %>
+<% end %>
--- /dev/null
+$(".star-unstar").html("<%= escape_javascript(render partial: 'show_star') %>");
+$(".breadcrumbs").html("<%= escape_javascript(render partial: 'breadcrumbs') %>");
</nav>
<% if current_user.andand.is_active %>
- <nav class="navbar navbar-default breadcrumbs" role="navigation">
- <ul class="nav navbar-nav navbar-left">
- <li>
- <a href="/">
- <i class="fa fa-lg fa-fw fa-dashboard"></i>
- Dashboard
- </a>
- </li>
- <li class="dropdown">
- <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="projects-menu">
- Projects
- <span class="caret"></span>
- </a>
- <ul class="dropdown-menu" style="min-width: 20em" role="menu">
- <li role="menuitem">
- <%= link_to(
- url_for(
- action: 'choose',
- controller: 'search',
- filters: [['uuid', 'is_a', 'arvados#group']].to_json,
- title: 'Search',
- action_name: 'Show',
- action_href: url_for(controller: :actions, action: :show),
- action_method: 'get',
- action_data: {selection_param: 'uuid', success: 'redirect-to-created-object'}.to_json),
- { remote: true, method: 'get', title: "Search" }) do %>
- <i class="glyphicon fa-fw glyphicon-search"></i> Search all projects ...
- <% end %>
- </li>
- <% if Rails.configuration.anonymous_user_token and Rails.configuration.enable_public_projects_page %>
- <li role="menuitem"><a href="/projects/public" role="menuitem"><i class="fa fa-fw fa-list"></i> Browse public projects </a>
- </li>
- <% end %>
- <li role="menuitem">
- <%= link_to projects_path(options: {ensure_unique_name: true}), role: 'menu-item', method: :post do %>
- <i class="fa fa-fw fa-plus"></i> Add a new project
- <% end %>
- </li>
- <li role="presentation" class="divider"></li>
- <%= render partial: "projects_tree_menu", locals: {
- :project_link_to => Proc.new do |pnode, &block|
- link_to(project_path(pnode[:object].uuid),
- data: { 'object-uuid' => pnode[:object].uuid,
- 'name' => 'name' },
- &block)
- end,
- } %>
- </ul>
- </li>
- <% if @name_link or @object %>
- <li class="nav-separator">
- <i class="fa fa-lg fa-angle-double-right"></i>
- </li>
- <li>
- <%= link_to project_path(current_user.uuid) do %>
- Home
- <% end %>
- </li>
- <% project_breadcrumbs.each do |p| %>
- <li class="nav-separator">
- <i class="fa fa-lg fa-angle-double-right"></i>
- </li>
- <li>
- <%= link_to(p.name, project_path(p.uuid), data: {object_uuid: p.uuid, name: 'name'}) %>
- </li>
- <% end %>
- <% end %>
- </ul>
- </nav>
+ <%= render partial: 'breadcrumbs' %>
<% elsif !current_user %> <%# anonymous %>
<% if (@name_link or @object) and (project_breadcrumbs.any?) %>
<nav class="navbar navbar-default breadcrumbs" role="navigation">
<% if @object.uuid == current_user.andand.uuid %>
Home
<% else %>
+ <%= render partial: "show_star" %>
<%= render_editable_attribute @object, 'name', nil, { 'data-emptytext' => "New project" } %>
<% end %>
</h2>
get "users/setup" => 'users#setup', :as => :setup_user
get "report_issue_popup" => 'actions#report_issue_popup', :as => :report_issue_popup
post "report_issue" => 'actions#report_issue', :as => :report_issue
+ get "star" => 'actions#star', :as => :star
resources :nodes
resources :humans
resources :traits
}, session_for(:active)
assert_select "#projects-menu + ul li.divider ~ li a[href=/projects/#{project_uuid}]"
end
+
+ [
+ ["active", 5, ["aproject", "asubproject"], "anonymously_accessible_project"],
+ ["user1_with_load", 2, ["project_with_10_collections"], "project_with_2_pipelines_and_60_jobs"],
+ ["admin", 5, ["anonymously_accessible_project", "subproject_in_anonymous_accessible_project"], "aproject"],
+ ].each do |user, page_size, tree_segment, unexpected|
+ test "build my projects tree for #{user} user and verify #{unexpected} is omitted" do
+ use_token user
+ ctrl = ProjectsController.new
+
+ current_user = User.find(api_fixture('users')[user]['uuid'])
+
+ my_tree = ctrl.send :my_wanted_projects_tree, current_user, page_size
+
+ tree_segment_at_depth_1 = api_fixture('groups')[tree_segment[0]]
+ tree_segment_at_depth_2 = api_fixture('groups')[tree_segment[1]] if tree_segment[1]
+
+ tree_nodes = {}
+ my_tree[0].each do |x|
+ tree_nodes[x[:object]['uuid']] = x[:depth]
+ end
+
+ assert_equal(1, tree_nodes[tree_segment_at_depth_1['uuid']])
+ assert_equal(2, tree_nodes[tree_segment_at_depth_2['uuid']]) if tree_segment[1]
+
+ unexpected_project = api_fixture('groups')[unexpected]
+ assert_nil(tree_nodes[unexpected_project['uuid']])
+ end
+ end
+
+ [
+ ["active", 1],
+ ["project_viewer", 1],
+ ["admin", 0],
+ ].each do |user, size|
+ test "starred projects for #{user}" do
+ use_token user
+ ctrl = ProjectsController.new
+ current_user = User.find(api_fixture('users')[user]['uuid'])
+ my_starred_project = ctrl.send :my_starred_projects, current_user
+ assert_equal(size, my_starred_project.andand.size)
+
+ ctrl2 = ProjectsController.new
+ current_user = User.find(api_fixture('users')[user]['uuid'])
+ my_starred_project = ctrl2.send :my_starred_projects, current_user
+ assert_equal(size, my_starred_project.andand.size)
+ end
+ end
+
+ test "unshare project and verify that it is no longer included in shared user's starred projects" do
+ # remove sharing link
+ use_token :system_user
+ Link.find(api_fixture('links')['share_starred_project_with_project_viewer']['uuid']).destroy
+
+ # verify that project is no longer included in starred projects
+ use_token :project_viewer
+ current_user = User.find(api_fixture('users')['project_viewer']['uuid'])
+ ctrl = ProjectsController.new
+ my_starred_project = ctrl.send :my_starred_projects, current_user
+ assert_equal(0, my_starred_project.andand.size)
+
+ # share it again
+ @controller = LinksController.new
+ post :create, {
+ link: {
+ link_class: 'permission',
+ name: 'can_read',
+ head_uuid: api_fixture('groups')['starred_and_shared_active_user_project']['uuid'],
+ tail_uuid: api_fixture('users')['project_viewer']['uuid'],
+ },
+ format: :json
+ }, session_for(:system_user)
+
+ # verify that the project is again included in starred projects
+ use_token :project_viewer
+ ctrl = ProjectsController.new
+ my_starred_project = ctrl.send :my_starred_projects, current_user
+ assert_equal(1, my_starred_project.andand.size)
+ end
end
test 'Create a project and move it into a different project' do
visit page_with_token 'active', '/projects'
find("#projects-menu").click
- find(".dropdown-menu a", text: "Home").click
+ within('.dropdown-menu') do
+ first('li', text: 'Home').click
+ end
+ wait_for_ajax
find('.btn', text: "Add a subproject").click
within('h2') do
visit '/projects'
find("#projects-menu").click
- find(".dropdown-menu a", text: "Home").click
+ within('.dropdown-menu') do
+ first('li', text: 'Home').click
+ end
+ wait_for_ajax
find('.btn', text: "Add a subproject").click
within('h2') do
find('.fa-pencil').click
assert page.has_text?('Unrestricted public data'), 'No text - Unrestricted public data'
assert page.has_text?('An anonymously accessible project'), 'No text - An anonymously accessible project'
end
+
+  test "star and unstar a project" do
+ visit page_with_token 'active', "/projects/#{api_fixture('groups')['anonymously_accessible_project']['uuid']}"
+
+ # add to favorites
+ find('.fa-star-o').click
+ wait_for_ajax
+
+ find("#projects-menu").click
+ within('.dropdown-menu') do
+ assert_selector 'li', text: 'Unrestricted public data'
+ end
+
+    # remove from favorites
+ find('.fa-star').click
+ wait_for_ajax
+
+ find("#projects-menu").click
+ within('.dropdown-menu') do
+ assert_no_selector 'li', text: 'Unrestricted public data'
+ end
+ end
end
--- /dev/null
+fpm_args+=(-v 2.0)
def sub_glob(v):
l = glob.glob(v)
if len(l) == 0:
- raise SubstitutionError("$(glob {}) no match fonud".format(v))
+ raise SubstitutionError("$(glob {}) no match found".format(v))
else:
return l[0]
active = 1
pids = set([s.pid for s in subprocesses])
while len(pids) > 0:
- (pid, status) = os.wait()
- pids.discard(pid)
- if not taskp.get("task.ignore_rcode"):
- rcode[pid] = (status >> 8)
+ try:
+ (pid, status) = os.wait()
+ except OSError as e:
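+            # os.wait() can be interrupted by a signal; retry on EINTR
+            # instead of crashing (errno is assumed to be imported at
+            # the top of this script).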
+ if e.errno == errno.EINTR:
+ pass
+ else:
+ raise
else:
- rcode[pid] = 0
+ pids.discard(pid)
+ if not taskp.get("task.ignore_rcode"):
+ rcode[pid] = (status >> 8)
+ else:
+ rcode[pid] = 0
if sig.sig is not None:
logger.critical("terminating on signal %s" % sig.sig)
# If this job requires a Docker image, install that.
my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
if ($docker_locator = $Job->{docker_image_locator}) {
+ Log (undef, "Install docker image $docker_locator");
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
if (!$docker_hash)
{
croak("No Docker image hash found from locator $docker_locator");
}
+ Log (undef, "docker image hash is $docker_hash");
$docker_stream =~ s/^\.//;
my $docker_install_script = qq{
if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
if ($docker_pid == 0)
{
srun (["srun", "--nodelist=" . join(',', @node)],
- ["/bin/sh", "-ec", $docker_install_script]);
+ ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script]);
exit ($?);
}
while (1)
}
if ($? != 0)
{
- croak("Installing Docker image from $docker_locator exited "
- .exit_status_s($?));
+ Log(undef, "Installing Docker image from $docker_locator exited " . exit_status_s($?));
+ exit(EX_RETRY_UNLOCKED);
}
# Determine whether this version of Docker supports memory+swap limits.
check_refresh_wanted();
check_squeue();
update_progress_stats();
- select (undef, undef, undef, 0.1);
}
elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
{
update_progress_stats();
}
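+  # Sleep only when no stderr was read this pass, so that busy tasks
+  # are drained at full speed.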
+ if (!$gotsome) {
+ select (undef, undef, undef, 0.1);
+ }
$working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
$_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
# squeue check interval (15s) this should make the squeue check an
# infrequent event.
my $silent_procs = 0;
- for my $jobstep (values %proc)
+ for my $procinfo (values %proc)
{
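+    # %proc values are now small bookkeeping records; the jobstep
+    # itself lives in @jobstep.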
+ my $jobstep = $jobstep[$procinfo->{jobstep}];
if ($jobstep->{stderr_at} < $last_squeue_check)
{
$silent_procs++;
return if $silent_procs == 0;
# use killem() on procs whose killtime is reached
- while (my ($pid, $jobstep) = each %proc)
+ while (my ($pid, $procinfo) = each %proc)
{
- if (exists $jobstep->{killtime}
- && $jobstep->{killtime} <= time
+ my $jobstep = $jobstep[$procinfo->{jobstep}];
+ if (exists $procinfo->{killtime}
+ && $procinfo->{killtime} <= time
&& $jobstep->{stderr_at} < $last_squeue_check)
{
my $sincewhen = "";
if ($jobstep->{stderr_at}) {
$sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
}
- Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
killem ($pid);
}
}
}
# Check for child procs >60s old and not mentioned by squeue.
- while (my ($pid, $jobstep) = each %proc)
+ while (my ($pid, $procinfo) = each %proc)
{
- if ($jobstep->{time} < time - 60
- && $jobstep->{jobstepname}
- && !exists $ok{$jobstep->{jobstepname}}
- && !exists $jobstep->{killtime})
+ if ($procinfo->{time} < time - 60
+ && $procinfo->{jobstepname}
+ && !exists $ok{$procinfo->{jobstepname}}
+ && !exists $procinfo->{killtime})
{
# According to slurm, this task has ended (successfully or not)
# -- but our srun child hasn't exited. First we must wait (30
# terminated, we'll conclude some slurm communication
# error/delay has caused the task to die without notifying srun,
# and we'll kill srun ourselves.
- $jobstep->{killtime} = time + 30;
- Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+ $procinfo->{killtime} = time + 30;
+ Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
foreach my $job (keys %reader)
{
my $buf;
- while (0 < sysread ($reader{$job}, $buf, 8192))
+ if (0 < sysread ($reader{$job}, $buf, 65536))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
$jobstep[$job]->{stderr_at} = time;
$jobstep[$job]->{stderr} .= $buf;
+
+ # Consume everything up to the last \n
preprocess_stderr ($job);
+
if (length ($jobstep[$job]->{stderr}) > 16384)
{
- substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+ # If we get a lot of stderr without a newline, chop off the
+ # front to avoid letting our buffer grow indefinitely.
+ substr ($jobstep[$job]->{stderr},
+ 0, length($jobstep[$job]->{stderr}) - 8192) = "";
}
$gotsome = 1;
}
# whoa.
$main::please_freeze = 1;
}
- elsif ($line =~ /srun: error: (Node failure on|Aborting, io error)/) {
+ elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
my $job_slot_index = $jobstep[$job]->{slotindex};
$slot[$job_slot_index]->{node}->{fail_count}++;
$jobstep[$job]->{tempfail} = 1;
import cwltool.draft2tool
import cwltool.workflow
import cwltool.main
+from cwltool.process import shortname
import threading
import cwltool.docker
import fnmatch
args = [image_name]
if image_tag:
args.append(image_tag)
+ logger.info("Uploading Docker image %s", ":".join(args))
arvados.commands.keepdocker.main(args)
return dockerRequirement["dockerImageId"]
"script_version": "master",
"script_parameters": {"tasks": [script_parameters]},
"runtime_constraints": runtime_constraints
- }, find_or_create=kwargs.get("enable_reuse", True)).execute()
+ }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
self.arvrunner.jobs[response["uuid"]] = self
- logger.info("Job %s is %s", response["uuid"], response["state"])
+ self.arvrunner.pipeline["components"][self.name] = {"job": response}
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+ body={
+ "components": self.arvrunner.pipeline["components"]
+ }).execute(num_retries=self.arvrunner.num_retries)
+
+ logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
if response["state"] in ("Complete", "Failed", "Cancelled"):
self.done(response)
logger.error("Got error %s" % str(e))
self.output_callback({}, "permanentFail")
+ def update_pipeline_component(self, record):
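+        # Mirror the latest job record into the owning pipeline
+        # instance so Workbench can show component progress.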
+ self.arvrunner.pipeline["components"][self.name] = {"job": record}
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+ body={
+ "components": self.arvrunner.pipeline["components"]
+ }).execute(num_retries=self.arvrunner.num_retries)
def done(self, record):
+        try:
+            self.update_pipeline_component(record)
+        except Exception:
+            # Failing to update the pipeline record should not mask the
+            # job's actual outcome.
+            pass
+
try:
if record["state"] == "Complete":
processStatus = "success"
try:
outputs = {}
- outputs = self.collect_outputs("keep:" + record["output"])
+ if record["output"]:
+ outputs = self.collect_outputs("keep:" + record["output"])
except Exception as e:
logger.exception("Got exception while collecting job outputs:")
processStatus = "permanentFail"
self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
if src not in self._pathmap:
ab = cwltool.pathmapper.abspath(src, basedir)
- st = arvados.commands.run.statfile("", ab)
+ st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
if kwargs.get("conformance_test"):
self._pathmap[src] = (src, ab)
elif isinstance(st, arvados.commands.run.UploadFile):
self.cond = threading.Condition(self.lock)
self.final_output = None
self.uploaded = {}
+ self.num_retries = 4
def arvMakeTool(self, toolpath_object, **kwargs):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
def output_callback(self, out, processStatus):
if processStatus == "success":
logger.info("Overall job status is %s", processStatus)
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
+
else:
logger.warn("Overall job status is %s", processStatus)
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.final_output = out
+
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.jobs and event["event_type"] == "update":
if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
- logger.info("Job %s is Running", event["object_uuid"])
+ uuid = event["object_uuid"]
with self.lock:
- self.jobs[event["object_uuid"]].running = True
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is Running", j.name, uuid)
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
+ uuid = event["object_uuid"]
try:
self.cond.acquire()
- self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+ j.done(event["properties"]["new_attributes"])
self.cond.notify()
finally:
self.cond.release()
def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+ self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
+ "components": {},
+ "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
self.fs_access = CollectionFsAccess(input_basedir)
kwargs["fs_access"] = self.fs_access
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool>=1.0.20151026181844',
- 'arvados-python-client>=0.1.20151023214338'
+ 'cwltool>=1.0.20160129152024',
+ 'arvados-python-client>=0.1.20160122132348'
],
zip_safe=True,
cmdclass={'egg_info': tagger},
# ArvFile() (file already exists in a collection), UploadFile() (file needs to
# be uploaded to a collection), or simply returns prefix+fn (which yields the
# original parameter string).
-def statfile(prefix, fn):
+def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
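+    # fnPattern/dirPattern let callers customize how a file or
+    # directory found in a collection is rendered; the defaults
+    # preserve the original "$(file ...)"/"$(dir ...)" behavior.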
absfn = os.path.abspath(fn)
if os.path.exists(absfn):
st = os.stat(absfn)
sp = os.path.split(absfn)
(pdh, branch) = is_in_collection(sp[0], sp[1])
if pdh:
- return ArvFile(prefix, "$(file %s/%s)" % (pdh, branch))
+ return ArvFile(prefix, fnPattern % (pdh, branch))
else:
# trim leading '/' for path prefix test later
return UploadFile(prefix, absfn[1:])
sp = os.path.split(absfn)
(pdh, branch) = is_in_collection(sp[0], sp[1])
if pdh:
- return ArvFile(prefix, "$(dir %s/%s/)" % (pdh, branch))
+ return ArvFile(prefix, dirPattern % (pdh, branch))
return prefix+fn
return _subscribe_websocket(api, filters, on_event, last_log_id)
try:
- return _subscribe_websocket(api, filters, on_event, last_log_id)
+ if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
+ return _subscribe_websocket(api, filters, on_event, last_log_id)
+ else:
+ _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
except Exception as e:
_logger.warn("Falling back to polling after websocket error: %s" % e)
p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
'google-api-python-client',
'httplib2',
'pycurl >=7.19.5.1, <7.21.5',
- 'python-gflags',
+ 'python-gflags<3.0',
'ws4py'
],
test_suite='tests',
((attr == 'scopes') and (operator == '=')) ? operand : nil
})
@filters.select! { |attr, operator, operand|
- (attr == 'uuid') and (operator == '=')
+ ((attr == 'uuid') and (operator == '=')) || ((attr == 'api_token') and (operator == '='))
}
end
if @where
end
def find_object_by_uuid
- # Again, to make things easier for the client and our own routing,
- # here we look for the api_token key in a "uuid" (POST) or "id"
- # (GET) parameter.
- @object = model_class.where('api_token=?', params[:uuid] || params[:id]).first
+ @object = model_class.where(uuid: (params[:uuid] || params[:id])).first
end
def current_api_client_is_trusted
unless Thread.current[:api_client].andand.is_trusted
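+    # Untrusted clients may still see their own token: allow "show" of
+    # the record used to authenticate this request, and "index" when
+    # the filters select exactly that record by uuid or api_token.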
+ if params["action"] == "show"
+ if @object and @object['api_token'] == current_api_client_authorization.andand.api_token
+ return true
+ end
+ elsif params["action"] == "index" and @objects.andand.size == 1
+ filters = @filters.map{|f|f.first}.uniq
+ if ['uuid'] == filters
+ return true if @objects.first['api_token'] == current_api_client_authorization.andand.api_token
+ elsif ['api_token'] == filters
+        return true if @objects.first[:user_id] == current_user.id
+ end
+ end
send_error('Forbidden: this API client cannot manipulate other clients\' access tokens.',
status: 403)
end
class ApiClientAuthorization < ArvadosModel
+ include HasUuid
include KindAndEtag
include CommonApiTemplate
self.user_id_changed?
end
- def uuid
- self.api_token
- end
- def uuid=(x) end
- def uuid_was
- self.api_token_was
- end
- def uuid_changed?
- self.api_token_changed?
- end
-
def modified_by_client_uuid
nil
end
["#{table_name}.modified_at desc", "#{table_name}.uuid"]
end
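+  # Columns that are unique on their own: once an ORDER BY reaches one
+  # of these the ordering is total, so any columns after it are
+  # superfluous.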
+ def self.unique_columns
+ ["id", "uuid"]
+ end
+
# If current user can manage the object, return an array of uuids of
# users and groups that have permission to write the object. The
# first two elements are always [self.owner_uuid, current user's
--- /dev/null
+require 'has_uuid'
+
+class AddUuidToApiClientAuthorization < ActiveRecord::Migration
+ extend HasUuid::ClassMethods
+
+ def up
+ add_column :api_client_authorizations, :uuid, :string
+ add_index :api_client_authorizations, :uuid, :unique => true
+
+ prefix = Server::Application.config.uuid_prefix + '-' +
+ Digest::MD5.hexdigest('ApiClientAuthorization'.to_s).to_i(16).to_s(36)[-5..-1] + '-'
+
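+    # Backfill existing rows with a pseudo-random 15-character suffix
+    # built by sampling one character at a time from each row's
+    # api_token.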
+ update_sql <<-EOS
+update api_client_authorizations set uuid = (select concat('#{prefix}',
+array_to_string(ARRAY (SELECT substring(api_token FROM (ceil(random()*36))::int FOR 1) FROM generate_series(1, 15)), '')
+));
+EOS
+
+ change_column_null :api_client_authorizations, :uuid, false
+ end
+
+ def down
+ if column_exists?(:api_client_authorizations, :uuid)
+ remove_index :api_client_authorizations, :uuid
+ remove_column :api_client_authorizations, :uuid
+ end
+ end
+end
--- /dev/null
+class AddUuidToApiTokenSearchIndex < ActiveRecord::Migration
+ def up
+ begin
+ remove_index :api_client_authorizations, :name => 'api_client_authorizations_search_index'
+ rescue
+ end
+ add_index :api_client_authorizations,
+ ["api_token", "created_by_ip_address", "last_used_by_ip_address", "default_owner_uuid", "uuid"],
+ name: "api_client_authorizations_search_index"
+ end
+
+ def down
+ begin
+ remove_index :api_client_authorizations, :name => 'api_client_authorizations_search_index'
+ rescue
+ end
+ add_index :api_client_authorizations,
+ ["api_token", "created_by_ip_address", "last_used_by_ip_address", "default_owner_uuid"],
+ name: "api_client_authorizations_search_index"
+ end
+end
default_owner_uuid character varying(255),
scopes text DEFAULT '---
- all
-'::text NOT NULL
+'::text NOT NULL,
+ uuid character varying(255) NOT NULL
);
-- Name: api_client_authorizations_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX api_client_authorizations_search_index ON api_client_authorizations USING btree (api_token, created_by_ip_address, last_used_by_ip_address, default_owner_uuid);
+CREATE INDEX api_client_authorizations_search_index ON api_client_authorizations USING btree (api_token, created_by_ip_address, last_used_by_ip_address, default_owner_uuid, uuid);
--
CREATE INDEX index_api_client_authorizations_on_user_id ON api_client_authorizations USING btree (user_id);
+--
+-- Name: index_api_client_authorizations_on_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
+--
+
+CREATE UNIQUE INDEX index_api_client_authorizations_on_uuid ON api_client_authorizations USING btree (uuid);
+
+
--
-- Name: index_api_clients_on_created_at; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
INSERT INTO schema_migrations (version) VALUES ('20151215134304');
-INSERT INTO schema_migrations (version) VALUES ('20151229214707');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20151229214707');
+
+INSERT INTO schema_migrations (version) VALUES ('20160208210629');
+
+INSERT INTO schema_migrations (version) VALUES ('20160209155729');
\ No newline at end of file
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status == EXIT_RETRY_UNLOCKED
- # The job failed because all of the nodes allocated to it
- # failed. Only this crunch-dispatch process can retry the job:
+ if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+ # Only this crunch-dispatch process can retry the job:
# it's already locked, and there's no way to put it back in the
# Queued state. Put it in our internal todo list unless the job
# has failed this way excessively.
# (e.g., [] or ['owner_uuid desc']), fall back on the default
# orders to ensure repeating the same request (possibly with
# different limit/offset) will return records in the same order.
- @orders += model_class.default_orders
+ #
+ # Clean up the resulting list of orders such that no column
+ # uselessly appears twice (Postgres might not optimize this out
+ # for us) and no columns uselessly appear after a unique column
+ # (Postgres does not optimize this out for us; as of 9.2, "order
+ # by id, modified_at desc, uuid" is slow but "order by id" is
+ # fast).
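+      # For example, ["logs.event_type asc", "logs.id asc",
+      # "logs.uuid asc"] reduces to "logs.event_type asc, logs.id asc"
+      # because id is already unique.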
+ orders_given_and_default = @orders + model_class.default_orders
+ order_cols_used = {}
+ @orders = []
+ orders_given_and_default.each do |order|
+ otablecol = order.split(' ')[0]
+
+ next if order_cols_used[otablecol]
+ order_cols_used[otablecol] = true
+
+ @orders << order
+
+ otable, ocol = otablecol.split('.')
+ if otable == table_name and model_class.unique_columns.include?(ocol)
+ # we already have a full ordering; subsequent entries would be
+ # superfluous
+ break
+ end
+ end
case params[:select]
when Array
# Read about fixtures at http://api.rubyonrails.org/classes/ActiveRecord/Fixtures.html
system_user:
+ uuid: zzzzz-gj3su-017z32aux8dg2s1
api_client: untrusted
user: system_user
api_token: systemusertesttoken1234567890aoeuidhtnsqjkxbmwvzpy
expires_at: 2038-01-01 00:00:00
admin:
+ uuid: zzzzz-gj3su-027z32aux8dg2s1
api_client: untrusted
user: admin
api_token: 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h
expires_at: 2038-01-01 00:00:00
admin_trustedclient:
+ uuid: zzzzz-gj3su-037z32aux8dg2s1
api_client: trusted_workbench
user: admin
api_token: 1a9ffdcga2o7cw8q12dndskomgs1ygli3ns9k2o9hgzgmktc78
expires_at: 2038-01-01 00:00:00
data_manager:
+ uuid: zzzzz-gj3su-047z32aux8dg2s1
api_client: untrusted
user: system_user
api_token: 320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1
- POST /arvados/v1/logs
miniadmin:
+ uuid: zzzzz-gj3su-057z32aux8dg2s1
api_client: untrusted
user: miniadmin
api_token: 2zb2y9pw3e70270te7oe3ewaantea3adyxjascvkz0zob7q7xb
expires_at: 2038-01-01 00:00:00
rominiadmin:
+ uuid: zzzzz-gj3su-067z32aux8dg2s1
api_client: untrusted
user: rominiadmin
api_token: 5tsb2pc3zlatn1ortl98s2tqsehpby88wmmnzmpsjmzwa6payh
expires_at: 2038-01-01 00:00:00
active:
+ uuid: zzzzz-gj3su-077z32aux8dg2s1
api_client: untrusted
user: active
api_token: 3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi
expires_at: 2038-01-01 00:00:00
active_trustedclient:
+ uuid: zzzzz-gj3su-087z32aux8dg2s1
api_client: trusted_workbench
user: active
api_token: 27bnddk6x2nmq00a1e3gq43n9tsl5v87a3faqar2ijj8tud5en
expires_at: 2038-01-01 00:00:00
active_noscope:
+ uuid: zzzzz-gj3su-097z32aux8dg2s1
api_client: untrusted
user: active
api_token: activenoscopeabcdefghijklmnopqrstuvwxyz12345678901
scopes: []
project_viewer:
+ uuid: zzzzz-gj3su-107z32aux8dg2s1
api_client: untrusted
user: project_viewer
api_token: projectviewertoken1234567890abcdefghijklmnopqrstuv
expires_at: 2038-01-01 00:00:00
project_viewer_trustedclient:
+ uuid: zzzzz-gj3su-117z32aux8dg2s1
api_client: trusted_workbench
user: project_viewer
api_token: projectviewertrustedtoken1234567890abcdefghijklmno
expires_at: 2038-01-01 00:00:00
subproject_admin:
+ uuid: zzzzz-gj3su-127z32aux8dg2s1
api_client: untrusted
user: subproject_admin
api_token: subprojectadmintoken1234567890abcdefghijklmnopqrst
expires_at: 2038-01-01 00:00:00
admin_vm:
+ uuid: zzzzz-gj3su-137z32aux8dg2s1
api_client: untrusted
user: admin
api_token: adminvirtualmachineabcdefghijklmnopqrstuvwxyz12345
scopes: ["GET /arvados/v1/virtual_machines/zzzzz-2x53u-382brsig8rp3064/logins"]
admin_noscope:
+ uuid: zzzzz-gj3su-147z32aux8dg2s1
api_client: untrusted
user: admin
api_token: adminnoscopeabcdefghijklmnopqrstuvwxyz123456789012
scopes: []
active_all_collections:
+ uuid: zzzzz-gj3su-157z32aux8dg2s1
api_client: untrusted
user: active
api_token: activecollectionsabcdefghijklmnopqrstuvwxyz1234567
scopes: ["GET /arvados/v1/collections/", "GET /arvados/v1/keep_services/accessible"]
active_userlist:
+ uuid: zzzzz-gj3su-167z32aux8dg2s1
api_client: untrusted
user: active
api_token: activeuserlistabcdefghijklmnopqrstuvwxyz1234568900
scopes: ["GET /arvados/v1/users"]
active_specimens:
+ uuid: zzzzz-gj3su-177z32aux8dg2s1
api_client: untrusted
user: active
api_token: activespecimensabcdefghijklmnopqrstuvwxyz123456890
scopes: ["GET /arvados/v1/specimens/"]
active_apitokens:
+ uuid: zzzzz-gj3su-187z32aux8dg2s1
api_client: trusted_workbench
user: active
api_token: activeapitokensabcdefghijklmnopqrstuvwxyz123456789
"POST /arvados/v1/api_client_authorizations"]
active_readonly:
+ uuid: zzzzz-gj3su-197z32aux8dg2s1
api_client: untrusted
user: active
api_token: activereadonlyabcdefghijklmnopqrstuvwxyz1234568790
scopes: ["GET /"]
spectator:
+ uuid: zzzzz-gj3su-207z32aux8dg2s1
api_client: untrusted
user: spectator
api_token: zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu
expires_at: 2038-01-01 00:00:00
spectator_specimens:
+ uuid: zzzzz-gj3su-217z32aux8dg2s1
api_client: untrusted
user: spectator
api_token: spectatorspecimensabcdefghijklmnopqrstuvwxyz123245
"POST /arvados/v1/specimens"]
inactive:
+ uuid: zzzzz-gj3su-227z32aux8dg2s1
api_client: untrusted
user: inactive
api_token: 5s29oj2hzmcmpq80hx9cta0rl5wuf3xfd6r7disusaptz7h9m0
expires_at: 2038-01-01 00:00:00
inactive_uninvited:
+ uuid: zzzzz-gj3su-237z32aux8dg2s1
api_client: untrusted
user: inactive_uninvited
api_token: 62mhllc0otp78v08e3rpa3nsmf8q8ogk47f7u5z4erp5gpj9al
expires_at: 2038-01-01 00:00:00
inactive_but_signed_user_agreement:
+ uuid: zzzzz-gj3su-247z32aux8dg2s1
api_client: untrusted
user: inactive_but_signed_user_agreement
api_token: 64k3bzw37iwpdlexczj02rw3m333rrb8ydvn2qq99ohv68so5k
expires_at: 2038-01-01 00:00:00
expired:
+ uuid: zzzzz-gj3su-257z32aux8dg2s1
api_client: untrusted
user: active
api_token: 2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx
expires_at: 1970-01-01 00:00:00
expired_trustedclient:
+ uuid: zzzzz-gj3su-267z32aux8dg2s1
api_client: trusted_workbench
user: active
api_token: 5hpni7izokzcatku2896xxwqdbt5ptomn04r6auc7fohnli82v
expires_at: 1970-01-01 00:00:00
valid_token_deleted_user:
+ uuid: zzzzz-gj3su-277z32aux8dg2s1
api_client: trusted_workbench
user_id: 1234567
api_token: tewfa58099sndckyqhlgd37za6e47o6h03r9l1vpll23hudm8b
expires_at: 2038-01-01 00:00:00
anonymous:
+ uuid: zzzzz-gj3su-287z32aux8dg2s1
api_client: untrusted
user: anonymous
api_token: 4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi
scopes: ["GET /"]
job_reader:
+ uuid: zzzzz-gj3su-297z32aux8dg2s1
api_client: untrusted
user: job_reader
api_token: e99512cdc0f3415c2428b9758f33bdfb07bc3561b00e86e7e6
expires_at: 2038-01-01 00:00:00
active_no_prefs:
+ uuid: zzzzz-gj3su-307z32aux8dg2s1
api_client: untrusted
user: active_no_prefs
api_token: 3kg612cdc0f3415c2428b9758f33bdfb07bc3561b00e86qdmi
expires_at: 2038-01-01 00:00:00
active_no_prefs_profile_no_getting_started_shown:
+ uuid: zzzzz-gj3su-317z32aux8dg2s1
api_client: untrusted
user: active_no_prefs_profile_no_getting_started_shown
api_token: 3kg612cdc0f3415c242856758f33bdfb07bc3561b00e86qdmi
expires_at: 2038-01-01 00:00:00
active_no_prefs_profile_with_getting_started_shown:
+ uuid: zzzzz-gj3su-327z32aux8dg2s1
api_client: untrusted
user: active_no_prefs_profile_with_getting_started_shown
api_token: 3kg612cdc0f3415c245786758f33bdfb07babcd1b00e86qdmi
expires_at: 2038-01-01 00:00:00
active_with_prefs_profile_no_getting_started_shown:
+ uuid: zzzzz-gj3su-337z32aux8dg2s1
api_client: untrusted
user: active_with_prefs_profile_no_getting_started_shown
api_token: 3kg612cdc0f3415c245786758f33bdfb07befgh1b00e86qdmi
expires_at: 2038-01-01 00:00:00
user_foo_in_sharing_group:
+ uuid: zzzzz-gj3su-347z32aux8dg2s1
api_client: untrusted
user: user_foo_in_sharing_group
api_token: 2p1pou8p4ls208mcbedeewlotghppenobcyrmyhq8pyf51xd8u
expires_at: 2038-01-01 00:00:00
user1_with_load:
+ uuid: zzzzz-gj3su-357z32aux8dg2s1
api_client: untrusted
user: user1_with_load
api_token: 1234k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi
expires_at: 2038-01-01 00:00:00
fuse:
+ uuid: zzzzz-gj3su-367z32aux8dg2s1
api_client: untrusted
user: fuse
api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
name: Subproject to test owner uuid and name unique key violation upon removal
description: "Removing this will result in name conflict with 'A project' in Home project and hence get renamed."
group_class: project
+
+starred_and_shared_active_user_project:
+ uuid: zzzzz-j7d0g-starredshared01
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-04-21 15:37:48 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2014-04-21 15:37:48 -0400
+ updated_at: 2014-04-21 15:37:48 -0400
+ name: Starred and shared active user project
+ description: Starred and shared active user project
+ group_class: project
properties: {}
updated_at: 2014-08-06 22:11:51.242010312 Z
+star_project_for_active_user:
+ uuid: zzzzz-o0j2j-starredbyactive
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-01-24 20:42:26 -0800
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-01-24 20:42:26 -0800
+ updated_at: 2014-01-24 20:42:26 -0800
+ tail_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ link_class: star
+ name: zzzzz-j7d0g-starredshared01
+ head_uuid: zzzzz-j7d0g-starredshared01
+ properties: {}
+
+share_starred_project_with_project_viewer:
+ uuid: zzzzz-o0j2j-sharewithviewer
+ owner_uuid: zzzzz-tpzed-000000000000000
+ tail_uuid: zzzzz-tpzed-projectviewer1a
+ link_class: permission
+ name: can_read
+ head_uuid: zzzzz-j7d0g-starredshared01
+
+star_shared_project_for_project_viewer:
+ uuid: zzzzz-o0j2j-starredbyviewer
+ owner_uuid: zzzzz-tpzed-projectviewer1a
+ created_at: 2014-01-24 20:42:26 -0800
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-01-24 20:42:26 -0800
+ updated_at: 2014-01-24 20:42:26 -0800
+ tail_uuid: zzzzz-tpzed-projectviewer1a
+ link_class: star
+ name: zzzzz-j7d0g-starredshared01
+ head_uuid: zzzzz-j7d0g-starredshared01
+ properties: {}
authorize_with :admin_trustedclient
post :create_system_auth, scopes: '["test"]'
assert_response :success
+ assert_not_nil JSON.parse(@response.body)['uuid']
end
test "prohibit create system auth with token from non-trusted client" do
assert_found_tokens(auth, {filters: [['scopes', '=', scopes]]}, *expected)
end
end
+
+ [
+ [:admin, :admin, 200],
+ [:admin, :active, 403],
+ [:admin, :admin_vm, 403], # this belongs to the user of current session, but we can't get it by uuid
+ [:admin_trustedclient, :active, 200],
+ ].each do |user, token, status|
+ test "as user #{user} get #{token} token and expect #{status}" do
+ authorize_with user
+ get :show, {id: api_client_authorizations(token).uuid}
+ assert_response status
+ end
+ end
+
+ [
+ [:admin, :admin, 200],
+ [:admin, :active, 403],
+ [:admin, :admin_vm, 403], # this belongs to the user of current session, but we can't list it by uuid
+ [:admin_trustedclient, :active, 200],
+ ].each do |user, token, status|
+ test "as user #{user} list #{token} token using uuid and expect #{status}" do
+ authorize_with user
+ get :index, {
+ filters: [['uuid','=',api_client_authorizations(token).uuid]]
+ }
+ assert_response status
+ end
+ end
+
+ [
+ [:admin, :admin, 200],
+ [:admin, :active, 403],
+ [:admin, :admin_vm, 200], # this belongs to the user of current session, and can be listed by token
+ [:admin_trustedclient, :active, 200],
+ ].each do |user, token, status|
+ test "as user #{user} list #{token} token using token and expect #{status}" do
+ authorize_with user
+ get :index, {
+ filters: [['api_token','=',api_client_authorizations(token).api_token]]
+ }
+ assert_response status
+ end
+ end
end
'A Project (2)',
"new project name '#{new_project['name']}' was expected to be 'A Project (2)'")
end
+
+ test "unsharing a project results in hiding it from previously shared user" do
+ # remove sharing link for project
+ @controller = Arvados::V1::LinksController.new
+ authorize_with :admin
+ post :destroy, id: links(:share_starred_project_with_project_viewer).uuid
+ assert_response :success
+
+ # verify that the user can no longer see the project
+ @counter = 0 # Reset executed action counter
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :project_viewer
+ get :index, filters: [['group_class', '=', 'project']], format: :json
+ assert_response :success
+ found_projects = {}
+ json_response['items'].each do |g|
+ found_projects[g['uuid']] = g
+ end
+ assert_equal false, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
+
+ # share the project
+ @counter = 0
+ @controller = Arvados::V1::LinksController.new
+ authorize_with :system_user
+ post :create, link: {
+ link_class: "permission",
+ name: "can_read",
+ head_uuid: groups(:starred_and_shared_active_user_project).uuid,
+ tail_uuid: users(:project_viewer).uuid,
+ }
+
+ # verify that project_viewer user can now see shared project again
+ @counter = 0
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :project_viewer
+ get :index, filters: [['group_class', '=', 'project']], format: :json
+ assert_response :success
+ found_projects = {}
+ json_response['items'].each do |g|
+ found_projects[g['uuid']] = g
+ end
+ assert_equal true, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
+ end
end
--- /dev/null
+require 'test_helper'
+
+class Arvados::V1::QueryTest < ActionController::TestCase
+ test 'no fallback orders when order is unambiguous' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['id asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal ['logs.id asc'], assigns(:objects).order_values
+ end
+
+ test 'fallback orders when order is ambiguous' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['event_type asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'skip fallback orders already given by client' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['modified_at asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.modified_at asc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'eliminate superfluous orders' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['logs.modified_at asc',
+ 'modified_at desc',
+ 'event_type desc',
+ 'logs.event_type asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'eliminate orders after the first unique column' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['event_type asc',
+ 'id asc',
+ 'uuid asc',
+ 'modified_at desc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.event_type asc, logs.id asc',
+ assigns(:objects).order_values.join(', '))
+ end
+end
testPutConcurrent(t, factory)
testPutFullBlock(t, factory)
+
+ testTrashUntrash(t, factory)
}
// Put a test block, get it and verify content
t.Error("rdata != wdata")
}
}
+
+// With trashLifetime != 0, perform:
+// Trash an old block - which either returns ErrNotImplemented or succeeds
+// Untrash - which either returns ErrNotImplemented or succeeds
+// Get - which must succeed
+func testTrashUntrash(t TB, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+ defer func() {
+ trashLifetime = 0
+ }()
+
+ trashLifetime = 3600 * time.Second
+
+ // put block and backdate it
+ v.PutRaw(TestHash, TestBlock)
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
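+	// Backdate the block so it is old enough to be trashed (its mtime
+	// is well past blobSignatureTTL).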
+
+ buf, err := v.Get(TestHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if bytes.Compare(buf, TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ }
+ bufs.Put(buf)
+
+ // Trash
+ err = v.Trash(TestHash)
+ if v.Writable() == false {
+ if err != MethodDisabledError {
+ t.Error(err)
+ }
+ } else if err != nil {
+ if err != ErrNotImplemented {
+ t.Error(err)
+ }
+ } else {
+ _, err = v.Get(TestHash)
+ if err == nil || !os.IsNotExist(err) {
+ t.Errorf("os.IsNotExist(%v) should have been true", err)
+ }
+
+ // Untrash
+ err = v.Untrash(TestHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Get the block - after trash and untrash sequence
+ buf, err = v.Get(TestHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if bytes.Compare(buf, TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ }
+ bufs.Put(buf)
+}
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
- self._logger = logging.getLogger(self.LOGGER_NAME)
self._later = self.actor_ref.proxy()
self._polling_started = False
- self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
self.poll_wait = self.min_poll_wait
if hasattr(self, '_item_key'):
self.subscribe_to = self._subscribe_to
+ def on_start(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[9:]))
+
def _start_polling(self):
if not self._polling_started:
self._polling_started = True
def subscribe(self, subscriber):
self.all_subscribers.add(subscriber)
- self._logger.debug("%r subscribed to all events", subscriber)
+ self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
self._start_polling()
# __init__ exposes this method to the proxy if the subclass defines
# _item_key.
def _subscribe_to(self, key, subscriber):
self.key_subscribers.setdefault(key, set()).add(subscriber)
- self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+ self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
self._start_polling()
def _send_request(self):
raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
- self._logger.debug("%s got response with %d items",
- self.log_prefix, len(response))
self.poll_wait = self.min_poll_wait
_notify_subscribers(response, self.all_subscribers)
if hasattr(self, '_item_key'):
def _got_error(self, error):
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
- return "{} got error: {} - waiting {} seconds".format(
- self.log_prefix, error, self.poll_wait)
+ return "got error: {} - will try again in {} seconds".format(
+ error, self.poll_wait)
def is_common_error(self, exception):
return False
def poll(self, scheduled_start=None):
- self._logger.debug("%s sending poll", self.log_prefix)
+ self._logger.debug("sending request")
start_time = time.time()
if scheduled_start is None:
scheduled_start = start_time
else:
self._got_response(response)
next_poll = scheduled_start + self.poll_wait
+ self._logger.info("got response with %d items in %s seconds, next poll at %s",
+ len(response), (time.time() - scheduled_start),
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
end_time = time.time()
if next_poll < end_time: # We've drifted too much; start fresh.
next_poll = end_time + self.poll_wait
if not (isinstance(error, errors) or
self._cloud.is_cloud_exception(error)):
self.retry_wait = self.min_retry_wait
+ self._logger.warning(
+ "Re-raising unknown error (no retry): %s",
+ error, exc_info=error)
raise
self._logger.warning(
This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
+ def __init__(self, cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
- RetryMixin.__init__(self,
- retry_wait,
- max_retry_wait,
- logging.getLogger(logger_name),
- cloud_client,
- timer_actor)
+ RetryMixin.__init__(self, retry_wait, max_retry_wait,
+ None, cloud_client, timer_actor)
self._later = self.actor_ref.proxy()
self._arvados = arvados_client
self.subscribers = set()
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+ def on_start(self):
+ self._set_logger()
+
def _finished(self):
_notify_subscribers(self._later, self.subscribers)
self.subscribers = None
+ self._logger.info("finished")
def subscribe(self, subscriber):
if self.subscribers is None:
'last_action': explanation}},
).execute()
+ @staticmethod
+ def _finish_on_exception(orig_func):
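+        # If the wrapped method raises, log the error and mark this
+        # actor finished so subscribers get notified rather than
+        # waiting forever.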
+ @functools.wraps(orig_func)
+ def finish_wrapper(self, *args, **kwargs):
+ try:
+ return orig_func(self, *args, **kwargs)
+ except Exception as error:
+ self._logger.error("Actor error %s", error)
+ self._finished()
+ return finish_wrapper
+
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
+ cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self.cloud_size = cloud_size
self.arvados_node = None
else:
self._later.prepare_arvados_node(arvados_node)
+ @ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
+ @ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._clean_arvados_node(
node, "Prepared by Node Manager")
self._later.create_cloud_node()
+ @ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry()
def create_cloud_node(self):
- self._logger.info("Creating cloud node with size %s.",
+ self._logger.info("Sending create_node request for node size %s.",
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
self._later.update_arvados_node_properties()
+ @ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def update_arvados_node_properties(self):
"""Tell Arvados some details about the cloud node.
# eligible. Normal shutdowns based on job demand should be
# cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
+ cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
self.cancel_reason = None
self.success = None
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
def on_start(self):
+ super(ComputeNodeShutdownActor, self).on_start()
self._later.shutdown_node()
def _arvados_node(self):
def cancel_shutdown(self, reason):
self.cancel_reason = reason
- self._logger.info("Cloud node %s shutdown cancelled: %s.",
- self.cloud_node.id, reason)
+ self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
def stop_wrapper(self, *args, **kwargs):
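+            # shutdown_eligible() returns True or an explanatory
+            # string; compare identity with True because non-empty
+            # strings are also truthy.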
if (self.cancellable and
- (not self._monitor.shutdown_eligible().get())):
+ (self._monitor.shutdown_eligible().get() is not True)):
self._later.cancel_shutdown(self.WINDOW_CLOSED)
return None
else:
return orig_func(self, *args, **kwargs)
return stop_wrapper
+ @ComputeNodeStateChangeBase._finish_on_exception
@_stop_if_window_closed
@RetryMixin._retry()
def shutdown_node(self):
+ self._logger.info("Starting shutdown")
if not self._cloud.destroy_node(self.cloud_node):
if self._cloud.broken(self.cloud_node):
self._later.cancel_shutdown(self.NODE_BROKEN)
else:
# Force a retry.
raise cloud_types.LibcloudError("destroy_node failed")
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self._logger.info("Shutdown success")
arv_node = self._arvados_node()
if arv_node is None:
self._finished(success_flag=True)
else:
self._later.clean_arvados_node(arv_node)
+ @ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.computenode')
self._last_log = None
self._shutdowns = shutdown_timer
self._cloud_node_fqdn = cloud_fqdn_func
self.last_shutdown_opening = None
self._later.consider_shutdown()
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
+ def on_start(self):
+ self._set_logger()
+ self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
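+ # Schedule a shutdown check for when the boot-fail deadline passes,
+ # so an unpaired node is re-evaluated even if no poll event arrives.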
+
def subscribe(self, subscriber):
self.subscribers.add(subscriber)
return result
def shutdown_eligible(self):
+ """Return True if eligible for shutdown, or a string explaining why the node
+ is not eligible for shutdown."""
+
if not self._shutdowns.window_open():
- return False
+ return "shutdown window is not open."
if self.arvados_node is None:
# Node is unpaired.
# If it hasn't pinged Arvados after boot_fail seconds, shut it down
- return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+ if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+ return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
+ else:
+ return True
missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
if missing and self._cloud.broken(self.cloud_node):
# Node is paired, but Arvados says it is missing and the cloud says the node
# is in an error state, so shut it down.
return True
if missing is None and self._cloud.broken(self.cloud_node):
- self._logger.warning(
- "cloud reports broken node, but paired node %s never pinged "
- "(bug?) -- skipped check for node_stale_after",
+ self._logger.info(
+ "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
+ "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
self.arvados_node['uuid'])
- return self.in_state('idle')
+ if self.in_state('idle'):
+ return True
+ else:
+ return "node is not idle."
def consider_shutdown(self):
- next_opening = self._shutdowns.next_opening()
- if self.shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- elif self._shutdowns.window_open():
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- elif self.last_shutdown_opening != next_opening:
- self._debug("Node %s shutdown window closed. Next at %s.",
- self.cloud_node.id, time.ctime(next_opening))
- self._timer.schedule(next_opening, self._later.consider_shutdown)
- self.last_shutdown_opening = next_opening
+ try:
+ next_opening = self._shutdowns.next_opening()
+ eligible = self.shutdown_eligible()
+ if eligible is True:
+ self._debug("Suggesting shutdown.")
+ _notify_subscribers(self._later, self.subscribers)
+ elif self._shutdowns.window_open():
+ self._debug("Cannot shut down because %s", eligible)
+ elif self.last_shutdown_opening != next_opening:
+ self._debug("Shutdown window closed. Next at %s.",
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
+ except Exception:
+ self._logger.exception("Unexpected exception")
def offer_arvados_pair(self, arvados_node):
first_ping_s = arvados_node.get('first_ping_at')
self._nodename = None
return super(ComputeNodeShutdownActor, self).on_start()
else:
+ self._set_logger()
self._nodename = arv_node['hostname']
self._logger.info("Draining SLURM node %s", self._nodename)
self._later.issue_slurm_drain()
def _create_driver(self, driver_class, **auth_kwargs):
return driver_class(**auth_kwargs)
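+ # list_sizes() can fail transiently while the cloud API is flaky;
+ # retry rather than letting driver initialization raise.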
+ @RetryMixin._retry()
+ def _set_sizes(self):
+ self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+
def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
driver_class, retry_wait=1, max_retry_wait=180):
"""Base initializer for compute node drivers.
"""
super(BaseComputeNodeDriver, self).__init__(retry_wait, max_retry_wait,
- logging.getLogger(str(type(self))),
+ logging.getLogger(self.__class__.__name__),
type(self),
None)
self.real = self._create_driver(driver_class, **auth_kwargs)
if new_pair is not None:
self.create_kwargs[new_pair[0]] = new_pair[1]
- self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+ self._set_sizes()
def _init_ping_host(self, ping_host):
self.ping_host = ping_host
self._new_arvados = arvados_factory
self._new_cloud = cloud_factory
self._cloud_driver = self._new_cloud()
- self._logger = logging.getLogger('arvnodeman.daemon')
self._later = self.actor_ref.proxy()
self.shutdown_windows = shutdown_windows
self.server_calculator = server_calculator
self.booting = {} # Actor IDs to ComputeNodeSetupActors
self.booted = {} # Cloud node IDs to _ComputeNodeRecords
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
- self._logger.debug("Daemon initialized")
+ self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
+
+ def on_start(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+ self._logger.debug("Daemon started")
def _update_poll_time(self, poll_key):
self.last_polls[poll_key] = time.time()
def _pair_nodes(self, node_record, arvados_node):
- self._logger.info("Cloud node %s has associated with Arvados node %s",
- node_record.cloud_node.id, arvados_node['uuid'])
+ self._logger.info("Cloud node %s is now paired with Arvados node %s",
+ node_record.cloud_node.name, arvados_node['uuid'])
self._arvados_nodes_actor.subscribe_to(
arvados_node['uuid'], node_record.actor.update_arvados_node)
node_record.arvados_node = arvados_node
except pykka.ActorDeadError:
pass
del self.shutdowns[key]
+ del self.sizes_booting_shutdown[key]
record.actor.stop()
record.cloud_node = None
self._pair_nodes(cloud_rec, arv_node)
break
- def _nodes_up(self, size):
- up = 0
- up += sum(1
- for c in self.booting.itervalues()
- if size is None or c.cloud_size.get().id == size.id)
- up += sum(1
- for i in (self.booted, self.cloud_nodes.nodes)
- for c in i.itervalues()
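+ # Look sizes up in self.sizes_booting_shutdown instead of calling
+ # back into setup actors, which may already have stopped.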
+ def _nodes_booting(self, size):
+ s = sum(1
+ for c in self.booting.iterkeys()
+ if size is None or self.sizes_booting_shutdown[c].id == size.id)
+ s += sum(1
+ for c in self.booted.itervalues()
+ if size is None or c.cloud_node.size.id == size.id)
+ return s
+
+ def _nodes_unpaired(self, size):
+ return sum(1
+ for c in self.cloud_nodes.unpaired()
+ if size is None or c.cloud_node.size.id == size.id)
+
+ def _nodes_booted(self, size):
+ return sum(1
+ for c in self.cloud_nodes.nodes.itervalues()
if size is None or c.cloud_node.size.id == size.id)
+
+ def _nodes_up(self, size):
+ up = self._nodes_booting(size) + self._nodes_booted(size)
return up
def _total_price(self):
cost = 0
- cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
- for c in self.booting.itervalues())
+ cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
+ for c in self.booting.iterkeys())
cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
for i in (self.booted, self.cloud_nodes.nodes)
for c in i.itervalues())
def _size_shutdowns(self, size):
sh = 0
- for c in self.shutdowns.itervalues():
+ for c in self.shutdowns.iterkeys():
try:
- if c.cloud_node.get().size.id == size.id:
+ if self.sizes_booting_shutdown[c].id == size.id:
sh += 1
except pykka.ActorDeadError:
pass
elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
- up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
- self._nodes_busy(size) +
- self._nodes_missing(size))
+ booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
+ shutdown_count = self._size_shutdowns(size)
+ busy_count = self._nodes_busy(size)
+ up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
- self._logger.debug("%s: idle nodes %i, wishlist size %i", size.name, up_count, self._size_wishlist(size))
+ self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
+ self._size_wishlist(size),
+ up_count + busy_count,
+ booting_count,
+ up_count - booting_count,
+ busy_count,
+ shutdown_count)
wanted = self._size_wishlist(size) - up_count
if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
self._update_poll_time('server_wishlist')
self.last_wishlist = wishlist
for size in reversed(self.server_calculator.cloud_sizes):
- nodes_wanted = self._nodes_wanted(size)
- if nodes_wanted > 0:
- self._later.start_node(size)
- elif (nodes_wanted < 0) and self.booting:
- self._later.stop_booting_node(size)
+ try:
+ nodes_wanted = self._nodes_wanted(size)
+ if nodes_wanted > 0:
+ self._later.start_node(size)
+ elif (nodes_wanted < 0) and self.booting:
+ self._later.stop_booting_node(size)
+ except Exception:
+ self._logger.exception("Exception while calculating nodes wanted for size %s", size)
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
if nodes_wanted < 1:
return None
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- self._logger.info("Want %s more nodes. Booting a %s node.",
+ self._logger.info("Want %i more %s nodes. Booting a node.",
nodes_wanted, cloud_size.name)
new_setup = self._node_setup.start(
timer_actor=self._timer,
cloud_client=self._new_cloud(),
cloud_size=cloud_size).proxy()
self.booting[new_setup.actor_ref.actor_urn] = new_setup
+ self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
time.time())
def node_up(self, setup_proxy):
cloud_node = setup_proxy.cloud_node.get()
del self.booting[setup_proxy.actor_ref.actor_urn]
+ del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+
setup_proxy.stop()
- record = self.cloud_nodes.get(cloud_node.id)
- if record is None:
- record = self._new_node(cloud_node)
- self.booted[cloud_node.id] = record
- self._timer.schedule(time.time() + self.boot_fail_after,
- self._later.shutdown_unpaired_node, cloud_node.id)
+ if cloud_node is not None:
+ record = self.cloud_nodes.get(cloud_node.id)
+ if record is None:
+ record = self._new_node(cloud_node)
+ self.booted[cloud_node.id] = record
+ self._timer.schedule(time.time() + self.boot_fail_after,
+ self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
def stop_booting_node(self, size):
for key, node in self.booting.iteritems():
if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
del self.booting[key]
+ del self.sizes_booting_shutdown[key]
+
if nodes_excess > 1:
self._later.stop_booting_node(size)
break
def _begin_node_shutdown(self, node_actor, cancellable):
- cloud_node_id = node_actor.cloud_node.get().id
+ cloud_node_obj = node_actor.cloud_node.get()
+ cloud_node_id = cloud_node_obj.id
if cloud_node_id in self.shutdowns:
return None
shutdown = self._node_shutdown.start(
arvados_client=self._new_arvados(),
node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
+ self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
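+ # Record the size while the node object is at hand, so later size
+ # accounting does not depend on the shutdown actor being alive.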
shutdown.subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
if cancel_reason == self._node_shutdown.NODE_BROKEN:
self.cloud_nodes.blacklist(cloud_node_id)
del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()
del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
def shutdown(self):
self._logger.info("Shutting down after signal.")
"""
CLIENT_ERRORS = ARVADOS_ERRORS
- LOGGER_NAME = 'arvnodeman.jobqueue'
def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
super(JobQueueMonitorActor, self).__init__(
def _got_response(self, queue):
server_list = self._calculator.servers_for_queue(queue)
- self._logger.debug("Sending server wishlist: %s",
+ self._logger.debug("Calculated wishlist: %s",
', '.join(s.name for s in server_list) or "(empty)")
return super(JobQueueMonitorActor, self)._got_response(server_list)
timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
cloud_node_poller = CloudNodeListMonitorActor.start(
- config.new_cloud_client(timer), timer, poll_time, max_poll_time).proxy()
+ config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
arvados_node_poller = ArvadosNodeListMonitorActor.start(
config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
job_queue_poller = JobQueueMonitorActor.start(
for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
signal.signal(sigcode, shutdown_signal)
- setup_logging(config.get('Logging', 'file'), **config.log_levels())
- node_setup, node_shutdown, node_update, node_monitor = \
- config.dispatch_classes()
- server_calculator = build_server_calculator(config)
- timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
- launch_pollers(config, server_calculator)
- cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
- node_daemon = NodeManagerDaemonActor.start(
- job_queue_poller, arvados_node_poller, cloud_node_poller,
- cloud_node_updater, timer,
- config.new_arvados_client, config.new_cloud_client,
- config.shutdown_windows(),
- server_calculator,
- config.getint('Daemon', 'min_nodes'),
- config.getint('Daemon', 'max_nodes'),
- config.getint('Daemon', 'poll_stale_after'),
- config.getint('Daemon', 'boot_fail_after'),
- config.getint('Daemon', 'node_stale_after'),
- node_setup, node_shutdown, node_monitor,
- max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
-
- signal.pause()
- daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
- while not daemon_stopped():
- time.sleep(1)
- pykka.ActorRegistry.stop_all()
+ try:
+ setup_logging(config.get('Logging', 'file'), **config.log_levels())
+ node_setup, node_shutdown, node_update, node_monitor = \
+ config.dispatch_classes()
+ server_calculator = build_server_calculator(config)
+ timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+ launch_pollers(config, server_calculator)
+ cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+ node_daemon = NodeManagerDaemonActor.start(
+ job_queue_poller, arvados_node_poller, cloud_node_poller,
+ cloud_node_updater, timer,
+ config.new_arvados_client, config.new_cloud_client,
+ config.shutdown_windows(),
+ server_calculator,
+ config.getint('Daemon', 'min_nodes'),
+ config.getint('Daemon', 'max_nodes'),
+ config.getint('Daemon', 'poll_stale_after'),
+ config.getint('Daemon', 'boot_fail_after'),
+ config.getint('Daemon', 'node_stale_after'),
+ node_setup, node_shutdown, node_monitor,
+ max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+
+ signal.pause()
+ daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+ while not daemon_stopped():
+ time.sleep(1)
+ except Exception:
+ logging.exception("Uncaught exception during setup")
+ finally:
+ pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
This actor regularly polls the list of Arvados node records, and
sends it to subscribers.
"""
- LOGGER_NAME = 'arvnodeman.arvados_nodes'
def is_common_error(self, exception):
return isinstance(exception, config.ARVADOS_ERRORS)
This actor regularly polls the cloud to get a list of running compute
nodes, and sends it to subscribers.
"""
- LOGGER_NAME = 'arvnodeman.cloud_nodes'
def is_common_error(self, exception):
return self._client.is_cloud_exception(exception)
return node.id
def _send_request(self):
- return self._client.list_nodes()
+ n = self._client.list_nodes()
+ return n
def test_late_subscribers_get_responses(self):
self.build_monitor(['pre_late_test', 'late_test'])
- self.monitor.subscribe(lambda response: None).get(self.TIMEOUT)
+ mock_subscriber = mock.Mock(name='mock_subscriber')
+ self.monitor.subscribe(mock_subscriber).get(self.TIMEOUT)
self.monitor.subscribe(self.subscriber)
self.monitor.poll().get(self.TIMEOUT)
self.stop_proxy(self.monitor)
if __name__ == '__main__':
unittest.main()
-
import httplib2
import mock
import pykka
+import threading
import arvnodeman.computenode.dispatch as dispatch
from . import testutil
def test_creation_without_arvados_node(self):
self.make_actor()
+ finished = threading.Event()
+ self.setup_actor.subscribe(lambda _: finished.set())
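+ # finished.wait() below ensures the actor has drained its message
+ # queue before the call counts are checked.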
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
+ self.assertTrue(finished.wait(self.TIMEOUT))
self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
self.assert_node_properties_updated()
def test_creation_with_arvados_node(self):
self.make_mocks(arvados_effect=[testutil.arvados_node_mock()]*2)
self.make_actor(testutil.arvados_node_mock())
+ finished = threading.Event()
+ self.setup_actor.subscribe(lambda _: finished.set())
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
+ self.assertTrue(finished.wait(self.TIMEOUT))
self.assert_node_properties_updated()
self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
self.assertEqual(self.cloud_client.create_node(),
def test_no_shutdown_booting(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is still booting"))
def test_shutdown_without_arvados_node(self):
self.make_actor(start_time=0)
last_ping_at='1970-01-01T01:02:03.04050607Z')
self.make_actor(10, arv_node)
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_running_broken(self):
arv_node = testutil.arvados_node_mock(12, job_uuid=None,
self.make_actor(12, arv_node)
self.shutdowns._set_state(True, 600)
self.cloud_client.broken.return_value = True
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_shutdown_missing_broken(self):
arv_node = testutil.arvados_node_mock(11, job_uuid=None,
def test_no_shutdown_when_window_closed(self):
self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("shutdown window is not open."))
def test_no_shutdown_when_node_running_job(self):
self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_when_node_state_unknown(self):
self.make_actor(5, testutil.arvados_node_mock(
5, crunch_worker_state=None))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_when_node_state_stale(self):
self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_arvados_node_match(self):
self.make_actor(2)
mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
+ self.daemon.sizes_booting_shutdown.get()[cloud_nodes[1].id] = size
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
import cgi
import json
+import math
import pkg_resources
from crunchstat_summary import logger
}
for s in self.summarizers]
+ def _axisY(self, tasks, stat):
+ ymax = 1
+ for task in tasks.itervalues():
+ for pt in task.series[stat]:
+ ymax = max(ymax, pt[1])
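+ # Round ymax up to the next power of two and use a quarter of it as
+ # the grid interval: ytick = 2**(1 + floor(log2(ymax))) / 4.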
+ ytick = math.exp((1+math.floor(math.log(ymax, 2)))*math.log(2))/4
+ return {
+ 'gridColor': '#cccccc',
+ 'gridThickness': 1,
+ 'interval': ytick,
+ 'minimum': 0,
+ 'maximum': ymax,
+ 'valueFormatString': "''",
+ }
+
def charts(self, label, tasks):
return [
{
- 'axisY': {
- 'minimum': 0,
- },
+ 'axisY': self._axisY(tasks=tasks, stat=stat),
'data': [
{
'type': 'line',
src = self.add_mutually_exclusive_group()
src.add_argument(
'--job', type=str, metavar='UUID',
- help='Look up the specified job and read its log data from Keep')
+ help='Look up the specified job and read its log data from Keep'
+ ' (or from the Arvados event log, if the job is still running)')
src.add_argument(
'--pipeline-instance', type=str, metavar='UUID',
help='Summarize each component of the given pipeline instance')
--- /dev/null
+from __future__ import print_function
+
+import arvados
+import Queue
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+ def __init__(self, collection_id):
+ logger.debug('load collection %s', collection_id)
+ collection = arvados.collection.CollectionReader(collection_id)
+ filenames = [filename for filename in collection]
+ if len(filenames) != 1:
+ raise ValueError(
+ "collection {} has {} files; need exactly one".format(
+ collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+ self._label = "{}/{}".format(collection_id, filenames[0])
+
+ def __str__(self):
+ return self._label
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ EOF = None
+
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self.job_uuid = job_uuid
+
+ def __str__(self):
+ return self.job_uuid
+
+ def _get_all_pages(self):
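+ # Page through the logs API in id order, filtering on id > last_id
+ # so each request resumes where the previous page ended.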
+ got = 0
+ last_id = 0
+ filters = [
+ ['object_uuid', '=', self.job_uuid],
+ ['event_type', '=', 'stderr']]
+ try:
+ while True:
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=filters + [['id','>',str(last_id)]],
+ select=['id', 'properties'],
+ ).execute(num_retries=2)
+ got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self.job_uuid, got,
+ got + page['items_available'] - len(page['items']))
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._queue.put(line+'\n')
+ last_id = i['id']
+ if (len(page['items']) == 0 or
+ len(page['items']) >= page['items_available']):
+ break
+ finally:
+ self._queue.put(self.EOF)
+
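+ # Iterating starts a daemon thread that feeds log lines into a
+ # queue; next() consumes until the EOF sentinel (None) appears.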
+ def __iter__(self):
+ self._queue = Queue.Queue()
+ self._thread = threading.Thread(target=self._get_all_pages)
+ self._thread.daemon = True
+ self._thread.start()
+ return self
+
+ def next(self):
+ line = self._queue.get()
+ if line is self.EOF:
+ self._thread.join()
+ raise StopIteration
+ return line
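+
+# A minimal usage sketch (the job UUID is a placeholder), reading a
+# running job's stderr as it streams in:
+#
+#   for line in LiveLogReader('zzzzz-8i9sb-xxxxxxxxxxxxxxx'):
+#       sys.stdout.write(line)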
import arvados
import collections
import crunchstat_summary.chartjs
+import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
+import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
# stats_max: {category: {stat: val}}
self.stats_max = collections.defaultdict(
- functools.partial(collections.defaultdict,
- lambda: float('-Inf')))
+ functools.partial(collections.defaultdict, lambda: 0))
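+ # Defaulting to 0 (rather than -Inf) keeps stats that never appear
+ # in the logs printable in the report.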
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
# constructor will overwrite this with something useful.
self.existing_constraints = {}
- logger.debug("%s: logdata %s", self.label, repr(logdata))
+ logger.debug("%s: logdata %s", self.label, logdata)
def run(self):
- logger.debug("%s: parsing log data", self.label)
+ logger.debug("%s: parsing logdata %s", self.label, self._logdata)
for line in self._logdata:
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
if m:
logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
continue
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
if m:
task_id = self.seq_to_uuid[int(m.group('seq'))]
elapsed = int(m.group('elapsed'))
child_summarizer.stats_max = self.stats_max
child_summarizer.task_stats = self.task_stats
child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
child_summarizer.run()
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
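+ # Timestamps may now carry fractional seconds; the optional
+ # (\.\d+)? group accepts and discards them.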
if not m:
continue
val = val / this_interval_s
if stat in ['user+sys__rate', 'tx+rx__rate']:
task.series[category, stat].append(
- (timestamp - task.starttime, val))
+ (timestamp - self.starttime, val))
else:
if stat in ['rss']:
task.series[category, stat].append(
- (timestamp - task.starttime, val))
+ (timestamp - self.starttime, val))
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
return label
def text_report(self):
+ if not self.tasks:
+ return "(no report generated)\n"
return "\n".join(itertools.chain(
self._text_report_gen(),
self._recommend_gen())) + "\n"
lambda x: x * 100),
('Overall CPU usage: {}%',
self.job_tot['cpu']['user+sys'] /
- self.job_tot['time']['elapsed'],
+ self.job_tot['time']['elapsed']
+ if self.job_tot['time']['elapsed'] > 0 else 0,
lambda x: x * 100),
('Max memory used by a single task: {}GB',
self.stats_max['mem']['rss'],
lambda x: x / 1e9),
('Max network traffic in a single task: {}GB',
- self.stats_max['net:eth0']['tx+rx'],
+ self.stats_max['net:eth0']['tx+rx'] +
+ self.stats_max['net:keep0']['tx+rx'],
lambda x: x / 1e9),
('Max network speed in a single interval: {}MB/s',
- self.stats_max['net:eth0']['tx+rx__rate'],
- lambda x: x / 1e6)):
+ self.stats_max['net:eth0']['tx+rx__rate'] +
+ self.stats_max['net:keep0']['tx+rx__rate'],
+ lambda x: x / 1e6),
+ ('Keep cache miss rate {}%',
+ (float(self.job_tot['keepcache']['miss']) /
+ float(self.job_tot['keepcalls']['get']))
+ if self.job_tot['keepcalls']['get'] > 0 else 0,
+ lambda x: x * 100.0),
+ ('Keep cache utilization {}%',
+ (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
+ if self.job_tot['net:keep0']['rx'] > 0 else 0,
+ lambda x: x * 100.0)):
format_string, val, transform = args
if val == float('-Inf'):
continue
def _recommend_gen(self):
return itertools.chain(
self._recommend_cpu(),
- self._recommend_ram())
+ self._recommend_ram(),
+ self._recommend_keep_cache())
def _recommend_cpu(self):
"""Recommend asking for 4 cores if max CPU usage was 333%"""
if cpu_max_rate == float('-Inf'):
logger.warning('%s: no CPU usage data', self.label)
return
- used_cores = int(math.ceil(cpu_max_rate))
+ used_cores = max(1, int(math.ceil(cpu_max_rate)))
asked_cores = self.existing_constraints.get('min_cores_per_node')
if asked_cores is None or used_cores < asked_cores:
yield (
int(used_mib),
int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+ def _recommend_keep_cache(self):
+ """Recommend increasing keep cache if utilization < 80%"""
+ if self.job_tot['net:keep0']['rx'] == 0:
+ return
+ utilization = (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
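+ # Roughly, bytes the job read from the cache divided by bytes
+ # fetched from Keep; a low ratio suggests blocks were evicted and
+ # re-fetched, i.e. the cache is too small.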
+ asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
+
+ if utilization < 0.8:
+ yield (
+ '#!! {} Keep cache utilization was {:.2f}% -- '
+ 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
+ ).format(
+ self.label,
+ utilization * 100.0,
+ asked_mib*2)
+
+
def _format(self, val):
"""Return a string representation of a stat.
class CollectionSummarizer(Summarizer):
def __init__(self, collection_id, **kwargs):
- logger.debug('load collection %s', collection_id)
- collection = arvados.collection.CollectionReader(collection_id)
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- collection_id, len(filenames)))
super(CollectionSummarizer, self).__init__(
- collection.open(filenames[0]), **kwargs)
+ crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
self.label = collection_id
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
def __init__(self, job, **kwargs):
arv = arvados.api('v1')
if isinstance(job, basestring):
self.job = arv.jobs().get(uuid=job).execute()
else:
self.job = job
- if not self.job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.job['uuid']))
- super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
- self.label = self.job['uuid']
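+ # Prefer the finished log collection; fall back to streaming the
+ # live event log (and label the summary "(partial)") when the
+ # collection is missing or unreadable.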
+ rdr = None
+ if self.job.get('log'):
+ try:
+ rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+ except arvados.errors.NotFoundError as e:
+ logger.warning("Trying event logs after failing to read "
+ "log collection %s: %s", self.job['log'], e)
+ else:
+ label = self.job['uuid']
+ if rdr is None:
+ rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
+ label = self.job['uuid'] + ' (partial)'
+ super(JobSummarizer, self).__init__(rdr, **kwargs)
+ self.label = label
self.existing_constraints = self.job.get('runtime_constraints', {})
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
- elif component['job'].get('log') is None:
- logger.warning(
- "%s: skipping job %s with no log available",
- cname, component['job'].get('uuid'))
else:
logger.info(
- "%s: logdata %s", cname, component['job']['log'])
+ "%s: job %s", cname, component['job']['uuid'])
summarizer = JobSummarizer(component['job'], **kwargs)
- summarizer.label = cname
+ summarizer.label = '{} {}'.format(
+ cname, component['job']['uuid'])
self.summarizers[cname] = summarizer
self.label = pipeline_instance_uuid
def run(self):
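+ # Run each component's summarizer in its own thread so that live
+ # log readers for several jobs can stream concurrently.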
+ threads = []
for summarizer in self.summarizers.itervalues():
- summarizer.run()
+ t = threading.Thread(target=summarizer.run)
+ t.daemon = True
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
def text_report(self):
txt = ''
category metric task_max task_max_rate job_total
-blkio:0:0 read 0 0.00 0
-blkio:0:0 write 0 0.00 0
+blkio:0:0 read 0 0 0
+blkio:0:0 write 0 0 0
cpu cpus 8 - -
cpu sys 1.92 0.04 1.92
cpu user 3.83 0.09 3.83
cpu user+sys 5.75 0.13 5.75
-fuseops read 0 0.00 0
-fuseops write 0 0.00 0
-keepcache hit 0 0.00 0
-keepcache miss 0 0.00 0
-keepcalls get 0 0.00 0
-keepcalls put 0 0.00 0
+fuseops read 0 0 0
+fuseops write 0 0 0
+keepcache hit 0 0 0
+keepcache miss 0 0 0
+keepcalls get 0 0 0
+keepcalls put 0 0 0
mem cache 1678139392 - -
mem pgmajfault 0 - 0
mem rss 349814784 - -
net:eth0 rx 1754364530 41658344.87 1754364530
net:eth0 tx 38837956 920817.97 38837956
net:eth0 tx+rx 1793202486 42579162.83 1793202486
-net:keep0 rx 0 0.00 0
-net:keep0 tx 0 0.00 0
-net:keep0 tx+rx 0 0.00 0
+net:keep0 rx 0 0 0
+net:keep0 tx 0 0 0
+net:keep0 tx+rx 0 0 0
time elapsed 80 - 80
# Number of tasks: 1
# Max CPU time spent by a single task: 5.75s
# Max memory used by a single task: 0.35GB
# Max network traffic in a single task: 1.79GB
# Max network speed in a single interval: 42.58MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
category metric task_max task_max_rate job_total
cpu cpus 8 - -
-cpu sys 0.00 - 0.00
-cpu user 0.00 - 0.00
-cpu user+sys 0.00 - 0.00
+cpu sys 0 - 0.00
+cpu user 0 - 0.00
+cpu user+sys 0 - 0.00
mem cache 12288 - -
mem pgmajfault 0 - 0
mem rss 856064 - -
net:eth0 tx+rx 180 - 180
time elapsed 2 - 4
# Number of tasks: 2
-# Max CPU time spent by a single task: 0.00s
+# Max CPU time spent by a single task: 0s
+# Max CPU usage in a single interval: 0%
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
category metric task_max task_max_rate job_total
cpu cpus 8 - -
-cpu sys 0.00 - 0.00
-cpu user 0.00 - 0.00
-cpu user+sys 0.00 - 0.00
+cpu sys 0 - 0.00
+cpu user 0 - 0.00
+cpu user+sys 0 - 0.00
mem cache 8192 - -
mem pgmajfault 0 - 0
mem rss 450560 - -
net:eth0 tx+rx 180 - 180
time elapsed 2 - 3
# Number of tasks: 2
-# Max CPU time spent by a single task: 0.00s
+# Max CPU time spent by a single task: 0s
+# Max CPU usage in a single interval: 0%
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
job_report + ['\n'] +
['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
job_report + ['\n'] +
+ ['### Summary for unfinished-job (zzzzz-8i9sb-xxxxxxxxxxxxxxx)\n',
+ '(no report generated)\n',
+ '\n'] +
['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
job_report)
self.diff_report(cmd, expect)