# Dev/test SSL certificates
/self-signed.key
/self-signed.pem
+
+# Generated git-commit.version file
+/git-commit.version
therubyracer
uglifier (>= 1.0.3)
wiselinks
-
-BUNDLED WITH
- 1.10.6
end
end
+ # star / unstar the current project
+ def star
+ links = Link.where(tail_uuid: current_user.uuid,
+ head_uuid: @object.uuid,
+ link_class: 'star')
+
+ if params['status'] == 'create'
+ # create 'star' link if one does not already exist
+ if !links.andand.any?
+ dst = Link.new(owner_uuid: current_user.uuid,
+ tail_uuid: current_user.uuid,
+ head_uuid: @object.uuid,
+ link_class: 'star',
+ name: @object.uuid)
+ dst.save!
+ end
+ else # delete any existing 'star' links
+ if links.andand.any?
+ links.each do |link|
+ link.destroy
+ end
+ end
+ end
+
+ respond_to do |format|
+ format.js
+ end
+ end
+
protected
def derive_unique_filename filename, manifest_files
# exception here than in a template.)
unless current_user.nil?
begin
- build_project_trees
+ my_starred_projects current_user
+ build_my_wanted_projects_tree current_user
rescue ArvadosApiClient::ApiError
# Fall back to the default-setting code later.
end
end
- @my_project_tree ||= []
- @shared_project_tree ||= []
+ @starred_projects ||= []
+ @my_wanted_projects_tree ||= []
render_error(err_opts)
end
end
end
+ helper_method :is_starred
+ def is_starred
+ links = Link.where(tail_uuid: current_user.uuid,
+ head_uuid: @object.uuid,
+ link_class: 'star')
+
+ return links.andand.any?
+ end
+
protected
helper_method :strip_token_from_path
{collections: c, owners: own}
end
- helper_method :my_project_tree
- def my_project_tree
- build_project_trees
- @my_project_tree
+ helper_method :my_starred_projects
+ def my_starred_projects user
+    return @starred_projects if @starred_projects
+ links = Link.filter([['tail_uuid', '=', user.uuid],
+ ['link_class', '=', 'star'],
+ ['head_uuid', 'is_a', 'arvados#group']]).select(%w(head_uuid))
+ uuids = links.collect { |x| x.head_uuid }
+ starred_projects = Group.filter([['uuid', 'in', uuids]]).order('name')
+ @starred_projects = starred_projects.results
+ end
+
+  # If the user can read more than 200 projects, build the tree using
+  # only the first 200+ projects owned by the user, taken from the top
+  # three levels.
+  # That is: get the top-level projects under home, then the
+  # subprojects of those projects, and so on until we hit the limit.
+ def my_wanted_projects user, page_size=100
+ return @my_wanted_projects if @my_wanted_projects
+
+ from_top = []
+ uuids = [user.uuid]
+ depth = 0
+ @too_many_projects = false
+ @reached_level_limit = false
+ while from_top.size <= page_size*2
+ current_level = Group.filter([['group_class','=','project'],
+ ['owner_uuid', 'in', uuids]])
+ .order('name').limit(page_size*2)
+ break if current_level.results.size == 0
+ @too_many_projects = true if current_level.items_available > current_level.results.size
+ from_top.concat current_level.results
+ uuids = current_level.results.collect { |x| x.uuid }
+ depth += 1
+ if depth >= 3
+ @reached_level_limit = true
+ break
+ end
+ end
+ @my_wanted_projects = from_top
end
- helper_method :shared_project_tree
- def shared_project_tree
- build_project_trees
- @shared_project_tree
+ helper_method :my_wanted_projects_tree
+ def my_wanted_projects_tree user, page_size=100
+ build_my_wanted_projects_tree user, page_size
+ [@my_wanted_projects_tree, @too_many_projects, @reached_level_limit]
end
- def build_project_trees
- return if @my_project_tree and @shared_project_tree
- parent_of = {current_user.uuid => 'me'}
- all_projects.each do |ob|
+ def build_my_wanted_projects_tree user, page_size=100
+ return @my_wanted_projects_tree if @my_wanted_projects_tree
+
+ parent_of = {user.uuid => 'me'}
+ my_wanted_projects(user, page_size).each do |ob|
parent_of[ob.uuid] = ob.owner_uuid
end
- children_of = {false => [], 'me' => [current_user]}
- all_projects.each do |ob|
- if ob.owner_uuid != current_user.uuid and
+ children_of = {false => [], 'me' => [user]}
+ my_wanted_projects(user, page_size).each do |ob|
+ if ob.owner_uuid != user.uuid and
not parent_of.has_key? ob.owner_uuid
parent_of[ob.uuid] = false
end
end
paths
end
- @my_project_tree =
+ @my_wanted_projects_tree =
sorted_paths.call buildtree.call(children_of, 'me')
- @shared_project_tree =
- sorted_paths.call({'Projects shared with me' =>
- buildtree.call(children_of, false)})
end
helper_method :get_object
@api_client.ssl_config.verify_mode = OpenSSL::SSL::VERIFY_NONE
else
# Use system CA certificates
- @api_client.ssl_config.add_trust_ca('/etc/ssl/certs')
+ ["/etc/ssl/certs/ca-certificates.crt",
+ "/etc/pki/tls/certs/ca-bundle.crt"]
+ .select { |ca_path| File.readable?(ca_path) }
+ .each { |ca_path| @api_client.ssl_config.add_trust_ca(ca_path) }
end
if Rails.configuration.api_response_compression
@api_client.transparent_gzip_decompression = true
+ <nav class="navbar navbar-default breadcrumbs" role="navigation">
+ <ul class="nav navbar-nav navbar-left">
+ <li>
+ <a href="/">
+ <i class="fa fa-lg fa-fw fa-dashboard"></i>
+ Dashboard
+ </a>
+ </li>
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="projects-menu">
+ Projects
+ <span class="caret"></span>
+ </a>
+ <ul class="dropdown-menu" style="min-width: 20em" role="menu">
+ <li role="menuitem">
+ <%= link_to(
+ url_for(
+ action: 'choose',
+ controller: 'search',
+ filters: [['uuid', 'is_a', 'arvados#group']].to_json,
+ title: 'Search',
+ action_name: 'Show',
+ action_href: url_for(controller: :actions, action: :show),
+ action_method: 'get',
+ action_data: {selection_param: 'uuid', success: 'redirect-to-created-object'}.to_json),
+ { remote: true, method: 'get', title: "Search" }) do %>
+ <i class="glyphicon fa-fw glyphicon-search"></i> Search all projects ...
+ <% end %>
+ </li>
+ <% if Rails.configuration.anonymous_user_token and Rails.configuration.enable_public_projects_page %>
+ <li role="menuitem"><a href="/projects/public" role="menuitem"><i class="fa fa-fw fa-list"></i> Browse public projects </a>
+ </li>
+ <% end %>
+ <li role="menuitem">
+ <%= link_to projects_path(options: {ensure_unique_name: true}), role: 'menu-item', method: :post do %>
+ <i class="fa fa-fw fa-plus"></i> Add a new project
+ <% end %>
+ </li>
+ <li role="presentation" class="divider"></li>
+ <%= render partial: "projects_tree_menu", locals: {
+ :project_link_to => Proc.new do |pnode, &block|
+ link_to(project_path(pnode[:object].uuid),
+ data: { 'object-uuid' => pnode[:object].uuid,
+ 'name' => 'name' },
+ &block)
+ end,
+ } %>
+ </ul>
+ </li>
+ <% if @name_link or @object %>
+ <li class="nav-separator">
+ <i class="fa fa-lg fa-angle-double-right"></i>
+ </li>
+ <li>
+ <%= link_to project_path(current_user.uuid) do %>
+ Home
+ <% end %>
+ </li>
+ <% project_breadcrumbs.each do |p| %>
+ <li class="nav-separator">
+ <i class="fa fa-lg fa-angle-double-right"></i>
+ </li>
+ <li>
+ <%= link_to(p.name, project_path(p.uuid), data: {object_uuid: p.uuid, name: 'name'}) %>
+ </li>
+ <% end %>
+ <% end %>
+ </ul>
+ </nav>
+<% starred_projects = my_starred_projects current_user %>
+<% if starred_projects.andand.any? %>
+ <li role="presentation" class="dropdown-header">
+ My favorite projects
+ </li>
+ <li>
+ <%= project_link_to.call({object: current_user, depth: 0}) do %>
+ <span style="padding-left: 0">Home</span>
+ <% end %>
+ </li>
+ <% (starred_projects).each do |pnode| %>
+ <li>
+      <%= project_link_to.call({object: pnode, depth: 0}) do %>
+ <span style="padding-left: 0em"></span><%= pnode[:name] %>
+ <% end %>
+ </li>
+ <% end %>
+ <li role="presentation" class="divider"></li>
+<% end %>
+
<li role="presentation" class="dropdown-header">
My projects
</li>
<span style="padding-left: 0">Home</span>
<% end %>
</li>
-<% my_project_tree.each do |pnode| %>
+<% my_tree = my_wanted_projects_tree current_user %>
+<% my_tree[0].each do |pnode| %>
<% next if pnode[:object].class != Group %>
<li>
<%= project_link_to.call pnode do %>
<% end %>
</li>
<% end %>
+<% if my_tree[1] or my_tree[0].size > 200 %>
+<li role="presentation" class="dropdown-header">
+ Some projects have been omitted.
+</li>
+<% elsif my_tree[2] %>
+<li role="presentation" class="dropdown-header">
+ Showing top three levels of your projects.
+</li>
+<% end %>
--- /dev/null
+<% if current_user and is_starred %>
+  <%= link_to(star_path(status: 'delete', id: @object.uuid, action_method: 'get'), style: "color:#D00", class: "btn btn-xs star-unstar", title: "Remove from list of favorites", remote: true) do %>
+ <i class="fa fa-lg fa-star"></i>
+ <% end %>
+<% else %>
+  <%= link_to(star_path(status: 'create', id: @object.uuid, action_method: 'get'), class: "btn btn-xs star-unstar", title: "Add to list of favorites", remote: true) do %>
+ <i class="fa fa-lg fa-star-o"></i>
+ <% end %>
+<% end %>
--- /dev/null
+$(".star-unstar").html("<%= escape_javascript(render partial: 'show_star') %>");
+$(".breadcrumbs").html("<%= escape_javascript(render partial: 'breadcrumbs') %>");
</nav>
<% if current_user.andand.is_active %>
- <nav class="navbar navbar-default breadcrumbs" role="navigation">
- <ul class="nav navbar-nav navbar-left">
- <li>
- <a href="/">
- <i class="fa fa-lg fa-fw fa-dashboard"></i>
- Dashboard
- </a>
- </li>
- <li class="dropdown">
- <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="projects-menu">
- Projects
- <span class="caret"></span>
- </a>
- <ul class="dropdown-menu" style="min-width: 20em" role="menu">
- <li role="menuitem">
- <%= link_to(
- url_for(
- action: 'choose',
- controller: 'search',
- filters: [['uuid', 'is_a', 'arvados#group']].to_json,
- title: 'Search',
- action_name: 'Show',
- action_href: url_for(controller: :actions, action: :show),
- action_method: 'get',
- action_data: {selection_param: 'uuid', success: 'redirect-to-created-object'}.to_json),
- { remote: true, method: 'get', title: "Search" }) do %>
- <i class="glyphicon fa-fw glyphicon-search"></i> Search all projects ...
- <% end %>
- </li>
- <% if Rails.configuration.anonymous_user_token and Rails.configuration.enable_public_projects_page %>
- <li role="menuitem"><a href="/projects/public" role="menuitem"><i class="fa fa-fw fa-list"></i> Browse public projects </a>
- </li>
- <% end %>
- <li role="menuitem">
- <%= link_to projects_path(options: {ensure_unique_name: true}), role: 'menu-item', method: :post do %>
- <i class="fa fa-fw fa-plus"></i> Add a new project
- <% end %>
- </li>
- <li role="presentation" class="divider"></li>
- <%= render partial: "projects_tree_menu", locals: {
- :project_link_to => Proc.new do |pnode, &block|
- link_to(project_path(pnode[:object].uuid),
- data: { 'object-uuid' => pnode[:object].uuid,
- 'name' => 'name' },
- &block)
- end,
- } %>
- </ul>
- </li>
- <% if @name_link or @object %>
- <li class="nav-separator">
- <i class="fa fa-lg fa-angle-double-right"></i>
- </li>
- <li>
- <%= link_to project_path(current_user.uuid) do %>
- Home
- <% end %>
- </li>
- <% project_breadcrumbs.each do |p| %>
- <li class="nav-separator">
- <i class="fa fa-lg fa-angle-double-right"></i>
- </li>
- <li>
- <%= link_to(p.name, project_path(p.uuid), data: {object_uuid: p.uuid, name: 'name'}) %>
- </li>
- <% end %>
- <% end %>
- </ul>
- </nav>
+ <%= render partial: 'breadcrumbs' %>
<% elsif !current_user %> <%# anonymous %>
<% if (@name_link or @object) and (project_breadcrumbs.any?) %>
<nav class="navbar navbar-default breadcrumbs" role="navigation">
<div class="modal-body">
<div class="selectable-container" style="height: 15em; overflow-y: scroll">
- <% [my_project_tree, shared_project_tree].each do |tree| %>
- <% tree.each do |projectnode| %>
- <% if projectnode[:object].is_a? String %>
- <div class="row" style="padding-left: <%= 1 + projectnode[:depth] %>em; margin-right: 0px">
- <i class="fa fa-fw fa-share-alt"></i>
- <%= projectnode[:object] %>
- </div>
- <% else
- row_selectable = !params[:editable] || projectnode[:object].editable?
- if projectnode[:object].uuid == current_user.uuid
- row_name = "Home"
- row_selectable = true
- else
- row_name = projectnode[:object].friendly_link_name || 'New project'
- end %>
- <div class="<%= 'selectable project' if row_selectable %> row"
- style="padding-left: <%= 1 + projectnode[:depth] %>em; margin-right: 0px" data-object-uuid="<%= projectnode[:object].uuid %>">
- <i class="fa fa-fw fa-folder-o"></i> <%= row_name %>
- </div>
- <% end %>
+      <% starred_projects = my_starred_projects current_user %>
+ <% if starred_projects.andand.any? %>
+ <% writable_projects = starred_projects.select(&:editable?) %>
+ <% writable_projects.each do |projectnode| %>
+ <% row_name = projectnode.friendly_link_name || 'New project' %>
+ <div class="selectable project row"
+ style="padding-left: 1em; margin-right: 0px"
+ data-object-uuid="<%= projectnode.uuid %>">
+ <i class="fa fa-fw fa-folder-o"></i> <%= row_name %> <i class="fa fa-fw fa-star"></i>
+ </div>
<% end %>
<% end %>
+
+ <% my_projects = my_wanted_projects_tree(current_user) %>
+ <% my_projects[0].each do |projectnode| %>
+ <% if projectnode[:object].uuid == current_user.uuid
+ row_name = "Home"
+ else
+ row_name = projectnode[:object].friendly_link_name || 'New project'
+ end %>
+ <div class="selectable project row"
+ style="padding-left: <%= 1 + projectnode[:depth] %>em; margin-right: 0px"
+ data-object-uuid="<%= projectnode[:object].uuid %>">
+ <i class="fa fa-fw fa-folder-o"></i> <%= row_name %>
+ </div>
+ <% end %>
</div>
+
+ <% if my_projects[1] or my_projects[2] or my_projects[0].size > 200 %>
+      <div>Some of your projects are omitted. Add projects of interest to your favorites.</div>
+ <% end %>
</div>
<div class="modal-footer">
<% if @object.uuid == current_user.andand.uuid %>
Home
<% else %>
+ <%= render partial: "show_star" %>
<%= render_editable_attribute @object, 'name', nil, { 'data-emptytext' => "New project" } %>
<% end %>
</h2>
get "users/setup" => 'users#setup', :as => :setup_user
get "report_issue_popup" => 'actions#report_issue_popup', :as => :report_issue_popup
post "report_issue" => 'actions#report_issue', :as => :report_issue
+ get "star" => 'actions#star', :as => :star
resources :nodes
resources :humans
resources :traits
}, session_for(:active)
assert_select "#projects-menu + ul li.divider ~ li a[href=/projects/#{project_uuid}]"
end
+
+ [
+ ["active", 5, ["aproject", "asubproject"], "anonymously_accessible_project"],
+ ["user1_with_load", 2, ["project_with_10_collections"], "project_with_2_pipelines_and_60_jobs"],
+ ["admin", 5, ["anonymously_accessible_project", "subproject_in_anonymous_accessible_project"], "aproject"],
+ ].each do |user, page_size, tree_segment, unexpected|
+ test "build my projects tree for #{user} user and verify #{unexpected} is omitted" do
+ use_token user
+ ctrl = ProjectsController.new
+
+ current_user = User.find(api_fixture('users')[user]['uuid'])
+
+ my_tree = ctrl.send :my_wanted_projects_tree, current_user, page_size
+
+ tree_segment_at_depth_1 = api_fixture('groups')[tree_segment[0]]
+ tree_segment_at_depth_2 = api_fixture('groups')[tree_segment[1]] if tree_segment[1]
+
+ tree_nodes = {}
+ my_tree[0].each do |x|
+ tree_nodes[x[:object]['uuid']] = x[:depth]
+ end
+
+ assert_equal(1, tree_nodes[tree_segment_at_depth_1['uuid']])
+ assert_equal(2, tree_nodes[tree_segment_at_depth_2['uuid']]) if tree_segment[1]
+
+ unexpected_project = api_fixture('groups')[unexpected]
+ assert_nil(tree_nodes[unexpected_project['uuid']])
+ end
+ end
+
+ [
+ ["active", 1],
+ ["project_viewer", 1],
+ ["admin", 0],
+ ].each do |user, size|
+ test "starred projects for #{user}" do
+ use_token user
+ ctrl = ProjectsController.new
+ current_user = User.find(api_fixture('users')[user]['uuid'])
+ my_starred_project = ctrl.send :my_starred_projects, current_user
+ assert_equal(size, my_starred_project.andand.size)
+
+ ctrl2 = ProjectsController.new
+ current_user = User.find(api_fixture('users')[user]['uuid'])
+ my_starred_project = ctrl2.send :my_starred_projects, current_user
+ assert_equal(size, my_starred_project.andand.size)
+ end
+ end
+
+ test "unshare project and verify that it is no longer included in shared user's starred projects" do
+ # remove sharing link
+ use_token :system_user
+ Link.find(api_fixture('links')['share_starred_project_with_project_viewer']['uuid']).destroy
+
+ # verify that project is no longer included in starred projects
+ use_token :project_viewer
+ current_user = User.find(api_fixture('users')['project_viewer']['uuid'])
+ ctrl = ProjectsController.new
+ my_starred_project = ctrl.send :my_starred_projects, current_user
+ assert_equal(0, my_starred_project.andand.size)
+
+ # share it again
+ @controller = LinksController.new
+ post :create, {
+ link: {
+ link_class: 'permission',
+ name: 'can_read',
+ head_uuid: api_fixture('groups')['starred_and_shared_active_user_project']['uuid'],
+ tail_uuid: api_fixture('users')['project_viewer']['uuid'],
+ },
+ format: :json
+ }, session_for(:system_user)
+
+ # verify that the project is again included in starred projects
+ use_token :project_viewer
+ ctrl = ProjectsController.new
+ my_starred_project = ctrl.send :my_starred_projects, current_user
+ assert_equal(1, my_starred_project.andand.size)
+ end
end
test 'Create a project and move it into a different project' do
visit page_with_token 'active', '/projects'
find("#projects-menu").click
- find(".dropdown-menu a", text: "Home").click
+ within('.dropdown-menu') do
+ first('li', text: 'Home').click
+ end
+ wait_for_ajax
find('.btn', text: "Add a subproject").click
within('h2') do
visit '/projects'
find("#projects-menu").click
- find(".dropdown-menu a", text: "Home").click
+ within('.dropdown-menu') do
+ first('li', text: 'Home').click
+ end
+ wait_for_ajax
find('.btn', text: "Add a subproject").click
within('h2') do
find('.fa-pencil').click
assert page.has_text?('Unrestricted public data'), 'No text - Unrestricted public data'
assert page.has_text?('An anonymously accessible project'), 'No text - An anonymously accessible project'
end
+
+ test "test star and unstar project" do
+ visit page_with_token 'active', "/projects/#{api_fixture('groups')['anonymously_accessible_project']['uuid']}"
+
+ # add to favorites
+ find('.fa-star-o').click
+ wait_for_ajax
+
+ find("#projects-menu").click
+ within('.dropdown-menu') do
+ assert_selector 'li', text: 'Unrestricted public data'
+ end
+
+    # remove from favorites
+ find('.fa-star').click
+ wait_for_ajax
+
+ find("#projects-menu").click
+ within('.dropdown-menu') do
+ assert_no_selector 'li', text: 'Unrestricted public data'
+ end
+ end
end
--- /dev/null
+fpm_args+=(-v 2.0)
# FIXME: Remove this line after #6885 is done.
fpm_args+=(--iteration 2)
+
+# FIXME: Remove once support for llfuse 0.42+ is in place
+fpm_args+=(-v 0.41.1)
def sub_glob(v):
l = glob.glob(v)
if len(l) == 0:
- raise SubstitutionError("$(glob {}) no match fonud".format(v))
+ raise SubstitutionError("$(glob {}) no match found".format(v))
else:
return l[0]
active = 1
pids = set([s.pid for s in subprocesses])
while len(pids) > 0:
- (pid, status) = os.wait()
- pids.discard(pid)
- if not taskp.get("task.ignore_rcode"):
- rcode[pid] = (status >> 8)
+ try:
+ (pid, status) = os.wait()
+ except OSError as e:
+ if e.errno == errno.EINTR:
+ pass
+ else:
+ raise
else:
- rcode[pid] = 0
+ pids.discard(pid)
+ if not taskp.get("task.ignore_rcode"):
+ rcode[pid] = (status >> 8)
+ else:
+ rcode[pid] = 0
if sig.sig is not None:
logger.critical("terminating on signal %s" % sig.sig)
{% include 'notebox_begin' %}
-Arvados requires git version 1.7.10 or later. If you are using an earlier version of git, please update your git version.
+The Arvados API and Git servers require Git 1.7.10 or later. You can get this version on CentOS 6 from RepoForge. "Install the repository":http://repoforge.org/use/, then run:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install --enablerepo=rpmforge-extras git</span>
+</code></pre>
+</notextile>
+
{% include 'notebox_end' %}
--- /dev/null
+{% include 'notebox_begin' %}
+
+If you are installing on CentOS 6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' authentication only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sed -ri -e 's/^(host +all +all +(127\.0\.0\.1\/32|::1\/128) +)ident$/\1md5/' {{pg_hba_path}}</span>
+~$ <span class="userinput">sudo service {{pg_service}} restart</span>
+</code></pre>
+</notextile>
+
+{% include 'notebox_end' %}
<pre><code><span class="userinput">sudo yum install \
libyaml-devel glibc-headers autoconf gcc-c++ glibc-devel \
patch readline-devel zlib-devel libffi-devel openssl-devel \
- automake libtool bison sqlite-devel
+ automake libtool bison sqlite-devel tar
</span></code></pre></notextile>
Install prerequisites for Ubuntu 12.04 or 14.04:
--- /dev/null
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install runit</span>
+</code></pre>
+</notextile>
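+
+Once runit is installed, a service is enabled by creating a directory containing an executable @run@ script and linking that directory into @/etc/service/@, where the runit supervisor picks it up. For example, for a hypothetical service directory named @/etc/sv/example@ (the install steps that use runit show the actual directories to create):
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo ln -s /etc/sv/example /etc/service/</span>
+</code></pre>
+</notextile>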
Enter it again: <span class="userinput">paste-password-again</span>
</code></pre></notextile>
-{% include 'notebox_begin' %}
-
-This user setup assumes that your PostgreSQL is configured to accept password authentication. Red Hat systems use ident-based authentication by default. You may need to either adapt the user creation, or reconfigure PostgreSQL (in @pg_hba.conf@) to accept password authentication.
-
-{% include 'notebox_end' %}
+{% assign pg_hba_path = "/opt/rh/postgresql92/root/var/lib/pgsql/data/pg_hba.conf" %}
+{% assign pg_service = "postgresql92-postgresql" %}
+{% include 'install_redhat_postgres_auth' %}
Create the database:
</code></pre>
</notextile>
-h2. Set up configuration files
-
-The API server package uses configuration files that you write to @/etc/arvados/api@ and ensures they're consistently deployed. Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/api</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/api</span>
-~$ <span class="userinput">cd /var/www/arvados-api/current</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/database.yml.example /etc/arvados/api/database.yml</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/application.yml.example /etc/arvados/api/application.yml</span>
-</code></pre>
-</notextile>
-
h2. Configure the database connection
Edit @/etc/arvados/api/database.yml@ and replace the @xxxxxxxx@ database password placeholders with the PostgreSQL password you generated above.
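+
+For example, if the generated password is held in a shell variable (a hypothetical variable named @PGPASS@ here) and contains only alphanumeric characters, you could substitute the placeholders in place with:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sed -i "s/xxxxxxxx/$PGPASS/g" /etc/arvados/api/database.yml</span>
+</code></pre>
+</notextile>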
h2(#configure_application). Configure the API server
-Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections. The deployment script will consistently deploy this to the API server's configuration directory. The API server reads both @application.yml@ and its own @config/application.default.yml@ file. The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@. The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
+Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections. The API server reads both @application.yml@ and its own @config/application.default.yml@ file. The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@. The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
@config/application.default.yml@ documents additional configuration settings not listed here. You can "view the current source version":https://dev.arvados.org/projects/arvados/repository/revisions/master/entry/services/api/config/application.default.yml for reference.
<ol>
<li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
-<li><p>Puma is already included with the API server's gems. We recommend you run it as a service under <a href="http://smarden.org/runit/">runit</a> or a similar tool. Here's a sample runit script for that:</p>
+<li><p>Install runit to supervise the Puma daemon. {% include 'install_runit' %}<notextile></p></li>
+
+<li><p>Install the script below as the run script for the Puma service, modifying it as directed by the comments.</p>
<pre><code>#!/bin/bash
h2. Install gitolite
-Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.3@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
+Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.4@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
Download and install the version you selected.
<notextile>
<pre><code>git@gitserver:~$ <span class="userinput">echo 'PATH=$HOME/bin:$PATH' >.profile</span>
git@gitserver:~$ <span class="userinput">source .profile</span>
-git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.3</b> git://github.com/sitaramc/gitolite</span>
+git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.4</b> https://github.com/sitaramc/gitolite</span>
...
Note: checking out '5d24ae666bfd2fa9093d67c840eb8d686992083f'.
...
h3. Enable arvados-git-httpd
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the arvados-git-httpd daemon. {% include 'install_runit' %}
Configure runit to run arvados-git-httpd, making sure to update the API host to match your site:
<notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
/etc/sv$ <span class="userinput">sudo mkdir arvados-git-httpd; cd arvados-git-httpd</span>
/etc/sv/arvados-git-httpd$ <span class="userinput">sudo mkdir log</span>
/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat >log/run' <<'EOF'
exec chpst -u git:git arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=<b>/var/lib/arvados/git</b>/repositories 2>&1
EOF</span>
/etc/sv/arvados-git-httpd$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-git-httpd$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
</code></pre>
</notextile>
h2. Copy configuration files from the dispatcher (API server)
-The @/etc/slurm-llnl/slurm.conf@ and @/etc/munge/munge.key@ files need to be identicaly across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
+The @slurm.conf@ and @/etc/munge/munge.key@ files need to be identical across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
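+
+For example, from the dispatcher, assuming a hypothetical compute node named @compute0.uuid_prefix.your.domain@ that accepts root logins over SSH (adjust the host name and the @slurm.conf@ path for your distribution and site):
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo scp /etc/slurm-llnl/slurm.conf compute0.uuid_prefix.your.domain:/etc/slurm-llnl/slurm.conf</span>
+~$ <span class="userinput">sudo scp /etc/munge/munge.key compute0.uuid_prefix.your.domain:/etc/munge/munge.key</span>
+</code></pre>
+</notextile>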
h2. Configure FUSE
The arvados-docker-cleaner program removes least recently used docker images as needed to keep disk usage below a configured limit.
{% include 'notebox_begin' %}
-This also removes all containers as soon as they exit, as if they were run with `docker run --rm`. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with `--remove-stopped-containers never`.
+This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with @--remove-stopped-containers never@.
{% include 'notebox_end' %}
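+
+For example, while debugging you could stop the supervised service and run the cleaner by hand with container removal disabled (a sketch combining the options mentioned above; run it as a user with access to the Docker daemon):
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo python3 -m arvados_docker.cleaner --quota <b>50G</b> --remove-stopped-containers never</span>
+</code></pre>
+</notextile>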
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the Docker cleaner daemon. {% include 'install_runit' %}
Configure runit to run the image cleaner using a suitable quota for your compute nodes and workload:
<notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
/etc/sv$ <span class="userinput">sudo mkdir arvados-docker-cleaner; cd arvados-docker-cleaner</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo mkdir log log/main</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo sh -c 'cat >log/run' <<'EOF'
exec python3 -m arvados_docker.cleaner --quota <b>50G</b>
EOF</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
</code></pre>
</notextile>
import arvados, json, socket
fqdn = socket.getfqdn()
hostname, _, domain = fqdn.partition('.')
-ip_address = socket.gethostbyname(fqdn)
-node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain, 'ip_address': ip_address}).execute()
+node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain}).execute()
with open('/root/node.json', 'w') as node_file:
json.dump(node, node_file, indent=2)
EOF
</code>
</pre>
</notextile>
-
</code></pre>
</notextile>
-On Red Hat-based systems, "install SLURM and munge from source following their installation guide":https://computing.llnl.gov/linux/slurm/quickstart_admin.html.
+On Red Hat-based systems:
-Now we need to give SLURM a configuration file in @/etc/slurm-llnl/slurm.conf@. Here's an example:
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install slurm munge slurm-munge</span>
+</code></pre>
+</notextile>
+
+Now we need to give SLURM a configuration file. On Debian-based systems, this is installed at @/etc/slurm-llnl/slurm.conf@. On Red Hat-based systems, this is installed at @/etc/slurm/slurm.conf@. Here's an example @slurm.conf@:
<notextile>
<pre>
# SCHEDULING
SchedulerType=sched/backfill
SchedulerPort=7321
-SelectType=select/cons_res
-SelectTypeParameters=CR_CPU_Memory
-FastSchedule=1
+SelectType=select/linear
+FastSchedule=0
#
# LOGGING
SlurmctldDebug=3
Each hostname in @slurm.conf@ must also resolve correctly on all SLURM worker nodes as well as the controller itself. Furthermore, the hostnames used in the configuration file must match the hostnames reported by @hostname@ or @hostname -s@ on the nodes themselves. This applies to the ControlMachine as well as the worker nodes.
For example:
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
+* In @slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
+* In @slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
* In @/etc/resolv.conf@ on control and worker nodes: @search uuid_prefix.your.domain@
* On the control node: @hostname@ reports @uuid_prefix.your.domain@
* On worker node 123: @hostname@ reports @compute123.uuid_prefix.your.domain@
* @crunch-job@ needs the installation path of the Perl SDK in its @PERLLIB@.
* @crunch-job@ needs the @ARVADOS_API_HOST@ (and, if necessary, @ARVADOS_API_HOST_INSECURE@) environment variable set.
-We recommend you run @crunch-dispatch.rb@ under "runit":http://smarden.org/runit/ or a similar supervisor. Here's an example runit service script:
+Install runit to monitor the Crunch dispatch daemon. {% include 'install_runit' %}
+
+Install the script below as the run script for the Crunch dispatch service, modifying it as directed by the comments.
<notextile>
<pre><code>#!/bin/sh
{% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
If you intend to use Keep-web to serve public data to anonymous clients, configure it with an anonymous token. You can use the same one you used when you set up your Keepproxy server, or use the following command on the <strong>API server</strong> to create another. {% include 'install_rails_command' %}
-We recommend running Keep-web under "runit":https://packages.debian.org/search?keywords=runit or a similar supervisor. The basic command to start Keep-web is:
+Install runit to supervise the Keep-web daemon. {% include 'install_runit' %}
+
+The basic command to start Keep-web in the service run script is:
<notextile>
<pre><code>export ARVADOS_API_HOST=<span class="userinput">uuid_prefix</span>.your.domain
h3. Set up the Keepproxy service
-We recommend you run Keepproxy under "runit":http://smarden.org/runit/ or a similar supervisor. Make sure the launcher sets the envirnoment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@. The core keepproxy command to run is:
+Install runit to supervise the keepproxy daemon. {% include 'install_runit' %}
+
+The run script for the keepproxy service should set the environment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@. The core keepproxy command to run is:
<notextile>
<pre><code>ARVADOS_API_TOKEN=<span class="userinput">{{railsout}}</span> ARVADOS_API_HOST=<span class="userinput">uuid_prefix.your.domain</span> exec keepproxy
server {
listen <span class="userinput">[your public IP address]</span>:443 ssl;
- server_name keep.<span class="userinput">uuid_prefix</span>.your.domain
+ server_name keep.<span class="userinput">uuid_prefix</span>.your.domain;
proxy_connect_timeout 90s;
proxy_read_timeout 300s;
h3. Run keepstore as a supervised service
-We recommend running Keepstore under "runit":http://smarden.org/runit/ or something similar, using a run script like the following:
+Install runit to supervise the keepstore daemon. {% include 'install_runit' %}
+
+Install this script as the run script for the keepstore service, modifying it as directed below.
<notextile>
<pre><code>#!/bin/sh
h2. Install the Ruby SDK and utilities
-If you're using RVM:
+First, install the curl development libraries necessary to build the Arvados Ruby SDK. On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install libcurl4-openssl-dev</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install libcurl-devel</span>
+</code></pre>
+</notextile>
+
+Next, install the arvados-cli Ruby gem. If you're using RVM:
<notextile>
<pre><code>~$ <span class="userinput">sudo /usr/local/rvm/bin/rvm-exec default gem install arvados-cli</span>
</code></pre>
</notextile>
-{% include 'notebox_begin' %}
-
-If you are installing on CentOS6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
-<br/>
-<notextile>
-<pre><code>~$ <span class="userinput">sudo sed -i -e "s/127.0.0.1\/32 ident/127.0.0.1\/32 md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo sed -i -e "s/::1\/128 ident/::1\/128 md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo service postgresql restart</span>
-</code></pre>
-</notextile>
-{% include 'notebox_end' %}
-
+{% assign pg_service = "postgresql" %}
+{% assign pg_hba_path = "/var/lib/pgsql/data/pg_hba.conf" %}
+{% include 'install_redhat_postgres_auth' %}
Next, generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
</code></pre>
</notextile>
-h2. Set up configuration files
-
-The Workbench server package uses configuration files that you write to @/etc/arvados/workbench@ and ensures they're consistently deployed. Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo cp /var/www/arvados-workbench/current/config/application.yml.example /etc/arvados/workbench/application.yml</span>
-</code></pre>
-</notextile>
-
h2(#configure). Configure Workbench
-Edit @/etc/arvados/workbench/application.yml@ following the instructions below. The deployment script will consistently deploy this to Workbench's configuration directory. Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file. Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@. The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
+Edit @/etc/arvados/workbench/application.yml@ following the instructions below. Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file. Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@. The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
Consult @config/application.default.yml@ for a full list of configuration options. Always put your local configuration in @/etc/arvados/workbench/application.yml@—never edit @config/application.default.yml@.
<li>If you're deploying on an older Red Hat-based distribution and installed Python 2.7 from Software Collections, configure Nginx to use it:
<pre><code>~$ <span class="userinput">sudo usermod --shell /bin/bash nginx</span>
-~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 && -e /opt/rh/python27/enable ]] && source /opt/rh/python27/enable" >>~/.bash_profile'</span>
+~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 ]] && source scl_source enable python27" >>~/.bash_profile'</span>
</code></pre>
</li>
$cmd = [$docker_bin, 'ps', '-q'];
}
Log(undef, "Sanity check is `@$cmd`");
-srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
- $cmd,
- {fork => 1});
-if ($? != 0) {
- Log(undef, "Sanity check failed: ".exit_status_s($?));
+my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+ $cmd,
+ {label => "sanity check"});
+if ($exited != 0) {
+ Log(undef, "Sanity check failed: ".exit_status_s($exited));
exit EX_TEMPFAIL;
}
Log(undef, "Sanity check OK");
my $git_tar_count = 0;
if (!defined $no_clear_tmp) {
- # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
- Log (undef, "Clean work dirs");
-
- my $cleanpid = fork();
- if ($cleanpid == 0)
- {
- # Find FUSE mounts under $CRUNCH_TMP and unmount them.
- # Then clean up work directories.
- # TODO: When #5036 is done and widely deployed, we can limit mount's
- # -t option to simply fuse.keep.
- srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
- exit (1);
- }
- while (1)
- {
- last if $cleanpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($cleanpid);
- select (undef, undef, undef, 0.1);
- }
- if ($?) {
- Log(undef, "Clean work dirs: exit ".exit_status_s($?));
+ # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
+ # up work directories crunch_tmp/work, crunch_tmp/opt,
+ # crunch_tmp/src*.
+ #
+ # TODO: When #5036 is done and widely deployed, we can limit mount's
+ # -t option to simply fuse.keep.
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+ ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+ {label => "clean work dirs"});
+ if ($exited != 0) {
exit(EX_RETRY_UNLOCKED);
}
}
# If this job requires a Docker image, install that.
my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
if ($docker_locator = $Job->{docker_image_locator}) {
+ Log (undef, "Install docker image $docker_locator");
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
if (!$docker_hash)
{
croak("No Docker image hash found from locator $docker_locator");
}
+ Log (undef, "docker image hash is $docker_hash");
$docker_stream =~ s/^\.//;
my $docker_install_script = qq{
if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
- my $docker_pid = fork();
- if ($docker_pid == 0)
- {
- srun (["srun", "--nodelist=" . join(',', @node)],
- ["/bin/sh", "-ec", $docker_install_script]);
- exit ($?);
- }
- while (1)
- {
- last if $docker_pid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($docker_pid);
- select (undef, undef, undef, 0.1);
- }
- if ($? != 0)
+
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . join(',', @node)],
+ ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
+ {label => "load docker image"});
+ if ($exited != 0)
{
- croak("Installing Docker image from $docker_locator exited "
- .exit_status_s($?));
+ exit(EX_RETRY_UNLOCKED);
}
# Determine whether this version of Docker supports memory+swap limits.
- srun(["srun", "--nodelist=" . $node[0]],
- ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
- {fork => 1});
- $docker_limitmem = ($? == 0);
+ ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . $node[0]],
+ [$docker_bin, 'run', '--help'],
+ {label => "check --memory-swap feature"});
+ $docker_limitmem = ($stdout =~ /--memory-swap/);
# Find a non-root Docker user to use.
# Tries the default user for the container, then 'crunch', then 'nobody',
# Docker containers.
my @tryusers = ("", "crunch", "nobody");
foreach my $try_user (@tryusers) {
+ my $label;
my $try_user_arg;
if ($try_user eq "") {
- Log(undef, "Checking if container default user is not UID 0");
+ $label = "check whether default user is UID 0";
$try_user_arg = "";
} else {
- Log(undef, "Checking if user '$try_user' is not UID 0");
+ $label = "check whether user '$try_user' is UID 0";
$try_user_arg = "--user=$try_user";
}
- srun(["srun", "--nodelist=" . $node[0]],
- ["/bin/sh", "-ec",
- "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
- " test \$a -ne 0"],
- {fork => 1});
- if ($? == 0) {
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . $node[0]],
+ ["/bin/sh", "-ec",
+ "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
+ {label => $label});
+ chomp($stdout);
+ if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
$dockeruserarg = $try_user_arg;
if ($try_user eq "") {
Log(undef, "Container will run with default user");
}
}
else {
- my $install_exited;
+ my $exited;
my $install_script_tries_left = 3;
for (my $attempts = 0; $attempts < 3; $attempts++) {
- Log(undef, "Run install script on all workers");
-
my @srunargs = ("srun",
"--nodelist=$nodelist",
"-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
$ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
- my ($install_stderr_r, $install_stderr_w);
- pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
- set_nonblocking($install_stderr_r);
- my $installpid = fork();
- if ($installpid == 0)
- {
- close($install_stderr_r);
- fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
- open(STDOUT, ">&", $install_stderr_w);
- open(STDERR, ">&", $install_stderr_w);
- srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
- exit (1);
- }
- close($install_stderr_w);
- # Tell freeze_if_want_freeze how to kill the child, otherwise the
- # "waitpid(installpid)" loop won't get interrupted by a freeze:
- $proc{$installpid} = {};
- my $stderr_buf = '';
- # Track whether anything appears on stderr other than slurm errors
- # ("srun: ...") and the "starting: ..." message printed by the
- # srun subroutine itself:
+ my ($stdout, $stderr);
+ ($exited, $stdout, $stderr) = srun_sync(
+ \@srunargs, \@execargs,
+ {label => "run install script on all workers"},
+ $build_script . $git_archive);
+
my $stderr_anything_from_script = 0;
- my $match_our_own_errors = '^(srun: error: |starting: \[)';
- while ($installpid != waitpid(-1, WNOHANG)) {
- freeze_if_want_freeze ($installpid);
- # Wait up to 0.1 seconds for something to appear on stderr, then
- # do a non-blocking read.
- my $bits = fhbits($install_stderr_r);
- select ($bits, undef, $bits, 0.1);
- if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
- {
- while ($stderr_buf =~ /^(.*?)\n/) {
- my $line = $1;
- substr $stderr_buf, 0, 1+length($line), "";
- Log(undef, "stderr $line");
- if ($line !~ /$match_our_own_errors/) {
- $stderr_anything_from_script = 1;
- }
- }
- }
- }
- delete $proc{$installpid};
- $install_exited = $?;
- close($install_stderr_r);
- if (length($stderr_buf) > 0) {
- if ($stderr_buf !~ /$match_our_own_errors/) {
+ for my $line (split(/\n/, $stderr)) {
+ if ($line !~ /^(srun: error: |starting: \[)/) {
$stderr_anything_from_script = 1;
}
- Log(undef, "stderr $stderr_buf")
}
- Log (undef, "Install script exited ".exit_status_s($install_exited));
- last if $install_exited == 0 || $main::please_freeze;
+ last if $exited == 0 || $main::please_freeze;
+
# If the install script fails but doesn't print an error message,
# the next thing anyone is likely to do is just run it again in
# case it was a transient problem like "slurm communication fails
unlink($tar_filename);
}
- if ($install_exited != 0) {
+ if ($exited != 0) {
croak("Giving up");
}
}
next;
}
shift @freeslot;
- $proc{$childpid} = { jobstep => $id,
- time => time,
- slot => $childslot,
- jobstepname => "$job_id.$id.$childpid",
- };
+ $proc{$childpid} = {
+ jobstepidx => $id,
+ time => time,
+ slot => $childslot,
+ jobstepname => "$job_id.$id.$childpid",
+ };
croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
$slot[$childslot]->{pid} = $childpid;
{
update_progress_stats();
}
+ if (!$gotsome) {
+ select (undef, undef, undef, 0.1);
+ }
$working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
$_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
sub reapchildren
{
my $children_reaped = 0;
-
- while((my $pid = waitpid (-1, WNOHANG)) > 0)
+ while ((my $pid = waitpid (-1, WNOHANG)) > 0)
{
my $childstatus = $?;
+
my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
. "."
. $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepid = $proc{$pid}->{jobstep};
+ my $jobstepidx = $proc{$pid}->{jobstepidx};
if (!WIFEXITED($childstatus))
{
# child did not exit (may be temporarily stopped)
- Log ($jobstepid, "child $pid did not actually exit in reapchildren, ignoring for now.");
+ Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
next;
}
$children_reaped++;
my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepid];
+ my $Jobstep = $jobstep[$jobstepidx];
my $exitvalue = $childstatus >> 8;
my $exitinfo = "exit ".exit_status_s($childstatus);
$Jobstep->{'arvados_task'}->reload;
my $task_success = $Jobstep->{'arvados_task'}->{success};
- Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
+ Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
if (!defined $task_success) {
# task did not indicate one way or the other --> fail
- Log($jobstepid, sprintf(
+ Log($jobstepidx, sprintf(
"ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
exit_status_s($childstatus)));
$Jobstep->{'arvados_task'}->{success} = 0;
# node is already suspected faulty and srun exited quickly
if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
$elapsed < 5) {
- Log ($jobstepid, "blaming failure on suspect node " .
+ Log ($jobstepidx, "blaming failure on suspect node " .
$slot[$proc{$pid}->{slot}]->{node}->{name});
$temporary_fail ||= 1;
}
ban_node_by_slot($proc{$pid}->{slot});
}
- Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
- ++$Jobstep->{'failures'},
- $temporary_fail ? 'temporary' : 'permanent',
- $elapsed));
+ Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
+ ++$Jobstep->{'failures'},
+ $temporary_fail ? 'temporary' : 'permanent',
+ $elapsed));
if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
# Give up on this task, and the whole job
$main::success = 0;
}
# Put this task back on the todo queue
- push @jobstep_todo, $jobstepid;
+ push @jobstep_todo, $jobstepidx;
$Job->{'tasks_summary'}->{'failed'}++;
}
else
$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
- push @jobstep_done, $jobstepid;
- Log ($jobstepid, "success in $elapsed seconds");
+ push @jobstep_done, $jobstepidx;
+ Log ($jobstepidx, "success in $elapsed seconds");
}
$Jobstep->{exitcode} = $childstatus;
$Jobstep->{finishtime} = time;
$Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
$Jobstep->{'arvados_task'}->save;
- process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, sprintf("task output (%d bytes): %s",
- length($Jobstep->{'arvados_task'}->{output}),
- $Jobstep->{'arvados_task'}->{output}));
+ process_stderr_final ($jobstepidx);
+ Log ($jobstepidx, sprintf("task output (%d bytes): %s",
+ length($Jobstep->{'arvados_task'}->{output}),
+ $Jobstep->{'arvados_task'}->{output}));
- close $reader{$jobstepid};
- delete $reader{$jobstepid};
+ close $reader{$jobstepidx};
+ delete $reader{$jobstepidx};
delete $slot[$proc{$pid}->{slot}]->{pid};
push @freeslot, $proc{$pid}->{slot};
delete $proc{$pid};
sub check_refresh_wanted
{
my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
- if (@stat && $stat[9] > $latest_refresh) {
+ if (@stat &&
+ $stat[9] > $latest_refresh &&
+ # ...and we have actually locked the job record...
+ $job_id eq $Job->{'uuid'}) {
$latest_refresh = scalar time;
my $Job2 = api_call("jobs/get", uuid => $jobspec);
for my $attr ('cancelled_at',
# squeue check interval (15s) this should make the squeue check an
# infrequent event.
my $silent_procs = 0;
- for my $js (map {$jobstep[$_->{jobstep}]} values %proc)
+ for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
{
if (!exists($js->{stderr_at}))
{
# use killem() on procs whose killtime is reached
while (my ($pid, $procinfo) = each %proc)
{
- my $js = $jobstep[$procinfo->{jobstep}];
+ my $js = $jobstep[$procinfo->{jobstepidx}];
if (exists $procinfo->{killtime}
&& $procinfo->{killtime} <= time
&& $js->{stderr_at} < $last_squeue_check)
if ($js->{stderr_at}) {
$sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
}
- Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
killem ($pid);
}
}
# Check for child procs >60s old and not mentioned by squeue.
while (my ($pid, $procinfo) = each %proc)
{
- my $js = $jobstep[$procinfo->{jobstep}];
if ($procinfo->{time} < time - 60
&& $procinfo->{jobstepname}
&& !exists $ok{$procinfo->{jobstepname}}
# error/delay has caused the task to die without notifying srun,
# and we'll kill srun ourselves.
$procinfo->{killtime} = time + 30;
- Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+ Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
my $gotsome = 0;
my %fd_job;
my $sel = IO::Select->new();
- foreach my $job (keys %reader)
+ foreach my $jobstepidx (keys %reader)
{
- my $fd = $reader{$job};
+ my $fd = $reader{$jobstepidx};
$sel->add($fd);
- $fd_job{$fd} = $job;
+ $fd_job{$fd} = $jobstepidx;
+
+ if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
+ $sel->add($stdout_fd);
+ $fd_job{$stdout_fd} = $jobstepidx;
+ }
}
# select on all reader fds with 0.1s timeout
my @ready_fds = $sel->can_read(0.1);
my $buf;
if (0 < sysread ($fd, $buf, 65536))
{
+ $gotsome = 1;
print STDERR $buf if $ENV{CRUNCH_DEBUG};
- my $job = $fd_job{$fd};
- $jobstep[$job]->{stderr_at} = time;
- $jobstep[$job]->{stderr} .= $buf;
+
+ my $jobstepidx = $fd_job{$fd};
+ if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
+ $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+ next;
+ }
+
+ $jobstep[$jobstepidx]->{stderr_at} = time;
+ $jobstep[$jobstepidx]->{stderr} .= $buf;
# Consume everything up to the last \n
- preprocess_stderr ($job);
+ preprocess_stderr ($jobstepidx);
- if (length ($jobstep[$job]->{stderr}) > 16384)
+ if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
{
# If we get a lot of stderr without a newline, chop off the
# front to avoid letting our buffer grow indefinitely.
- substr ($jobstep[$job]->{stderr},
- 0, length($jobstep[$job]->{stderr}) - 8192) = "";
+ substr ($jobstep[$jobstepidx]->{stderr},
+ 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
}
- $gotsome = 1;
}
}
return $gotsome;
}
+# Consume all full lines of stderr for a jobstep. Everything after the
+# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
+# returning.
sub preprocess_stderr
{
- my $job = shift;
+ my $jobstepidx = shift;
- while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+ while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
my $line = $1;
- substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
- Log ($job, "stderr $line");
+ substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
+ Log ($jobstepidx, "stderr $line");
if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
# whoa.
$main::please_freeze = 1;
}
- elsif ($line =~ /srun: error: Node failure on/) {
- my $job_slot_index = $jobstep[$job]->{slotindex};
+ elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
+ # Skip the following tempfail checks if this srun proc isn't
+ # attached to a particular worker slot.
+ }
+ elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
+      my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
$slot[$job_slot_index]->{node}->{fail_count}++;
- $jobstep[$job]->{tempfail} = 1;
+ $jobstep[$jobstepidx]->{tempfail} = 1;
ban_node_by_slot($job_slot_index);
}
elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
- $jobstep[$job]->{tempfail} = 1;
- ban_node_by_slot($jobstep[$job]->{slotindex});
+ $jobstep[$jobstepidx]->{tempfail} = 1;
+ ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
}
elsif ($line =~ /arvados\.errors\.Keep/) {
- $jobstep[$job]->{tempfail} = 1;
+ $jobstep[$jobstepidx]->{tempfail} = 1;
}
}
}
-sub process_stderr
+sub process_stderr_final
{
- my $job = shift;
- my $task_success = shift;
- preprocess_stderr ($job);
+ my $jobstepidx = shift;
+ preprocess_stderr ($jobstepidx);
map {
- Log ($job, "stderr $_");
- } split ("\n", $jobstep[$job]->{stderr});
+ Log ($jobstepidx, "stderr $_");
+ } split ("\n", $jobstep[$jobstepidx]->{stderr});
+ $jobstep[$jobstepidx]->{stderr} = '';
}
sub fetch_block
}
if (!exists $proc{$_}->{"sent_$sig"})
{
- Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+ Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
kill $sig, $_;
select (undef, undef, undef, 0.1);
if ($sig == 2)
return $log_pipe_pid;
}
-sub Log # ($jobstep_id, $logmessage)
+sub Log # ($jobstepidx, $logmessage)
{
- if ($_[1] =~ /\n/) {
+ my ($jobstepidx, $logmessage) = @_;
+ if ($logmessage =~ /\n/) {
for my $line (split (/\n/, $_[1])) {
- Log ($_[0], $line);
+ Log ($jobstepidx, $line);
}
return;
}
my $fh = select STDERR; $|=1; select $fh;
- my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+ my $task_qseq = '';
+ if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
+ $task_qseq = $jobstepidx;
+ }
+ my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
my $datetime;
}
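+# srun_sync: run an srun command synchronously. The child's stdout and
+# stderr pipes are registered in @jobstep/%proc so the usual polling
+# helpers (readfrompipes, check_squeue, freeze_if_want_freeze) service
+# them while we wait. Returns (exit status, captured stdout, captured
+# stderr).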
+sub srun_sync
+{
+ my $srunargs = shift;
+ my $execargs = shift;
+ my $opts = shift || {};
+ my $stdin = shift;
+
+ my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
+ Log (undef, "$label: start");
+
+ my ($stderr_r, $stderr_w);
+ pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
+
+ my ($stdout_r, $stdout_w);
+ pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+
+ my $srunpid = fork();
+ if ($srunpid == 0)
+ {
+ close($stderr_r);
+ close($stdout_r);
+ fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+ fcntl($stdout_w, F_SETFL, 0) or croak($!);
+ open(STDERR, ">&", $stderr_w);
+ open(STDOUT, ">&", $stdout_w);
+ srun ($srunargs, $execargs, $opts, $stdin);
+ exit (1);
+ }
+ close($stderr_w);
+ close($stdout_w);
+
+ set_nonblocking($stderr_r);
+ set_nonblocking($stdout_r);
+
+ # Add entries to @jobstep and %proc so check_squeue() and
+ # freeze_if_want_freeze() can treat it like a job task process.
+ push @jobstep, {
+ stderr => '',
+ stderr_at => 0,
+ stderr_captured => '',
+ stdout_r => $stdout_r,
+ stdout_captured => '',
+ };
+ my $jobstepidx = $#jobstep;
+ $proc{$srunpid} = {
+ jobstepidx => $jobstepidx,
+ };
+ $reader{$jobstepidx} = $stderr_r;
+
+ while ($srunpid != waitpid ($srunpid, WNOHANG)) {
+ my $busy = readfrompipes();
+ if (!$busy || ($latest_refresh + 2 < scalar time)) {
+ check_refresh_wanted();
+ check_squeue();
+ }
+ if (!$busy) {
+ select(undef, undef, undef, 0.1);
+ }
+ killem(keys %proc) if $main::please_freeze;
+ }
+ my $exited = $?;
+
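+  # Drain anything still buffered in the pipes, then log leftover stderr.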
+ 1 while readfrompipes();
+ process_stderr_final ($jobstepidx);
+
+ Log (undef, "$label: exit ".exit_status_s($exited));
+
+ close($stdout_r);
+ close($stderr_r);
+ delete $proc{$srunpid};
+ delete $reader{$jobstepidx};
+
+ my $j = pop @jobstep;
+ return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+}
+
+
sub srun
{
my $srunargs = shift;
#!/bin/sh
echo >&2 Failing mount stub was called
-exit 1
+exit 44
tryjobrecord j, binstubs: ['clean_fail']
end
assert_match /Failing mount stub was called/, err
- assert_match /Clean work dirs: exit 1\n$/, err
+ assert_match /clean work dirs: exit 44\n$/, err
assert_equal SPECIAL_EXIT[:EX_RETRY_UNLOCKED], $?.exitstatus
end
import cwltool.draft2tool
import cwltool.workflow
import cwltool.main
+from cwltool.process import shortname
import threading
import cwltool.docker
import fnmatch
args = [image_name]
if image_tag:
args.append(image_tag)
+ logger.info("Uploading Docker image %s", ":".join(args))
arvados.commands.keepdocker.main(args)
return dockerRequirement["dockerImageId"]
"script_version": "master",
"script_parameters": {"tasks": [script_parameters]},
"runtime_constraints": runtime_constraints
- }, find_or_create=kwargs.get("enable_reuse", True)).execute()
+ }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
self.arvrunner.jobs[response["uuid"]] = self
- logger.info("Job %s is %s", response["uuid"], response["state"])
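+        # Record the new job as a component of the pipeline instance and
+        # push the updated component list to the API server.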
+ self.arvrunner.pipeline["components"][self.name] = {"job": response}
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+ body={
+ "components": self.arvrunner.pipeline["components"]
+ }).execute(num_retries=self.arvrunner.num_retries)
+
+ logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
if response["state"] in ("Complete", "Failed", "Cancelled"):
self.done(response)
logger.error("Got error %s" % str(e))
self.output_callback({}, "permanentFail")
+ def update_pipeline_component(self, record):
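+        """Store the job record as this pipeline component on the API server."""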
+ self.arvrunner.pipeline["components"][self.name] = {"job": record}
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+ body={
+ "components": self.arvrunner.pipeline["components"]
+ }).execute(num_retries=self.arvrunner.num_retries)
def done(self, record):
+ try:
+ self.update_pipeline_component(record)
+ except:
+ pass
+
try:
if record["state"] == "Complete":
processStatus = "success"
try:
outputs = {}
- outputs = self.collect_outputs("keep:" + record["output"])
+ if record["output"]:
+ outputs = self.collect_outputs("keep:" + record["output"])
except Exception as e:
logger.exception("Got exception while collecting job outputs:")
processStatus = "permanentFail"
self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
if src not in self._pathmap:
ab = cwltool.pathmapper.abspath(src, basedir)
- st = arvados.commands.run.statfile("", ab)
+ st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
if kwargs.get("conformance_test"):
self._pathmap[src] = (src, ab)
elif isinstance(st, arvados.commands.run.UploadFile):
self.cond = threading.Condition(self.lock)
self.final_output = None
self.uploaded = {}
+ self.num_retries = 4
def arvMakeTool(self, toolpath_object, **kwargs):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
def output_callback(self, out, processStatus):
if processStatus == "success":
logger.info("Overall job status is %s", processStatus)
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
+
else:
logger.warn("Overall job status is %s", processStatus)
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.final_output = out
+
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.jobs and event["event_type"] == "update":
if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
- logger.info("Job %s is Running", event["object_uuid"])
+ uuid = event["object_uuid"]
with self.lock:
- self.jobs[event["object_uuid"]].running = True
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is Running", j.name, uuid)
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
+ uuid = event["object_uuid"]
try:
self.cond.acquire()
- self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+ j.done(event["properties"]["new_attributes"])
self.cond.notify()
finally:
self.cond.release()
def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
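+        # Create a pipeline instance (state RunningOnClient); jobs
+        # submitted by this run are recorded as its components.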
+ self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
+ "components": {},
+ "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
self.fs_access = CollectionFsAccess(input_basedir)
kwargs["fs_access"] = self.fs_access
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool>=1.0.20151026181844',
- 'arvados-python-client>=0.1.20151023214338'
+ 'cwltool>=1.0.20160129152024',
+ 'arvados-python-client>=0.1.20160122132348'
],
zip_safe=True,
cmdclass={'egg_info': tagger},
# ArvFile() (file already exists in a collection), UploadFile() (file needs to
# be uploaded to a collection), or simply returns prefix+fn (which yields the
# original parameter string).
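+# fnPattern and dirPattern are format strings applied as (pdh, path) when
+# the file or directory is found in an existing Keep collection.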
-def statfile(prefix, fn):
+def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
absfn = os.path.abspath(fn)
if os.path.exists(absfn):
st = os.stat(absfn)
sp = os.path.split(absfn)
(pdh, branch) = is_in_collection(sp[0], sp[1])
if pdh:
- return ArvFile(prefix, "$(file %s/%s)" % (pdh, branch))
+ return ArvFile(prefix, fnPattern % (pdh, branch))
else:
# trim leading '/' for path prefix test later
return UploadFile(prefix, absfn[1:])
sp = os.path.split(absfn)
(pdh, branch) = is_in_collection(sp[0], sp[1])
if pdh:
- return ArvFile(prefix, "$(dir %s/%s/)" % (pdh, branch))
+ return ArvFile(prefix, dirPattern % (pdh, branch))
return prefix+fn
_logger = logging.getLogger('arvados.events')
+
class EventClient(WebSocketClient):
def __init__(self, url, filters, on_event, last_log_id):
ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
self.filters = filters
self.on_event = on_event
self.last_log_id = last_log_id
- self._closed_lock = threading.RLock()
- self._closed = False
+ self._closing_lock = threading.RLock()
+ self._closing = False
+ self._closed = threading.Event()
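+        # _closing: close() has been called and no more events should be
+        # delivered. _closed: set by the closed() callback once ws4py
+        # reports the connection has actually shut down.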
def opened(self):
self.subscribe(self.filters, self.last_log_id)
+ def closed(self, code, reason=None):
+ self._closed.set()
+
def received_message(self, m):
- with self._closed_lock:
- if not self._closed:
+ with self._closing_lock:
+ if not self._closing:
self.on_event(json.loads(str(m)))
- def close(self, code=1000, reason=''):
- """Close event client and wait for it to finish."""
+ def close(self, code=1000, reason='', timeout=0):
+ """Close event client and optionally wait for it to finish.
+
+ :timeout: is the number of seconds to wait for ws4py to
+ indicate that the connection has closed.
+ """
super(EventClient, self).close(code, reason)
- with self._closed_lock:
+ with self._closing_lock:
# make sure we don't process any more messages.
- self._closed = True
+ self._closing = True
+ # wait for ws4py to tell us the connection is closed.
+ self._closed.wait(timeout=timeout)
def subscribe(self, filters, last_log_id=None):
m = {"method": "subscribe", "filters": filters}
def unsubscribe(self, filters):
self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
super(PollClient, self).__init__()
self.on_event = on_event
self.poll_time = poll_time
self.daemon = True
- self.stop = threading.Event()
self.last_log_id = last_log_id
+ self._closing = threading.Event()
+ self._closing_lock = threading.RLock()
def run(self):
self.id = 0
self.on_event({'status': 200})
- while not self.stop.isSet():
+ while not self._closing.is_set():
max_id = self.id
moreitems = False
for f in self.filters:
for i in items["items"]:
if i['id'] > max_id:
max_id = i['id']
- self.on_event(i)
+ with self._closing_lock:
+ if self._closing.is_set():
+ return
+ self.on_event(i)
if items["items_available"] > len(items["items"]):
moreitems = True
self.id = max_id
if not moreitems:
- self.stop.wait(self.poll_time)
+ self._closing.wait(self.poll_time)
def run_forever(self):
# Have to poll here, otherwise KeyboardInterrupt will never get processed.
- while not self.stop.is_set():
- self.stop.wait(1)
+ while not self._closing.is_set():
+ self._closing.wait(1)
+
+ def close(self, code=None, reason=None, timeout=0):
+ """Close poll client and optionally wait for it to finish.
- def close(self):
- """Close poll client and wait for it to finish."""
+ If an :on_event: handler is running in a different thread,
+ first wait (indefinitely) for it to return.
- self.stop.set()
+ After closing, wait up to :timeout: seconds for the thread to
+ finish the poll request in progress (if any).
+
+ :code: and :reason: are ignored. They are present for
+ interface compatibility with EventClient.
+ """
+
+ with self._closing_lock:
+ self._closing.set()
try:
- self.join()
+ self.join(timeout=timeout)
except RuntimeError:
# "join() raises a RuntimeError if an attempt is made to join the
# current thread as that would cause a deadlock. It is also an
return _subscribe_websocket(api, filters, on_event, last_log_id)
try:
- return _subscribe_websocket(api, filters, on_event, last_log_id)
+ if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
+ return _subscribe_websocket(api, filters, on_event, last_log_id)
+ else:
+ _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
except Exception as e:
_logger.warn("Falling back to polling after websocket error: %s" % e)
p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
],
install_requires=[
+ 'google-api-python-client==1.4.2',
+ 'oauth2client==1.5.1',
'ciso8601',
- 'google-api-python-client',
'httplib2',
'pycurl >=7.19.5.1, <7.21.5',
- 'python-gflags',
+ 'python-gflags<3.0',
'ws4py'
],
test_suite='tests',
# Dev/test SSL certificates
/self-signed.key
/self-signed.pem
+
+# Generated git-commit.version file
+/git-commit.version
gem 'themes_for_rails'
gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20151023185755'
+gem 'arvados-cli', '>= 0.1.20151207150126'
# pg_power lets us use partial indexes in schema.rb in Rails 3
gem 'pg_power'
google-api-client (~> 0.6.3, >= 0.6.3)
json (~> 1.7, >= 1.7.7)
jwt (>= 0.1.5, < 1.0.0)
- arvados-cli (0.1.20151023190001)
+ arvados-cli (0.1.20151207150126)
activesupport (~> 3.2, >= 3.2.13)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1, >= 0.1.20150128223554)
acts_as_api
andand
arvados (>= 0.1.20150615153458)
- arvados-cli (>= 0.1.20151023185755)
+ arvados-cli (>= 0.1.20151207150126)
coffee-rails (~> 3.2.0)
database_cleaner
factory_girl_rails
((attr == 'scopes') and (operator == '=')) ? operand : nil
})
@filters.select! { |attr, operator, operand|
- (attr == 'uuid') and (operator == '=')
+ ((attr == 'uuid') and (operator == '=')) || ((attr == 'api_token') and (operator == '='))
}
end
if @where
end
def find_object_by_uuid
- # Again, to make things easier for the client and our own routing,
- # here we look for the api_token key in a "uuid" (POST) or "id"
- # (GET) parameter.
- @object = model_class.where('api_token=?', params[:uuid] || params[:id]).first
+ @object = model_class.where(uuid: (params[:uuid] || params[:id])).first
end
def current_api_client_is_trusted
unless Thread.current[:api_client].andand.is_trusted
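+      # A non-trusted client may still retrieve its own token: allow
+      # "show" of the record matching the current token, and "index"
+      # queries returning exactly one record that matches the current
+      # token (uuid filter) or belongs to the current user (api_token
+      # filter).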
+ if params["action"] == "show"
+ if @object and @object['api_token'] == current_api_client_authorization.andand.api_token
+ return true
+ end
+ elsif params["action"] == "index" and @objects.andand.size == 1
+ filters = @filters.map{|f|f.first}.uniq
+ if ['uuid'] == filters
+ return true if @objects.first['api_token'] == current_api_client_authorization.andand.api_token
+ elsif ['api_token'] == filters
+          return true if @objects.first[:user_id] == current_user.id
+ end
+ end
send_error('Forbidden: this API client cannot manipulate other clients\' access tokens.',
status: 403)
end
return render_not_found
end
ping_data = {
- ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
+ ip: params[:local_ipv4] || request.remote_ip,
ec2_instance_id: params[:instance_id]
}
[:ping_secret, :total_cpu_cores, :total_ram_mb, :total_scratch_mb]
class ApiClientAuthorization < ArvadosModel
+ include HasUuid
include KindAndEtag
include CommonApiTemplate
self.user_id_changed?
end
- def uuid
- self.api_token
- end
- def uuid=(x) end
- def uuid_was
- self.api_token_was
- end
- def uuid_changed?
- self.api_token_changed?
- end
-
def modified_by_client_uuid
nil
end
["#{table_name}.modified_at desc", "#{table_name}.uuid"]
end
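+  # Columns that are each unique across the table. Once an ORDER BY
+  # reaches one of these, any further order terms are redundant.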
+ def self.unique_columns
+ ["id", "uuid"]
+ end
+
# If current user can manage the object, return an array of uuids of
# users and groups that have permission to write the object. The
# first two elements are always [self.owner_uuid, current user's
--- /dev/null
+require 'has_uuid'
+
+class AddUuidToApiClientAuthorization < ActiveRecord::Migration
+ extend HasUuid::ClassMethods
+
+ def up
+ add_column :api_client_authorizations, :uuid, :string
+ add_index :api_client_authorizations, :uuid, :unique => true
+
+ prefix = Server::Application.config.uuid_prefix + '-' +
+ Digest::MD5.hexdigest('ApiClientAuthorization'.to_s).to_i(16).to_s(36)[-5..-1] + '-'
+
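+    # Backfill existing rows: each uuid is the standard prefix followed
+    # by 15 characters taken from random positions of the row's api_token.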
+ update_sql <<-EOS
+update api_client_authorizations set uuid = (select concat('#{prefix}',
+array_to_string(ARRAY (SELECT substring(api_token FROM (ceil(random()*36))::int FOR 1) FROM generate_series(1, 15)), '')
+));
+EOS
+
+ change_column_null :api_client_authorizations, :uuid, false
+ end
+
+ def down
+ if column_exists?(:api_client_authorizations, :uuid)
+ remove_index :api_client_authorizations, :uuid
+ remove_column :api_client_authorizations, :uuid
+ end
+ end
+end
--- /dev/null
+class AddUuidToApiTokenSearchIndex < ActiveRecord::Migration
+ def up
+ begin
+ remove_index :api_client_authorizations, :name => 'api_client_authorizations_search_index'
+ rescue
+ end
+ add_index :api_client_authorizations,
+ ["api_token", "created_by_ip_address", "last_used_by_ip_address", "default_owner_uuid", "uuid"],
+ name: "api_client_authorizations_search_index"
+ end
+
+ def down
+ begin
+ remove_index :api_client_authorizations, :name => 'api_client_authorizations_search_index'
+ rescue
+ end
+ add_index :api_client_authorizations,
+ ["api_token", "created_by_ip_address", "last_used_by_ip_address", "default_owner_uuid"],
+ name: "api_client_authorizations_search_index"
+ end
+end
default_owner_uuid character varying(255),
scopes text DEFAULT '---
- all
-'::text NOT NULL
+'::text NOT NULL,
+ uuid character varying(255) NOT NULL
);
-- Name: api_client_authorizations_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX api_client_authorizations_search_index ON api_client_authorizations USING btree (api_token, created_by_ip_address, last_used_by_ip_address, default_owner_uuid);
+CREATE INDEX api_client_authorizations_search_index ON api_client_authorizations USING btree (api_token, created_by_ip_address, last_used_by_ip_address, default_owner_uuid, uuid);
--
CREATE INDEX index_api_client_authorizations_on_user_id ON api_client_authorizations USING btree (user_id);
+--
+-- Name: index_api_client_authorizations_on_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
+--
+
+CREATE UNIQUE INDEX index_api_client_authorizations_on_uuid ON api_client_authorizations USING btree (uuid);
+
+
--
-- Name: index_api_clients_on_created_at; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
INSERT INTO schema_migrations (version) VALUES ('20151215134304');
-INSERT INTO schema_migrations (version) VALUES ('20151229214707');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20151229214707');
+
+INSERT INTO schema_migrations (version) VALUES ('20160208210629');
+
+INSERT INTO schema_migrations (version) VALUES ('20160209155729');
\ No newline at end of file
fpm_depends+=(libcurl-devel postgresql-devel)
;;
debian* | ubuntu*)
- fpm_depends+=(libcurl4-openssl-dev libpq-dev)
+ fpm_depends+=(libcurl-ssl-dev libpq-dev)
;;
esac
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status == EXIT_RETRY_UNLOCKED
- # The job failed because all of the nodes allocated to it
- # failed. Only this crunch-dispatch process can retry the job:
+ if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+ # Only this crunch-dispatch process can retry the job:
# it's already locked, and there's no way to put it back in the
# Queued state. Put it in our internal todo list unless the job
# has failed this way excessively.
# (e.g., [] or ['owner_uuid desc']), fall back on the default
# orders to ensure repeating the same request (possibly with
# different limit/offset) will return records in the same order.
- @orders += model_class.default_orders
+ #
+ # Clean up the resulting list of orders such that no column
+ # uselessly appears twice (Postgres might not optimize this out
+ # for us) and no columns uselessly appear after a unique column
+ # (Postgres does not optimize this out for us; as of 9.2, "order
+ # by id, modified_at desc, uuid" is slow but "order by id" is
+ # fast).
+ orders_given_and_default = @orders + model_class.default_orders
+ order_cols_used = {}
+ @orders = []
+ orders_given_and_default.each do |order|
+ otablecol = order.split(' ')[0]
+
+ next if order_cols_used[otablecol]
+ order_cols_used[otablecol] = true
+
+ @orders << order
+
+ otable, ocol = otablecol.split('.')
+ if otable == table_name and model_class.unique_columns.include?(ocol)
+ # we already have a full ordering; subsequent entries would be
+ # superfluous
+ break
+ end
+ end
case params[:select]
when Array
# Read about fixtures at http://api.rubyonrails.org/classes/ActiveRecord/Fixtures.html
system_user:
+ uuid: zzzzz-gj3su-017z32aux8dg2s1
api_client: untrusted
user: system_user
api_token: systemusertesttoken1234567890aoeuidhtnsqjkxbmwvzpy
expires_at: 2038-01-01 00:00:00
admin:
+ uuid: zzzzz-gj3su-027z32aux8dg2s1
api_client: untrusted
user: admin
api_token: 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h
expires_at: 2038-01-01 00:00:00
admin_trustedclient:
+ uuid: zzzzz-gj3su-037z32aux8dg2s1
api_client: trusted_workbench
user: admin
api_token: 1a9ffdcga2o7cw8q12dndskomgs1ygli3ns9k2o9hgzgmktc78
expires_at: 2038-01-01 00:00:00
data_manager:
+ uuid: zzzzz-gj3su-047z32aux8dg2s1
api_client: untrusted
user: system_user
api_token: 320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1
- POST /arvados/v1/logs
miniadmin:
+ uuid: zzzzz-gj3su-057z32aux8dg2s1
api_client: untrusted
user: miniadmin
api_token: 2zb2y9pw3e70270te7oe3ewaantea3adyxjascvkz0zob7q7xb
expires_at: 2038-01-01 00:00:00
rominiadmin:
+ uuid: zzzzz-gj3su-067z32aux8dg2s1
api_client: untrusted
user: rominiadmin
api_token: 5tsb2pc3zlatn1ortl98s2tqsehpby88wmmnzmpsjmzwa6payh
expires_at: 2038-01-01 00:00:00
active:
+ uuid: zzzzz-gj3su-077z32aux8dg2s1
api_client: untrusted
user: active
api_token: 3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi
expires_at: 2038-01-01 00:00:00
active_trustedclient:
+ uuid: zzzzz-gj3su-087z32aux8dg2s1
api_client: trusted_workbench
user: active
api_token: 27bnddk6x2nmq00a1e3gq43n9tsl5v87a3faqar2ijj8tud5en
expires_at: 2038-01-01 00:00:00
active_noscope:
+ uuid: zzzzz-gj3su-097z32aux8dg2s1
api_client: untrusted
user: active
api_token: activenoscopeabcdefghijklmnopqrstuvwxyz12345678901
scopes: []
project_viewer:
+ uuid: zzzzz-gj3su-107z32aux8dg2s1
api_client: untrusted
user: project_viewer
api_token: projectviewertoken1234567890abcdefghijklmnopqrstuv
expires_at: 2038-01-01 00:00:00
project_viewer_trustedclient:
+ uuid: zzzzz-gj3su-117z32aux8dg2s1
api_client: trusted_workbench
user: project_viewer
api_token: projectviewertrustedtoken1234567890abcdefghijklmno
expires_at: 2038-01-01 00:00:00
subproject_admin:
+ uuid: zzzzz-gj3su-127z32aux8dg2s1
api_client: untrusted
user: subproject_admin
api_token: subprojectadmintoken1234567890abcdefghijklmnopqrst
expires_at: 2038-01-01 00:00:00
admin_vm:
+ uuid: zzzzz-gj3su-137z32aux8dg2s1
api_client: untrusted
user: admin
api_token: adminvirtualmachineabcdefghijklmnopqrstuvwxyz12345
scopes: ["GET /arvados/v1/virtual_machines/zzzzz-2x53u-382brsig8rp3064/logins"]
admin_noscope:
+ uuid: zzzzz-gj3su-147z32aux8dg2s1
api_client: untrusted
user: admin
api_token: adminnoscopeabcdefghijklmnopqrstuvwxyz123456789012
scopes: []
active_all_collections:
+ uuid: zzzzz-gj3su-157z32aux8dg2s1
api_client: untrusted
user: active
api_token: activecollectionsabcdefghijklmnopqrstuvwxyz1234567
scopes: ["GET /arvados/v1/collections/", "GET /arvados/v1/keep_services/accessible"]
active_userlist:
+ uuid: zzzzz-gj3su-167z32aux8dg2s1
api_client: untrusted
user: active
api_token: activeuserlistabcdefghijklmnopqrstuvwxyz1234568900
scopes: ["GET /arvados/v1/users"]
active_specimens:
+ uuid: zzzzz-gj3su-177z32aux8dg2s1
api_client: untrusted
user: active
api_token: activespecimensabcdefghijklmnopqrstuvwxyz123456890
scopes: ["GET /arvados/v1/specimens/"]
active_apitokens:
+ uuid: zzzzz-gj3su-187z32aux8dg2s1
api_client: trusted_workbench
user: active
api_token: activeapitokensabcdefghijklmnopqrstuvwxyz123456789
"POST /arvados/v1/api_client_authorizations"]
active_readonly:
+ uuid: zzzzz-gj3su-197z32aux8dg2s1
api_client: untrusted
user: active
api_token: activereadonlyabcdefghijklmnopqrstuvwxyz1234568790
scopes: ["GET /"]
spectator:
+ uuid: zzzzz-gj3su-207z32aux8dg2s1
api_client: untrusted
user: spectator
api_token: zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu
expires_at: 2038-01-01 00:00:00
spectator_specimens:
+ uuid: zzzzz-gj3su-217z32aux8dg2s1
api_client: untrusted
user: spectator
api_token: spectatorspecimensabcdefghijklmnopqrstuvwxyz123245
"POST /arvados/v1/specimens"]
inactive:
+ uuid: zzzzz-gj3su-227z32aux8dg2s1
api_client: untrusted
user: inactive
api_token: 5s29oj2hzmcmpq80hx9cta0rl5wuf3xfd6r7disusaptz7h9m0
expires_at: 2038-01-01 00:00:00
inactive_uninvited:
+ uuid: zzzzz-gj3su-237z32aux8dg2s1
api_client: untrusted
user: inactive_uninvited
api_token: 62mhllc0otp78v08e3rpa3nsmf8q8ogk47f7u5z4erp5gpj9al
expires_at: 2038-01-01 00:00:00
inactive_but_signed_user_agreement:
+ uuid: zzzzz-gj3su-247z32aux8dg2s1
api_client: untrusted
user: inactive_but_signed_user_agreement
api_token: 64k3bzw37iwpdlexczj02rw3m333rrb8ydvn2qq99ohv68so5k
expires_at: 2038-01-01 00:00:00
expired:
+ uuid: zzzzz-gj3su-257z32aux8dg2s1
api_client: untrusted
user: active
api_token: 2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx
expires_at: 1970-01-01 00:00:00
expired_trustedclient:
+ uuid: zzzzz-gj3su-267z32aux8dg2s1
api_client: trusted_workbench
user: active
api_token: 5hpni7izokzcatku2896xxwqdbt5ptomn04r6auc7fohnli82v
expires_at: 1970-01-01 00:00:00
valid_token_deleted_user:
+ uuid: zzzzz-gj3su-277z32aux8dg2s1
api_client: trusted_workbench
user_id: 1234567
api_token: tewfa58099sndckyqhlgd37za6e47o6h03r9l1vpll23hudm8b
expires_at: 2038-01-01 00:00:00
anonymous:
+ uuid: zzzzz-gj3su-287z32aux8dg2s1
api_client: untrusted
user: anonymous
api_token: 4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi
scopes: ["GET /"]
job_reader:
+ uuid: zzzzz-gj3su-297z32aux8dg2s1
api_client: untrusted
user: job_reader
api_token: e99512cdc0f3415c2428b9758f33bdfb07bc3561b00e86e7e6
expires_at: 2038-01-01 00:00:00
active_no_prefs:
+ uuid: zzzzz-gj3su-307z32aux8dg2s1
api_client: untrusted
user: active_no_prefs
api_token: 3kg612cdc0f3415c2428b9758f33bdfb07bc3561b00e86qdmi
expires_at: 2038-01-01 00:00:00
active_no_prefs_profile_no_getting_started_shown:
+ uuid: zzzzz-gj3su-317z32aux8dg2s1
api_client: untrusted
user: active_no_prefs_profile_no_getting_started_shown
api_token: 3kg612cdc0f3415c242856758f33bdfb07bc3561b00e86qdmi
expires_at: 2038-01-01 00:00:00
active_no_prefs_profile_with_getting_started_shown:
+ uuid: zzzzz-gj3su-327z32aux8dg2s1
api_client: untrusted
user: active_no_prefs_profile_with_getting_started_shown
api_token: 3kg612cdc0f3415c245786758f33bdfb07babcd1b00e86qdmi
expires_at: 2038-01-01 00:00:00
active_with_prefs_profile_no_getting_started_shown:
+ uuid: zzzzz-gj3su-337z32aux8dg2s1
api_client: untrusted
user: active_with_prefs_profile_no_getting_started_shown
api_token: 3kg612cdc0f3415c245786758f33bdfb07befgh1b00e86qdmi
expires_at: 2038-01-01 00:00:00
user_foo_in_sharing_group:
+ uuid: zzzzz-gj3su-347z32aux8dg2s1
api_client: untrusted
user: user_foo_in_sharing_group
api_token: 2p1pou8p4ls208mcbedeewlotghppenobcyrmyhq8pyf51xd8u
expires_at: 2038-01-01 00:00:00
user1_with_load:
+ uuid: zzzzz-gj3su-357z32aux8dg2s1
api_client: untrusted
user: user1_with_load
api_token: 1234k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi
expires_at: 2038-01-01 00:00:00
fuse:
+ uuid: zzzzz-gj3su-367z32aux8dg2s1
api_client: untrusted
user: fuse
api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
name: Subproject to test owner uuid and name unique key violation upon removal
description: "Removing this will result in name conflict with 'A project' in Home project and hence get renamed."
group_class: project
+
+starred_and_shared_active_user_project:
+ uuid: zzzzz-j7d0g-starredshared01
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-04-21 15:37:48 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2014-04-21 15:37:48 -0400
+ updated_at: 2014-04-21 15:37:48 -0400
+ name: Starred and shared active user project
+ description: Starred and shared active user project
+ group_class: project
properties: {}
updated_at: 2014-08-06 22:11:51.242010312 Z
+star_project_for_active_user:
+ uuid: zzzzz-o0j2j-starredbyactive
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-01-24 20:42:26 -0800
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-01-24 20:42:26 -0800
+ updated_at: 2014-01-24 20:42:26 -0800
+ tail_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ link_class: star
+ name: zzzzz-j7d0g-starredshared01
+ head_uuid: zzzzz-j7d0g-starredshared01
+ properties: {}
+
+share_starred_project_with_project_viewer:
+ uuid: zzzzz-o0j2j-sharewithviewer
+ owner_uuid: zzzzz-tpzed-000000000000000
+ tail_uuid: zzzzz-tpzed-projectviewer1a
+ link_class: permission
+ name: can_read
+ head_uuid: zzzzz-j7d0g-starredshared01
+
+star_shared_project_for_project_viewer:
+ uuid: zzzzz-o0j2j-starredbyviewer
+ owner_uuid: zzzzz-tpzed-projectviewer1a
+ created_at: 2014-01-24 20:42:26 -0800
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-01-24 20:42:26 -0800
+ updated_at: 2014-01-24 20:42:26 -0800
+ tail_uuid: zzzzz-tpzed-projectviewer1a
+ link_class: star
+ name: zzzzz-j7d0g-starredshared01
+ head_uuid: zzzzz-j7d0g-starredshared01
+ properties: {}
job_uuid: ~
info:
ping_secret: "abcdyi0x4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
+
+node_with_no_ip_address_yet:
+ uuid: zzzzz-7ekkf-nodenoipaddryet
+ owner_uuid: zzzzz-tpzed-000000000000000
+ hostname: noipaddr
+ slot_number: ~
+ last_ping_at: ~
+ first_ping_at: ~
+ job_uuid: ~
+ info:
+ ping_secret: "abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
authorize_with :admin_trustedclient
post :create_system_auth, scopes: '["test"]'
assert_response :success
+ assert_not_nil JSON.parse(@response.body)['uuid']
end
test "prohibit create system auth with token from non-trusted client" do
assert_found_tokens(auth, {filters: [['scopes', '=', scopes]]}, *expected)
end
end
+
+ [
+ [:admin, :admin, 200],
+ [:admin, :active, 403],
+ [:admin, :admin_vm, 403], # this belongs to the user of current session, but we can't get it by uuid
+ [:admin_trustedclient, :active, 200],
+ ].each do |user, token, status|
+ test "as user #{user} get #{token} token and expect #{status}" do
+ authorize_with user
+ get :show, {id: api_client_authorizations(token).uuid}
+ assert_response status
+ end
+ end
+
+ [
+ [:admin, :admin, 200],
+ [:admin, :active, 403],
+ [:admin, :admin_vm, 403], # this belongs to the user of current session, but we can't list it by uuid
+ [:admin_trustedclient, :active, 200],
+ ].each do |user, token, status|
+ test "as user #{user} list #{token} token using uuid and expect #{status}" do
+ authorize_with user
+ get :index, {
+ filters: [['uuid','=',api_client_authorizations(token).uuid]]
+ }
+ assert_response status
+ end
+ end
+
+ [
+ [:admin, :admin, 200],
+ [:admin, :active, 403],
+ [:admin, :admin_vm, 200], # this belongs to the user of current session, and can be listed by token
+ [:admin_trustedclient, :active, 200],
+ ].each do |user, token, status|
+ test "as user #{user} list #{token} token using token and expect #{status}" do
+ authorize_with user
+ get :index, {
+ filters: [['api_token','=',api_client_authorizations(token).api_token]]
+ }
+ assert_response status
+ end
+ end
end
'A Project (2)',
"new project name '#{new_project['name']}' was expected to be 'A Project (2)'")
end
+
+ test "unsharing a project results in hiding it from previously shared user" do
+ # remove sharing link for project
+ @controller = Arvados::V1::LinksController.new
+ authorize_with :admin
+ post :destroy, id: links(:share_starred_project_with_project_viewer).uuid
+ assert_response :success
+
+ # verify that the user can no longer see the project
+ @counter = 0 # Reset executed action counter
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :project_viewer
+ get :index, filters: [['group_class', '=', 'project']], format: :json
+ assert_response :success
+ found_projects = {}
+ json_response['items'].each do |g|
+ found_projects[g['uuid']] = g
+ end
+ assert_equal false, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
+
+ # share the project
+ @counter = 0
+ @controller = Arvados::V1::LinksController.new
+ authorize_with :system_user
+ post :create, link: {
+ link_class: "permission",
+ name: "can_read",
+ head_uuid: groups(:starred_and_shared_active_user_project).uuid,
+ tail_uuid: users(:project_viewer).uuid,
+ }
+
+ # verify that project_viewer user can now see shared project again
+ @counter = 0
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :project_viewer
+ get :index, filters: [['group_class', '=', 'project']], format: :json
+ assert_response :success
+ found_projects = {}
+ json_response['items'].each do |g|
+ found_projects[g['uuid']] = g
+ end
+ assert_equal true, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
+ end
end
}
assert_response 422
end
+
+ test "first ping should set ip addr using local_ipv4 when provided" do
+ post :ping, {
+ id: 'zzzzz-7ekkf-nodenoipaddryet',
+ instance_id: 'i-0000000',
+ local_ipv4: '172.17.2.172',
+ ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+ }
+ assert_response :success
+ response = JSON.parse(@response.body)
+ assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+ assert_equal '172.17.2.172', response['ip_address']
+ end
+
+ test "first ping should set ip addr using remote_ip when local_ipv4 is not provided" do
+ post :ping, {
+ id: 'zzzzz-7ekkf-nodenoipaddryet',
+ instance_id: 'i-0000000',
+ ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+ }
+ assert_response :success
+ response = JSON.parse(@response.body)
+ assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+ assert_equal request.remote_ip, response['ip_address']
+ end
+
+ test "future pings should not change previous ip address" do
+ post :ping, {
+ id: 'zzzzz-7ekkf-2z3mc76g2q73aio',
+ instance_id: 'i-0000000',
+ local_ipv4: '172.17.2.175',
+ ping_secret: '69udawxvn3zzj45hs8bumvndricrha4lcpi23pd69e44soanc0'
+ }
+ assert_response :success
+ response = JSON.parse(@response.body)
+ assert_equal 'zzzzz-7ekkf-2z3mc76g2q73aio', response['uuid']
+ assert_equal '172.17.2.174', response['ip_address'] # original ip address is not overwritten
+ end
end
--- /dev/null
+require 'test_helper'
+
+class Arvados::V1::QueryTest < ActionController::TestCase
+ test 'no fallback orders when order is unambiguous' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['id asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal ['logs.id asc'], assigns(:objects).order_values
+ end
+
+ test 'fallback orders when order is ambiguous' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['event_type asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'skip fallback orders already given by client' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['modified_at asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.modified_at asc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'eliminate superfluous orders' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['logs.modified_at asc',
+ 'modified_at desc',
+ 'event_type desc',
+ 'logs.event_type asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'eliminate orders after the first unique column' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['event_type asc',
+ 'id asc',
+ 'uuid asc',
+ 'modified_at desc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.event_type asc, logs.id asc',
+ assigns(:objects).order_values.join(', '))
+ end
+end
statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
stats, err := ioutil.ReadFile(statsFilename)
if err != nil {
- statLog.Printf("read %s: %s\n", statsFilename, err)
+ statLog.Printf("error reading %s: %s\n", statsFilename, err)
continue
}
return strings.NewReader(string(stats)), nil
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- statLog.Println("caught signal:", catch)
+ statLog.Println("notice: caught signal:", catch)
}(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
+ self.add_argument('--unmount-timeout',
+ type=float, default=2.0,
+ help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
+
self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
dest="exec_args", metavar=('command', 'args', '...', '--'),
help="""Mount, run a command, then unmount and exit""")
def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
self.logger = logger
self.args = args
+ self.listen_for_events = False
self.args.mountpoint = os.path.realpath(self.args.mountpoint)
if self.args.logfile:
def __enter__(self):
llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
- if self.args.mode != 'by_pdh':
+ if self.listen_for_events:
self.operations.listen_for_events()
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
+ self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+ self.llfuse_thread.daemon = True
+ self.llfuse_thread.start()
self.operations.initlock.wait()
def __exit__(self, exc_type, exc_value, traceback):
subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
- self.operations.destroy()
+ self.llfuse_thread.join(timeout=self.args.unmount_timeout)
+ if self.llfuse_thread.is_alive():
+ self.logger.warning("Mount.__exit__:"
+ " llfuse thread still alive %fs after umount"
+ " -- abandoning and exiting anyway",
+ self.args.unmount_timeout)
def run(self):
if self.args.exec_args:
mount_readme = True
if dir_class is not None:
- self.operations.inodes.add_entry(dir_class(*dir_args))
+ ent = dir_class(*dir_args)
+ self.operations.inodes.add_entry(ent)
+ self.listen_for_events = ent.want_event_subscribe()
return
e = self.operations.inodes.add_entry(Directory(
if name in ['', '.', '..'] or '/' in name:
sys.exit("Mount point '{}' is not supported.".format(name))
tld._entries[name] = self.operations.inodes.add_entry(ent)
+ self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
def _readme_text(self, api_host, user_email):
return '''
'''.format(api_host, user_email)
def _run_exec(self):
- # Initialize the fuse connection
- llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-
- # Subscribe to change events from API server
- if self.args.mode != 'by_pdh':
- self.operations.listen_for_events()
-
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- self.operations.initlock.wait()
-
rc = 255
- try:
- sp = subprocess.Popen(self.args.exec_args, shell=False)
-
- # forward signals to the process.
- signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
- signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
- signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
-
- # wait for process to complete.
- rc = sp.wait()
-
- # restore default signal handlers.
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGQUIT, signal.SIG_DFL)
- except Exception as e:
- self.logger.exception(
- 'arv-mount: exception during exec %s', self.args.exec_args)
+ with self:
try:
- rc = e.errno
- except AttributeError:
- pass
- finally:
- subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
- self.operations.destroy()
+ sp = subprocess.Popen(self.args.exec_args, shell=False)
+
+ # forward signals to the process.
+ signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+ signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+ signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+ # wait for process to complete.
+ rc = sp.wait()
+
+ # restore default signal handlers.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+ except Exception as e:
+ self.logger.exception(
+ 'arv-mount: exception during exec %s', self.args.exec_args)
+ try:
+ rc = e.errno
+ except AttributeError:
+ pass
exit(rc)
def _run_standalone(self):
try:
llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
- if not (self.args.exec_args or self.args.foreground):
- self.daemon_ctx = daemon.DaemonContext(working_directory=os.path.dirname(self.args.mountpoint),
- files_preserve=range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
+ if not self.args.foreground:
+ self.daemon_ctx = daemon.DaemonContext(
+ working_directory=os.path.dirname(self.args.mountpoint),
+ files_preserve=range(
+ 3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
self.daemon_ctx.open()
# Subscribe to change events from API server
- self.operations.listen_for_events()
+ if self.listen_for_events:
+ self.operations.listen_for_events()
- llfuse.main()
+ self._llfuse_main()
except Exception as e:
self.logger.exception('arv-mount: exception during mount: %s', e)
exit(getattr(e, 'errno', 1))
- finally:
- self.operations.destroy()
exit(0)
+
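+  # Run the llfuse main loop, making sure llfuse.close() is called
+  # afterwards so the FUSE session is torn down even if the loop raises.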
+ def _llfuse_main(self):
+ try:
+ llfuse.main()
+ except:
+ llfuse.close(unmount=False)
+ raise
+ llfuse.close()
def flush(self):
pass
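+    # Subclasses return True if their contents can change on the API
+    # server, i.e. the mount should subscribe to websocket events.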
+ def want_event_subscribe(self):
+ raise NotImplementedError()
+
def create(self, name):
raise NotImplementedError()
def writable(self):
return self.collection.writable() if self.collection is not None else self._writable
+ def want_event_subscribe(self):
+ return (uuid_pattern.match(self.collection_locator) is not None)
+
# Used by arv-web.py to switch the contents of the CollectionDirectory
def change_collection(self, new_locator):
"""Switch the contents of the CollectionDirectory.
def writable(self):
return True
+ def want_event_subscribe(self):
+ return False
+
def finalize(self):
self.collection.stop_threads()
def clear(self, force=False):
pass
+ def want_event_subscribe(self):
+ return not self.pdh_only
+
class RecursiveInvalidateDirectory(Directory):
def invalidate(self):
self._poll = True
self._poll_time = poll_time
+ def want_event_subscribe(self):
+ return True
+
@use_counter
def update(self):
with llfuse.lock_released:
self._poll = poll
self._poll_time = poll_time
+ def want_event_subscribe(self):
+ return True
+
@use_counter
def update(self):
with llfuse.lock_released:
self._updating_lock = threading.Lock()
self._current_user = None
+ def want_event_subscribe(self):
+ return True
+
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception:
_logger.exception()
+
+ def want_event_subscribe(self):
+ return True
--- /dev/null
+fpm_depends+=(fuse)
],
install_requires=[
'arvados-python-client >= 0.1.20151118035730',
- 'llfuse>=0.40',
+ 'llfuse==0.41.1',
'python-daemon',
'ciso8601'
],
def wrapper(self, *args, **kwargs):
with arvados_fuse.command.Mount(
arvados_fuse.command.ArgumentParser().parse_args(
- argv + ['--foreground', self.mnt])):
+ argv + ['--foreground',
+ '--unmount-timeout=0.1',
+ self.mnt])):
return func(self, *args, **kwargs)
return wrapper
return decorator
run_test_server.authorize_with("admin")
self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+ # This is a copy of Mount's method. TODO: Refactor MountTestBase
+ # to use a Mount instead of copying its code.
+ def _llfuse_main(self):
+ try:
+ llfuse.main()
+ except:
+ llfuse.close(unmount=False)
+ raise
+ llfuse.close()
+
def make_mount(self, root_class, **root_kwargs):
self.operations = fuse.Operations(
os.getuid(), os.getgid(),
self.operations.inodes.add_entry(root_class(
llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
llfuse.init(self.operations, self.mounttmp, [])
- threading.Thread(None, llfuse.main).start()
+ self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+ self.llfuse_thread.daemon = True
+ self.llfuse_thread.start()
# wait until the driver is finished initializing
self.operations.initlock.wait()
return self.operations.inodes[llfuse.ROOT_INODE]
self.pool.join()
del self.pool
- # llfuse.close is buggy, so use fusermount instead.
- #llfuse.close(unmount=True)
-
- count = 0
- success = 1
- while (count < 9 and success != 0):
- success = subprocess.call(["fusermount", "-u", self.mounttmp])
- time.sleep(0.1)
- count += 1
-
- self.operations.destroy()
+ subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
+ self.llfuse_thread.join(timeout=1)
+ if self.llfuse_thread.is_alive():
+ logger.warning("MountTestBase.tearDown():"
+ " llfuse thread still alive 1s after umount"
+ " -- abandoning and exiting anyway")
os.rmdir(self.mounttmp)
if self.keeptmp:
import json
import llfuse
import logging
+import mock
import os
import run_test_server
import sys
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.MagicDirectory)
self.assertEqual(e.pdh_only, False)
+ self.assertEqual(True, self.mnt.listen_for_events)
@noexit
def test_by_pdh(self):
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.MagicDirectory)
self.assertEqual(e.pdh_only, True)
+ self.assertEqual(False, self.mnt.listen_for_events)
@noexit
def test_by_tag(self):
self.assertEqual(args.mode, 'by_tag')
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.TagsDirectory)
+ self.assertEqual(True, self.mnt.listen_for_events)
@noexit
def test_collection(self, id_type='uuid'):
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.CollectionDirectory)
self.assertEqual(e.collection_locator, cid)
+ self.assertEqual(id_type == 'uuid', self.mnt.listen_for_events)
def test_collection_pdh(self):
self.test_collection('portable_data_hash')
e = self.check_ent_type(arvados_fuse.ProjectDirectory)
self.assertEqual(e.project_object['uuid'],
run_test_server.fixture('users')['active']['uuid'])
+ self.assertEqual(True, self.mnt.listen_for_events)
def test_mutually_exclusive_args(self):
cid = run_test_server.fixture('collections')['public_text_file']['uuid']
e = self.check_ent_type(arvados_fuse.SharedDirectory)
self.assertEqual(e.current_user['uuid'],
run_test_server.fixture('users')['active']['uuid'])
+ self.assertEqual(True, self.mnt.listen_for_events)
@noexit
- def test_custom(self):
+ @mock.patch('arvados.events.subscribe')
+ def test_custom(self, mock_subscribe):
args = arvados_fuse.command.ArgumentParser().parse_args([
'--mount-tmp', 'foo',
'--mount-tmp', 'bar',
e = self.check_ent_type(arvados_fuse.ProjectDirectory, 'my_home')
self.assertEqual(e.project_object['uuid'],
run_test_server.fixture('users')['active']['uuid'])
+ self.assertEqual(True, self.mnt.listen_for_events)
+ with self.mnt:
+ pass
+ self.assertEqual(1, mock_subscribe.call_count)
+
+ @noexit
+ @mock.patch('arvados.events.subscribe')
+ def test_custom_no_listen(self, mock_subscribe):
+ args = arvados_fuse.command.ArgumentParser().parse_args([
+ '--mount-by-pdh', 'pdh',
+ '--mount-tmp', 'foo',
+ '--mount-tmp', 'bar',
+ '--foreground', self.mntdir])
+ self.mnt = arvados_fuse.command.Mount(args)
+ self.assertEqual(False, self.mnt.listen_for_events)
+ with self.mnt:
+ pass
+ self.assertEqual(0, mock_subscribe.call_count)
def test_custom_unsupported_layouts(self):
for name in ['.', '..', '', 'foo/bar', '/foo']:
--- /dev/null
+import arvados_fuse.command
+import json
+import multiprocessing
+import os
+import run_test_server
+import tempfile
+import unittest
+
+try:
+ from shlex import quote
+except:
+ from pipes import quote
+
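+# Helper run in a worker process by the ExecMode tests: invoke
+# arv-mount --exec with a writable tmp mount and expect it to finish by
+# raising SystemExit.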
+def try_exec(mnt, cmd):
+ try:
+ arvados_fuse.command.Mount(
+ arvados_fuse.command.ArgumentParser().parse_args([
+ '--read-write',
+ '--mount-tmp=zzz',
+ '--unmount-timeout=0.1',
+ mnt,
+ '--exec'] + cmd)).run()
+ except SystemExit:
+ pass
+ else:
+ raise AssertionError('should have exited')
+
+
+class ExecMode(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ run_test_server.run()
+ run_test_server.run_keep(enforce_permissions=True, num_servers=2)
+ run_test_server.authorize_with('active')
+
+ @classmethod
+ def tearDownClass(cls):
+ run_test_server.stop_keep(num_servers=2)
+
+ def setUp(self):
+ self.mnt = tempfile.mkdtemp()
+ _, self.okfile = tempfile.mkstemp()
+ self.pool = multiprocessing.Pool(1)
+
+ def tearDown(self):
+ self.pool.terminate()
+ self.pool.join()
+ os.rmdir(self.mnt)
+ os.unlink(self.okfile)
+
+ def test_exec(self):
+ self.pool.apply(try_exec, (self.mnt, [
+ 'sh', '-c',
+ 'echo -n foo >{}; cp {} {}'.format(
+ quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
+ quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
+ quote(os.path.join(self.okfile)))]))
+ self.assertRegexpMatches(
+ json.load(open(self.okfile))['manifest_text'],
+ r' 0:3:foo.txt\n')
}
func (s *azureVolumeAdder) Set(containerName string) error {
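+	// Trash/untrash is not yet supported for Azure blob volumes, so
+	// refuse the configuration when -trash-lifetime is nonzero.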
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
+
if containerName == "" {
return errors.New("no container name given")
}
}
}
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
+
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
+
// Ideally we would use If-Unmodified-Since, but that
// particular condition seems to be ignored by Azure. Instead,
// we get the Etag before checking Mtime, and use If-Match to
})
}
+// Untrash a Keep block.
+// TBD
+func (v *AzureBlobVolume) Untrash(loc string) error {
+ return ErrNotImplemented
+}
+
// Status returns a VolumeStatus struct with placeholder data.
func (v *AzureBlobVolume) Status() *VolumeStatus {
return &VolumeStatus{
t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
}
}
+
+func TestUntrashHandler(t *testing.T) {
+ defer teardown()
+
+ // Set up Keep volumes
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+ vols := KeepVM.AllWritable()
+ vols[0].Put(TestHash, TestBlock)
+
+ dataManagerToken = "DATA MANAGER TOKEN"
+
+ // unauthenticatedReq => UnauthorizedError
+ unauthenticatedReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ }
+ response := IssueRequest(unauthenticatedReq)
+ ExpectStatusCode(t,
+ "Unauthenticated request",
+ UnauthorizedError.HTTPCode,
+ response)
+
+ // notDataManagerReq => UnauthorizedError
+ notDataManagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ apiToken: knownToken,
+ }
+
+ response = IssueRequest(notDataManagerReq)
+ ExpectStatusCode(t,
+ "Non-datamanager token",
+ UnauthorizedError.HTTPCode,
+ response)
+
+ // datamanagerWithBadHashReq => StatusBadRequest
+ datamanagerWithBadHashReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/thisisnotalocator",
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerWithBadHashReq)
+ ExpectStatusCode(t,
+ "Bad locator in untrash request",
+ http.StatusBadRequest,
+ response)
+
+ // datamanagerWrongMethodReq => StatusBadRequest
+ datamanagerWrongMethodReq := &RequestTester{
+ method: "GET",
+ uri: "/untrash/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerWrongMethodReq)
+ ExpectStatusCode(t,
+ "Only PUT method is supported for untrash",
+ http.StatusBadRequest,
+ response)
+
+ // datamanagerReq => StatusOK
+ datamanagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerReq)
+ ExpectStatusCode(t,
+ "",
+ http.StatusOK,
+ response)
+ expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
+ if response.Body.String() != expected {
+ t.Errorf(
+ "Untrash response mismatched: expected %s, got:\n%s",
+ expected, response.Body.String())
+ }
+}
+
+func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
+ defer teardown()
+
+ // Set up readonly Keep volumes
+ vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+ vols[0].Readonly = true
+ vols[1].Readonly = true
+ KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+ defer KeepVM.Close()
+
+ dataManagerToken = "DATA MANAGER TOKEN"
+
+ // datamanagerReq => StatusOK
+ datamanagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response := IssueRequest(datamanagerReq)
+ ExpectStatusCode(t,
+ "No writable volumes",
+ http.StatusNotFound,
+ response)
+}
"regexp"
"runtime"
"strconv"
+ "strings"
"sync"
"time"
)
// Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ // Untrash moves blocks from trash back into store
+ rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
Failed int `json:"copies_failed"`
}
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Delete(hash); err == nil {
+ if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
continue
trashq.ReplaceQueue(tlist)
}
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+
+ hash := mux.Vars(req)["hash"]
+
+ if len(KeepVM.AllWritable()) == 0 {
+ http.Error(resp, "No writable volumes", http.StatusNotFound)
+ return
+ }
+
+ var untrashedOn, failedOn []string
+ var numNotFound int
+ for _, vol := range KeepVM.AllWritable() {
+ err := vol.Untrash(hash)
+
+ if os.IsNotExist(err) {
+ numNotFound++
+ } else if err != nil {
+			log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+ failedOn = append(failedOn, vol.String())
+ } else {
+ log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ untrashedOn = append(untrashedOn, vol.String())
+ }
+ }
+
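+	// Return 404 only if every writable volume reported the block
+	// missing from its trash; otherwise summarize which volumes
+	// succeeded and which failed.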
+ if numNotFound == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+ return
+ }
+
+ if len(failedOn) == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+ } else {
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ if len(failedOn) > 0 {
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ }
+ resp.Write([]byte(respBody))
+ }
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
// actually deleting anything.
var neverDelete = true
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+var trashLifetime time.Duration
+
var maxBuffers = 128
var bufs *bufferPool
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
)
func (e *KeepError) Error() string {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+ flag.DurationVar(
+ &trashLifetime,
+ "trash-lifetime",
+ 0*time.Second,
+ "Interval after a block is trashed during which it can be recovered using an /untrash request")
flag.Parse()
}
func (s *s3VolumeAdder) Set(bucketName string) error {
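+	// Trash/untrash is not yet supported for S3 volumes, so refuse
+	// the configuration when -trash-lifetime is nonzero.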
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if bucketName == "" {
return fmt.Errorf("no container name given")
}
return nil
}
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if t, err := v.Mtime(loc); err != nil {
return err
} else if time.Since(t) < blobSignatureTTL {
return v.Bucket.Del(loc)
}
+// Untrash moves a block from the trash back into the store.
+// Not yet implemented for S3Volume; always returns ErrNotImplemented.
+func (v *S3Volume) Untrash(loc string) error {
+ return ErrNotImplemented
+}
+
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
if neverDelete {
err = errors.New("did not delete block because neverDelete is true")
} else {
- err = volume.Delete(trashRequest.Locator)
+ err = volume.Trash(trashRequest.Locator)
}
if err != nil {
// particular order.
IndexTo(prefix string, writer io.Writer) error
- // Delete deletes the block data from the underlying storage
- // device.
+ // Trash moves the block data from the underlying storage
+ // device to the trash area. The block then stays in the trash
+ // for the -trash-lifetime interval before it is actually deleted.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Delete must not delete the data.
+ // blobSignatureTTL, Trash must not trash the data.
//
- // If a Delete operation overlaps with any Touch or Put
+ // If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// ensure one of the following outcomes:
//
// - Touch and Put return a non-nil error, or
- // - Delete does not delete the block, or
+ // - Trash does not trash the block, or
// - Both of the above.
//
// If it is possible for the storage device to be accessed by
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be deleted for at least blobSignatureTTL
+ // will not be trashed for at least blobSignatureTTL
// seconds.
- Delete(loc string) error
+ Trash(loc string) error
+
+ // Untrash moves a block from the trash back into the store
+ Untrash(loc string) error
// Status returns a *VolumeStatus representing the current
// in-use and available storage capacity and an
testPutConcurrent(t, factory)
testPutFullBlock(t, factory)
+
+ testTrashUntrash(t, factory)
}
// Put a test block, get it and verify content
v.Put(TestHash, TestBlock)
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data, err := v.Get(TestHash)
v.Put(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
v := factory(t)
defer v.Teardown()
- if err := v.Delete(TestHash2); err == nil {
+ if err := v.Trash(TestHash2); err == nil {
t.Errorf("Expected error when attempting to delete a non-existing block")
}
}
}
// Delete a block from a read-only volume should result in error
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err == nil {
t.Errorf("Expected error when deleting block from a read-only volume")
}
t.Error("rdata != wdata")
}
}
+
+// With trashLifetime != 0:
+// Trash an old block - must return ErrNotImplemented, MethodDisabledError
+// (read-only volume), or succeed
+// If Trash succeeded, Untrash the block - must succeed
+// Get the block - must succeed in every case
+func testTrashUntrash(t TB, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+ defer func() {
+ trashLifetime = 0
+ }()
+
+ trashLifetime = 3600 * time.Second
+
+ // put block and backdate it
+ v.PutRaw(TestHash, TestBlock)
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
+ buf, err := v.Get(TestHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if bytes.Compare(buf, TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ }
+ bufs.Put(buf)
+
+ // Trash
+ err = v.Trash(TestHash)
+ if !v.Writable() {
+ if err != MethodDisabledError {
+ t.Error(err)
+ }
+ } else if err != nil {
+ if err != ErrNotImplemented {
+ t.Error(err)
+ }
+ } else {
+ _, err = v.Get(TestHash)
+ if err == nil || !os.IsNotExist(err) {
+ t.Errorf("os.IsNotExist(%v) should have been true", err)
+ }
+
+ // Untrash
+ err = v.Untrash(TestHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Get the block - after trash and untrash sequence
+ buf, err = v.Get(TestHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if bytes.Compare(buf, TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ }
+ bufs.Put(buf)
+}
return nil
}
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
v.gotCall("Delete")
<-v.Gate
if v.Readonly {
return os.ErrNotExist
}
+// Untrash is a no-op for MockVolume; it always reports success.
+func (v *MockVolume) Untrash(loc string) error {
+ return nil
+}
+
func (v *MockVolume) Status() *VolumeStatus {
var used uint64
for _, block := range v.Store {
}
func (vs *unixVolumeAdder) Set(value string) error {
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if dirs := strings.Split(value, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
for _, dir := range dirs {
}
-// Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+// Trash deletes the block data from the unix storage (moving blocks
+// to a trash area is not yet implemented, so trashLifetime must be zero).
+func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Delete() because either (a) the file will be deleted and Touch()
if v.readonly {
return MethodDisabledError
}
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
return os.Remove(p)
}
+// Untrash moves a block from the trash back into the store.
+// Not yet implemented for UnixVolume; always returns ErrNotImplemented.
+func (v *UnixVolume) Untrash(loc string) error {
+ return ErrNotImplemented
+}
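+
+// One possible approach for a later change (sketch only; nothing below is
+// implemented here, and helper names other than blockDir are assumptions):
+// Trash could rename the block file out of the way instead of removing it,
+// and Untrash could rename it back, e.g.
+//
+//   p := filepath.Join(v.blockDir(loc), loc)
+//   deadline := time.Now().Add(trashLifetime).Unix()
+//   os.Rename(p, fmt.Sprintf("%s.trash.%d", p, deadline))   // Trash
+//   ...
+//   os.Rename(fmt.Sprintf("%s.trash.%d", p, deadline), p)   // Untrash
+//
+// with a separate sweep removing trash files whose deadline has passed.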
+
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
logins.each do |l|
next if seen[l[:username]]
seen[l[:username]] = true if not seen.has_key?(l[:username])
- @homedir = "/home/#{l[:username]}"
unless uids[l[:username]]
STDERR.puts "Creating account #{l[:username]}"
out: devnull)
end
# Create .ssh directory if necessary
+ @homedir = Etc.getpwnam(l[:username]).dir
userdotssh = File.join(@homedir, ".ssh")
Dir.mkdir(userdotssh) if !File.exists?(userdotssh)
@key = "#######################################################################################
puts bang.backtrace.join("\n")
exit 1
end
-
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
- self._logger = logging.getLogger(self.LOGGER_NAME)
self._later = self.actor_ref.proxy()
self._polling_started = False
- self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
self.poll_wait = self.min_poll_wait
if hasattr(self, '_item_key'):
self.subscribe_to = self._subscribe_to
+ def on_start(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[9:]))
+
def _start_polling(self):
if not self._polling_started:
self._polling_started = True
def subscribe(self, subscriber):
self.all_subscribers.add(subscriber)
- self._logger.debug("%r subscribed to all events", subscriber)
+ self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
self._start_polling()
# __init__ exposes this method to the proxy if the subclass defines
# _item_key.
def _subscribe_to(self, key, subscriber):
self.key_subscribers.setdefault(key, set()).add(subscriber)
- self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+ self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
self._start_polling()
def _send_request(self):
raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
- self._logger.debug("%s got response with %d items",
- self.log_prefix, len(response))
self.poll_wait = self.min_poll_wait
_notify_subscribers(response, self.all_subscribers)
if hasattr(self, '_item_key'):
def _got_error(self, error):
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
- return "{} got error: {} - waiting {} seconds".format(
- self.log_prefix, error, self.poll_wait)
+ return "got error: {} - will try again in {} seconds".format(
+ error, self.poll_wait)
def is_common_error(self, exception):
return False
def poll(self, scheduled_start=None):
- self._logger.debug("%s sending poll", self.log_prefix)
+ self._logger.debug("sending request")
start_time = time.time()
if scheduled_start is None:
scheduled_start = start_time
else:
self._got_response(response)
next_poll = scheduled_start + self.poll_wait
+ self._logger.info("got response with %d items in %s seconds, next poll at %s",
+ len(response), (time.time() - scheduled_start),
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
end_time = time.time()
if next_poll < end_time: # We've drifted too much; start fresh.
next_poll = end_time + self.poll_wait
from __future__ import absolute_import, print_function
import calendar
+import functools
import itertools
import re
import time
else:
return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
+class RetryMixin(object):
+ """Retry decorator for an method that makes remote requests.
+
+ Use this function to decorate method, and pass in a tuple of exceptions to
+ catch. If the original method raises a known cloud driver error, or any of
+ the given exception types, this decorator will either go into a
+ sleep-and-retry loop with exponential backoff either by sleeping (if
+ self._timer is None) or by scheduling retries of the method (if self._timer
+ is a timer actor.)
+
+ """
+ def __init__(self, retry_wait, max_retry_wait,
+ logger, cloud, timer=None):
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self._logger = logger
+ self._cloud = cloud
+ self._timer = timer
+
+ @staticmethod
+ def _retry(errors=()):
+ def decorator(orig_func):
+ @functools.wraps(orig_func)
+ def retry_wrapper(self, *args, **kwargs):
+ while True:
+ try:
+ ret = orig_func(self, *args, **kwargs)
+ except Exception as error:
+ if not (isinstance(error, errors) or
+ self._cloud.is_cloud_exception(error)):
+ self.retry_wait = self.min_retry_wait
+ self._logger.warning(
+ "Re-raising unknown error (no retry): %s",
+ error, exc_info=error)
+ raise
+
+ self._logger.warning(
+ "Client error: %s - waiting %s seconds",
+ error, self.retry_wait, exc_info=error)
+
+ if self._timer:
+ start_time = time.time()
+ # reschedule to be called again
+ self._timer.schedule(start_time + self.retry_wait,
+ getattr(self._later,
+ orig_func.__name__),
+ *args, **kwargs)
+ else:
+ # sleep on it.
+ time.sleep(self.retry_wait)
+
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ if self._timer:
+ # expect to be called again by timer so don't loop
+ return
+ else:
+ self.retry_wait = self.min_retry_wait
+ return ret
+ return retry_wrapper
+ return decorator
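+
+ # Usage sketch (names here are hypothetical, not part of node manager):
+ # an actor mixes in RetryMixin, initializes it with its cloud client and
+ # timer, and decorates the flaky method.
+ #
+ #   class NodeLister(config.actor_class, RetryMixin):
+ #       def __init__(self, cloud, timer):
+ #           super(NodeLister, self).__init__()
+ #           RetryMixin.__init__(self, 1, 180,
+ #                               logging.getLogger('NodeLister'), cloud, timer)
+ #           self._later = self.actor_ref.proxy()
+ #
+ #       @RetryMixin._retry((IOError,))
+ #       def list_nodes(self):
+ #           return self._cloud.list_nodes()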
+
class ShutdownTimer(object):
"""Keep track of a cloud node's shutdown windows.
import pykka
from .. import \
- arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
+ arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
+ arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
-class ComputeNodeStateChangeBase(config.actor_class):
+class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
"""Base class for actors that change a compute node's state.
This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
+ def __init__(self, cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
+ RetryMixin.__init__(self, retry_wait, max_retry_wait,
+ None, cloud_client, timer_actor)
self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger(logger_name)
- self._cloud = cloud_client
self._arvados = arvados_client
- self._timer = timer_actor
- self.min_retry_wait = retry_wait
- self.max_retry_wait = max_retry_wait
- self.retry_wait = retry_wait
self.subscribers = set()
- @staticmethod
- def _retry(errors=()):
- """Retry decorator for an actor method that makes remote requests.
-
- Use this function to decorator an actor method, and pass in a
- tuple of exceptions to catch. This decorator will schedule
- retries of that method with exponential backoff if the
- original method raises a known cloud driver error, or any of the
- given exception types.
- """
- def decorator(orig_func):
- @functools.wraps(orig_func)
- def retry_wrapper(self, *args, **kwargs):
- start_time = time.time()
- try:
- orig_func(self, *args, **kwargs)
- except Exception as error:
- if not (isinstance(error, errors) or
- self._cloud.is_cloud_exception(error)):
- raise
- self._logger.warning(
- "Client error: %s - waiting %s seconds",
- error, self.retry_wait)
- self._timer.schedule(start_time + self.retry_wait,
- getattr(self._later,
- orig_func.__name__),
- *args, **kwargs)
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- else:
- self.retry_wait = self.min_retry_wait
- return retry_wrapper
- return decorator
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+ def on_start(self):
+ self._set_logger()
def _finished(self):
_notify_subscribers(self._later, self.subscribers)
self.subscribers = None
+ self._logger.info("finished")
def subscribe(self, subscriber):
if self.subscribers is None:
'last_action': explanation}},
).execute()
+ @staticmethod
+ def _finish_on_exception(orig_func):
+ @functools.wraps(orig_func)
+ def finish_wrapper(self, *args, **kwargs):
+ try:
+ return orig_func(self, *args, **kwargs)
+ except Exception as error:
+ self._logger.error("Actor error %s", error)
+ self._finished()
+ return finish_wrapper
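+
+ # Note on decorator ordering (editorial): _finish_on_exception is applied
+ # outermost (see ComputeNodeSetupActor below), so when RetryMixin._retry
+ # declines to retry an unknown error and re-raises it, the exception is
+ # caught here, logged, and the state change is marked finished instead of
+ # silently killing the actor.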
+
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
+ cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self.cloud_size = cloud_size
self.arvados_node = None
else:
self._later.prepare_arvados_node(arvados_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._clean_arvados_node(
node, "Prepared by Node Manager")
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry()
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry()
def create_cloud_node(self):
- self._logger.info("Creating cloud node with size %s.",
+ self._logger.info("Sending create_node request for node size %s.",
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
self._later.update_arvados_node_properties()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def update_arvados_node_properties(self):
"""Tell Arvados some details about the cloud node.
self._logger.info("%s updated properties.", self.arvados_node['uuid'])
self._later.post_create()
- @ComputeNodeStateChangeBase._retry()
+ @RetryMixin._retry()
def post_create(self):
self._cloud.post_create_node(self.cloud_node)
self._logger.info("%s post-create work done.", self.cloud_node.id)
# eligible. Normal shutdowns based on job demand should be
# cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
+ cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
self.cancel_reason = None
self.success = None
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
def on_start(self):
+ super(ComputeNodeShutdownActor, self).on_start()
self._later.shutdown_node()
def _arvados_node(self):
def cancel_shutdown(self, reason):
self.cancel_reason = reason
- self._logger.info("Cloud node %s shutdown cancelled: %s.",
- self.cloud_node.id, reason)
+ self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
def stop_wrapper(self, *args, **kwargs):
if (self.cancellable and
- (not self._monitor.shutdown_eligible().get())):
+ (self._monitor.shutdown_eligible().get() is not True)):
self._later.cancel_shutdown(self.WINDOW_CLOSED)
return None
else:
return orig_func(self, *args, **kwargs)
return stop_wrapper
+ @ComputeNodeStateChangeBase._finish_on_exception
@_stop_if_window_closed
- @ComputeNodeStateChangeBase._retry()
+ @RetryMixin._retry()
def shutdown_node(self):
+ self._logger.info("Starting shutdown")
if not self._cloud.destroy_node(self.cloud_node):
if self._cloud.broken(self.cloud_node):
self._later.cancel_shutdown(self.NODE_BROKEN)
else:
# Force a retry.
raise cloud_types.LibcloudError("destroy_node failed")
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self._logger.info("Shutdown success")
arv_node = self._arvados_node()
if arv_node is None:
self._finished(success_flag=True)
else:
self._later.clean_arvados_node(arv_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
self._finished(success_flag=True)
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.computenode')
self._last_log = None
self._shutdowns = shutdown_timer
self._cloud_node_fqdn = cloud_fqdn_func
self.last_shutdown_opening = None
self._later.consider_shutdown()
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
+ def on_start(self):
+ self._set_logger()
+ self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
+
def subscribe(self, subscriber):
self.subscribers.add(subscriber)
return result
def shutdown_eligible(self):
+ """Return True if eligible for shutdown, or a string explaining why the node
+ is not eligible for shutdown."""
+
if not self._shutdowns.window_open():
- return False
+ return "shutdown window is not open."
if self.arvados_node is None:
# Node is unpaired.
# If it hasn't pinged Arvados after boot_fail seconds, shut it down
- return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+ if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+ return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
+ else:
+ return True
missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
if missing and self._cloud.broken(self.cloud_node):
# Node is paired, but Arvados says it is missing and the cloud says the node
# is in an error state, so shut it down.
return True
if missing is None and self._cloud.broken(self.cloud_node):
- self._logger.warning(
- "cloud reports broken node, but paired node %s never pinged "
- "(bug?) -- skipped check for node_stale_after",
+ self._logger.info(
+ "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
+ "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
self.arvados_node['uuid'])
- return self.in_state('idle')
+ if self.in_state('idle'):
+ return True
+ else:
+ return "node is not idle."
def consider_shutdown(self):
- next_opening = self._shutdowns.next_opening()
- if self.shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- elif self._shutdowns.window_open():
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- elif self.last_shutdown_opening != next_opening:
- self._debug("Node %s shutdown window closed. Next at %s.",
- self.cloud_node.id, time.ctime(next_opening))
- self._timer.schedule(next_opening, self._later.consider_shutdown)
- self.last_shutdown_opening = next_opening
+ try:
+ next_opening = self._shutdowns.next_opening()
+ eligible = self.shutdown_eligible()
+ if eligible is True:
+ self._debug("Suggesting shutdown.")
+ _notify_subscribers(self._later, self.subscribers)
+ elif self._shutdowns.window_open():
+ self._debug("Cannot shut down because %s", eligible)
+ elif self.last_shutdown_opening != next_opening:
+ self._debug("Shutdown window closed. Next at %s.",
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
+ except Exception:
+ self._logger.exception("Unexpected exception")
def offer_arvados_pair(self, arvados_node):
first_ping_s = arvados_node.get('first_ping_at')
from . import \
ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
from . import ComputeNodeShutdownActor as ShutdownActorBase
+from .. import RetryMixin
class ComputeNodeShutdownActor(ShutdownActorBase):
SLURM_END_STATES = frozenset(['down\n', 'down*\n',
self._nodename = None
return super(ComputeNodeShutdownActor, self).on_start()
else:
+ self._set_logger()
self._nodename = arv_node['hostname']
self._logger.info("Draining SLURM node %s", self._nodename)
self._later.issue_slurm_drain()
# of the excessive memory usage that result in the "Cannot allocate memory"
# error are still being investigated.
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
def cancel_shutdown(self, reason):
if self._nodename:
if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
pass
return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
def issue_slurm_drain(self):
self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
def await_slurm_drain(self):
output = self._get_slurm_state()
from __future__ import absolute_import, print_function
+import logging
from operator import attrgetter
import libcloud.common.types as cloud_types
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
from ...config import NETWORK_ERRORS
+from .. import RetryMixin
-class BaseComputeNodeDriver(object):
+class BaseComputeNodeDriver(RetryMixin):
"""Abstract base class for compute node drivers.
libcloud drivers abstract away many of the differences between
"""
CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
- def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+ @RetryMixin._retry()
+ def _create_driver(self, driver_class, **auth_kwargs):
+ return driver_class(**auth_kwargs)
+
+ @RetryMixin._retry()
+ def _set_sizes(self):
+ self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+ driver_class, retry_wait=1, max_retry_wait=180):
"""Base initializer for compute node drivers.
Arguments:
libcloud driver's create_node method to create a new compute node.
* driver_class: The class of a libcloud driver to use.
"""
- self.real = driver_class(**auth_kwargs)
+
+ super(BaseComputeNodeDriver, self).__init__(retry_wait, max_retry_wait,
+ logging.getLogger(self.__class__.__name__),
+ type(self),
+ None)
+ self.real = self._create_driver(driver_class, **auth_kwargs)
self.list_kwargs = list_kwargs
self.create_kwargs = create_kwargs
# Transform entries in create_kwargs. For each key K, if this class
if new_pair is not None:
self.create_kwargs[new_pair[0]] = new_pair[1]
- self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+ self._set_sizes()
def _init_ping_host(self, ping_host):
self.ping_host = ping_host
self.ping_host, arvados_node['uuid'],
arvados_node['info']['ping_secret'])
+ def find_node(self, name):
+ node = [n for n in self.list_nodes() if n.name == name]
+ if node:
+ return node[0]
+ else:
+ return None
+
def create_node(self, size, arvados_node):
- kwargs = self.create_kwargs.copy()
- kwargs.update(self.arvados_create_kwargs(size, arvados_node))
- kwargs['size'] = size
- return self.real.create_node(**kwargs)
+ try:
+ kwargs = self.create_kwargs.copy()
+ kwargs.update(self.arvados_create_kwargs(size, arvados_node))
+ kwargs['size'] = size
+ return self.real.create_node(**kwargs)
+ except self.CLOUD_ERRORS:
+ # Workaround for bug #6702: sometimes the create node request
+ # times out and raises an exception even though the node was
+ # actually created. If this happens, we get stuck in a retry
+ # loop forever because subsequent create_node attempts will fail
+ # due to node name collision. So check if the node we intended to
+ # create shows up in the cloud node list and return it if found.
+ try:
+ node = self.find_node(kwargs['name'])
+ if node:
+ return node
+ except:
+ # Ignore possible exception from find_node in favor of
+ # re-raising the original create_node exception.
+ pass
+ raise
def post_create_node(self, cloud_node):
# ComputeNodeSetupActor calls this method after the cloud node is
self.real.ex_create_tags(cloud_node,
{'Name': arvados_node_fqdn(arvados_node)})
+ def find_node(self, name):
+ raise NotImplementedError("ec2.ComputeNodeDriver.find_node")
+
def list_nodes(self):
# Need to populate Node.size
nodes = super(ComputeNodeDriver, self).list_nodes()
})
return result
+
def list_nodes(self):
# The GCE libcloud driver only supports filtering node lists by zone.
# Do our own filtering based on tag list.
self._new_arvados = arvados_factory
self._new_cloud = cloud_factory
self._cloud_driver = self._new_cloud()
- self._logger = logging.getLogger('arvnodeman.daemon')
self._later = self.actor_ref.proxy()
self.shutdown_windows = shutdown_windows
self.server_calculator = server_calculator
self.booting = {} # Actor IDs to ComputeNodeSetupActors
self.booted = {} # Cloud node IDs to _ComputeNodeRecords
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
- self._logger.debug("Daemon initialized")
+ self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
+
+ def on_start(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+ self._logger.debug("Daemon started")
def _update_poll_time(self, poll_key):
self.last_polls[poll_key] = time.time()
def _pair_nodes(self, node_record, arvados_node):
- self._logger.info("Cloud node %s has associated with Arvados node %s",
- node_record.cloud_node.id, arvados_node['uuid'])
+ self._logger.info("Cloud node %s is now paired with Arvados node %s",
+ node_record.cloud_node.name, arvados_node['uuid'])
self._arvados_nodes_actor.subscribe_to(
arvados_node['uuid'], node_record.actor.update_arvados_node)
node_record.arvados_node = arvados_node
except pykka.ActorDeadError:
pass
del self.shutdowns[key]
+ del self.sizes_booting_shutdown[key]
record.actor.stop()
record.cloud_node = None
self._pair_nodes(cloud_rec, arv_node)
break
- def _nodes_up(self, size):
- up = 0
- up += sum(1
- for c in self.booting.itervalues()
- if size is None or c.cloud_size.get().id == size.id)
- up += sum(1
- for i in (self.booted, self.cloud_nodes.nodes)
- for c in i.itervalues()
+ def _nodes_booting(self, size):
+ s = sum(1
+ for c in self.booting.iterkeys()
+ if size is None or self.sizes_booting_shutdown[c].id == size.id)
+ s += sum(1
+ for c in self.booted.itervalues()
+ if size is None or c.cloud_node.size.id == size.id)
+ return s
+
+ def _nodes_unpaired(self, size):
+ return sum(1
+ for c in self.cloud_nodes.unpaired()
+ if size is None or c.cloud_node.size.id == size.id)
+
+ def _nodes_booted(self, size):
+ return sum(1
+ for c in self.cloud_nodes.nodes.itervalues()
if size is None or c.cloud_node.size.id == size.id)
+
+ def _nodes_up(self, size):
+ up = self._nodes_booting(size) + self._nodes_booted(size)
return up
def _total_price(self):
cost = 0
- cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
- for c in self.booting.itervalues())
+ cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
+ for c in self.booting.iterkeys())
cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
for i in (self.booted, self.cloud_nodes.nodes)
for c in i.itervalues())
def _size_shutdowns(self, size):
sh = 0
- for c in self.shutdowns.itervalues():
+ for c in self.shutdowns.iterkeys():
try:
- if c.cloud_node.get().size.id == size.id:
+ if self.sizes_booting_shutdown[c].id == size.id:
sh += 1
except pykka.ActorDeadError:
pass
elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
- up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
- self._nodes_busy(size) +
- self._nodes_missing(size))
+ booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
+ shutdown_count = self._size_shutdowns(size)
+ busy_count = self._nodes_busy(size)
+ up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
- self._logger.debug("%s: idle nodes %i, wishlist size %i", size.name, up_count, self._size_wishlist(size))
+ self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
+ self._size_wishlist(size),
+ up_count + busy_count,
+ booting_count,
+ up_count - booting_count,
+ busy_count,
+ shutdown_count)
wanted = self._size_wishlist(size) - up_count
if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
self._update_poll_time('server_wishlist')
self.last_wishlist = wishlist
for size in reversed(self.server_calculator.cloud_sizes):
- nodes_wanted = self._nodes_wanted(size)
- if nodes_wanted > 0:
- self._later.start_node(size)
- elif (nodes_wanted < 0) and self.booting:
- self._later.stop_booting_node(size)
+ try:
+ nodes_wanted = self._nodes_wanted(size)
+ if nodes_wanted > 0:
+ self._later.start_node(size)
+ elif (nodes_wanted < 0) and self.booting:
+ self._later.stop_booting_node(size)
+ except Exception as e:
+ self._logger.exception("while calculating nodes wanted for size %s", size)
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
if nodes_wanted < 1:
return None
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- self._logger.info("Want %s more nodes. Booting a %s node.",
+ self._logger.info("Want %i more %s nodes. Booting a node.",
nodes_wanted, cloud_size.name)
new_setup = self._node_setup.start(
timer_actor=self._timer,
cloud_client=self._new_cloud(),
cloud_size=cloud_size).proxy()
self.booting[new_setup.actor_ref.actor_urn] = new_setup
+ self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
time.time())
def node_up(self, setup_proxy):
cloud_node = setup_proxy.cloud_node.get()
del self.booting[setup_proxy.actor_ref.actor_urn]
+ del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+
setup_proxy.stop()
- record = self.cloud_nodes.get(cloud_node.id)
- if record is None:
- record = self._new_node(cloud_node)
- self.booted[cloud_node.id] = record
- self._timer.schedule(time.time() + self.boot_fail_after,
- self._later.shutdown_unpaired_node, cloud_node.id)
+ if cloud_node is not None:
+ record = self.cloud_nodes.get(cloud_node.id)
+ if record is None:
+ record = self._new_node(cloud_node)
+ self.booted[cloud_node.id] = record
+ self._timer.schedule(time.time() + self.boot_fail_after,
+ self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
def stop_booting_node(self, size):
for key, node in self.booting.iteritems():
if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
del self.booting[key]
+ del self.sizes_booting_shutdown[key]
+
if nodes_excess > 1:
self._later.stop_booting_node(size)
break
def _begin_node_shutdown(self, node_actor, cancellable):
- cloud_node_id = node_actor.cloud_node.get().id
+ cloud_node_obj = node_actor.cloud_node.get()
+ cloud_node_id = cloud_node_obj.id
if cloud_node_id in self.shutdowns:
return None
shutdown = self._node_shutdown.start(
arvados_client=self._new_arvados(),
node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
+ self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
shutdown.subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
if cancel_reason == self._node_shutdown.NODE_BROKEN:
self.cloud_nodes.blacklist(cloud_node_id)
del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()
del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
def shutdown(self):
self._logger.info("Shutting down after signal.")
"""
CLIENT_ERRORS = ARVADOS_ERRORS
- LOGGER_NAME = 'arvnodeman.jobqueue'
def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
super(JobQueueMonitorActor, self).__init__(
def _got_response(self, queue):
server_list = self._calculator.servers_for_queue(queue)
- self._logger.debug("Sending server wishlist: %s",
+ self._logger.debug("Calculated wishlist: %s",
', '.join(s.name for s in server_list) or "(empty)")
return super(JobQueueMonitorActor, self)._got_response(server_list)
for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
signal.signal(sigcode, shutdown_signal)
- setup_logging(config.get('Logging', 'file'), **config.log_levels())
- node_setup, node_shutdown, node_update, node_monitor = \
- config.dispatch_classes()
- server_calculator = build_server_calculator(config)
- timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
- launch_pollers(config, server_calculator)
- cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
- node_daemon = NodeManagerDaemonActor.start(
- job_queue_poller, arvados_node_poller, cloud_node_poller,
- cloud_node_updater, timer,
- config.new_arvados_client, config.new_cloud_client,
- config.shutdown_windows(),
- server_calculator,
- config.getint('Daemon', 'min_nodes'),
- config.getint('Daemon', 'max_nodes'),
- config.getint('Daemon', 'poll_stale_after'),
- config.getint('Daemon', 'boot_fail_after'),
- config.getint('Daemon', 'node_stale_after'),
- node_setup, node_shutdown, node_monitor,
- max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
-
- signal.pause()
- daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
- while not daemon_stopped():
- time.sleep(1)
- pykka.ActorRegistry.stop_all()
+ try:
+ setup_logging(config.get('Logging', 'file'), **config.log_levels())
+ node_setup, node_shutdown, node_update, node_monitor = \
+ config.dispatch_classes()
+ server_calculator = build_server_calculator(config)
+ timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+ launch_pollers(config, server_calculator)
+ cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+ node_daemon = NodeManagerDaemonActor.start(
+ job_queue_poller, arvados_node_poller, cloud_node_poller,
+ cloud_node_updater, timer,
+ config.new_arvados_client, config.new_cloud_client,
+ config.shutdown_windows(),
+ server_calculator,
+ config.getint('Daemon', 'min_nodes'),
+ config.getint('Daemon', 'max_nodes'),
+ config.getint('Daemon', 'poll_stale_after'),
+ config.getint('Daemon', 'boot_fail_after'),
+ config.getint('Daemon', 'node_stale_after'),
+ node_setup, node_shutdown, node_monitor,
+ max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+
+ signal.pause()
+ daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+ while not daemon_stopped():
+ time.sleep(1)
+ except Exception:
+ logging.exception("Uncaught exception during setup")
+ finally:
+ pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
This actor regularly polls the list of Arvados node records, and
sends it to subscribers.
"""
- LOGGER_NAME = 'arvnodeman.arvados_nodes'
def is_common_error(self, exception):
return isinstance(exception, config.ARVADOS_ERRORS)
This actor regularly polls the cloud to get a list of running compute
nodes, and sends it to subscribers.
"""
- LOGGER_NAME = 'arvnodeman.cloud_nodes'
def is_common_error(self, exception):
return self._client.is_cloud_exception(exception)
return node.id
def _send_request(self):
- return self._client.list_nodes()
+ n = self._client.list_nodes()
+ return n
def test_late_subscribers_get_responses(self):
self.build_monitor(['pre_late_test', 'late_test'])
- self.monitor.subscribe(lambda response: None).get(self.TIMEOUT)
+ mock_subscriber = mock.Mock(name='mock_subscriber')
+ self.monitor.subscribe(mock_subscriber).get(self.TIMEOUT)
self.monitor.subscribe(self.subscriber)
self.monitor.poll().get(self.TIMEOUT)
self.stop_proxy(self.monitor)
if __name__ == '__main__':
unittest.main()
-
import httplib2
import mock
import pykka
+import threading
import arvnodeman.computenode.dispatch as dispatch
from . import testutil
def test_creation_without_arvados_node(self):
self.make_actor()
+ finished = threading.Event()
+ self.setup_actor.subscribe(lambda _: finished.set())
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
+ assert(finished.wait(self.TIMEOUT))
self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
self.assert_node_properties_updated()
def test_creation_with_arvados_node(self):
self.make_mocks(arvados_effect=[testutil.arvados_node_mock()]*2)
self.make_actor(testutil.arvados_node_mock())
+ finished = threading.Event()
+ self.setup_actor.subscribe(lambda _: finished.set())
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
+ assert(finished.wait(self.TIMEOUT))
self.assert_node_properties_updated()
self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
self.assertEqual(self.cloud_client.create_node(),
def test_no_shutdown_booting(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is still booting"))
def test_shutdown_without_arvados_node(self):
self.make_actor(start_time=0)
last_ping_at='1970-01-01T01:02:03.04050607Z')
self.make_actor(10, arv_node)
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_running_broken(self):
arv_node = testutil.arvados_node_mock(12, job_uuid=None,
self.make_actor(12, arv_node)
self.shutdowns._set_state(True, 600)
self.cloud_client.broken.return_value = True
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_shutdown_missing_broken(self):
arv_node = testutil.arvados_node_mock(11, job_uuid=None,
def test_no_shutdown_when_window_closed(self):
self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("shutdown window is not open."))
def test_no_shutdown_when_node_running_job(self):
self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_when_node_state_unknown(self):
self.make_actor(5, testutil.arvados_node_mock(
5, crunch_worker_state=None))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_when_node_state_stale(self):
self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_arvados_node_match(self):
self.make_actor(2)
echo z1.test > /var/tmp/arv-node-data/meta-data/instance-type
""",
driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['ex_customdata'])
+
+ def test_create_raises_but_actually_succeeded(self):
+ arv_node = testutil.arvados_node_mock(1, hostname=None)
+ driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
+ nodelist = [testutil.cloud_node_mock(1, tags={"arvados-class": "dynamic-compute"})]
+ nodelist[0].name = 'compute-000000000000001-zzzzz'
+ self.driver_mock().list_nodes.return_value = nodelist
+ self.driver_mock().create_node.side_effect = IOError
+ n = driver.create_node(testutil.MockSize(1), arv_node)
+ self.assertEqual('compute-000000000000001-zzzzz', n.name)
metadata = self.driver_mock().create_node.call_args[1]['ex_metadata']
self.assertIn('ping_secret=ssshh', metadata.get('arv-ping-url'))
+ def test_create_raises_but_actually_succeeded(self):
+ arv_node = testutil.arvados_node_mock(1, hostname=None)
+ driver = self.new_driver()
+ nodelist = [testutil.cloud_node_mock(1)]
+ nodelist[0].name = 'compute-000000000000001-zzzzz'
+ self.driver_mock().list_nodes.return_value = nodelist
+ self.driver_mock().create_node.side_effect = IOError
+ n = driver.create_node(testutil.MockSize(1), arv_node)
+ self.assertEqual('compute-000000000000001-zzzzz', n.name)
+
def test_create_sets_default_hostname(self):
driver = self.new_driver()
driver.create_node(testutil.MockSize(1),
mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
+ self.daemon.sizes_booting_shutdown.get()[cloud_nodes[1].id] = size
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
def driver_method_args(self, method_name):
return getattr(self.driver_mock(), method_name).call_args
+ def test_driver_create_retry(self):
+ with mock.patch('time.sleep'):
+ driver_mock2 = mock.MagicMock(name='driver_mock2')
+ self.driver_mock.side_effect = (Exception("oops"), driver_mock2)
+ kwargs = {'user_id': 'foo'}
+ driver = self.new_driver(auth_kwargs=kwargs)
+ self.assertTrue(self.driver_mock.called)
+ self.assertIs(driver.real, driver_mock2)
class RemotePollLoopActorTestMixin(ActorTestMixin):
def build_monitor(self, *args, **kwargs):
include agpl-3.0.txt
+include crunchstat_summary/chartjs.js
import crunchstat_summary.command
import crunchstat_summary.summarizer
+import logging
import sys
+logging.getLogger().addHandler(logging.StreamHandler())
+
args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.summarizer.Summarizer(args)
-s.run()
-print(s.report(), end='')
+cmd = crunchstat_summary.command.Command(args)
+cmd.run()
+print(cmd.report(), end='')
+import logging
+
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
--- /dev/null
+window.onload = function() {
+ var charts = {};
+ sections.forEach(function(section, section_idx) {
+ var h1 = document.createElement('h1');
+ h1.appendChild(document.createTextNode(section.label));
+ document.body.appendChild(h1);
+ section.charts.forEach(function(data, chart_idx) {
+ // Skip chart if every series has zero data points
+ if (0 == data.data.reduce(function(len, series) {
+ return len + series.dataPoints.length;
+ }, 0)) {
+ return;
+ }
+ var id = 'chart-'+section_idx+'-'+chart_idx;
+ var div = document.createElement('div');
+ div.setAttribute('id', id);
+ div.setAttribute('style', 'width: 100%; height: 150px');
+ document.body.appendChild(div);
+ charts[id] = new CanvasJS.Chart(id, data);
+ charts[id].render();
+ });
+ });
+
+ if (typeof window.debug === 'undefined')
+ window.debug = {};
+ window.debug.charts = charts;
+};
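+
+// For reference: the `sections` global consumed above is emitted by
+// crunchstat_summary/chartjs.py (ChartJS.js()) and looks roughly like:
+//
+//   var sections = [
+//     {label: "<job label> -- elapsed time 1h2m3s",
+//      charts: [
+//        {title: {text: "..."}, axisY: {...}, zoomEnabled: true,
+//         data: [{type: "line", markerType: "none",
+//                 dataPoints: [{x: 0, y: 0}, ...]}]}
+//      ]}
+//   ];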
--- /dev/null
+from __future__ import print_function
+
+import cgi
+import json
+import math
+import pkg_resources
+
+from crunchstat_summary import logger
+
+
+class ChartJS(object):
+ JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.min.js'
+
+ def __init__(self, label, summarizers):
+ self.label = label
+ self.summarizers = summarizers
+
+ def html(self):
+ return '''<!doctype html><html><head>
+ <title>{} stats</title>
+ <script type="text/javascript" src="{}"></script>
+ <script type="text/javascript">{}</script>
+ </head><body></body></html>
+ '''.format(cgi.escape(self.label), self.JSLIB, self.js())
+
+ def js(self):
+ return 'var sections = {};\n{}'.format(
+ json.dumps(self.sections()),
+ pkg_resources.resource_string('crunchstat_summary', 'chartjs.js'))
+
+ def sections(self):
+ return [
+ {
+ 'label': s.long_label(),
+ 'charts': self.charts(s.label, s.tasks),
+ }
+ for s in self.summarizers]
+
+ def _axisY(self, tasks, stat):
+ ymax = 1
+ for task in tasks.itervalues():
+ for pt in task.series[stat]:
+ ymax = max(ymax, pt[1])
+ ytick = math.exp((1+math.floor(math.log(ymax, 2)))*math.log(2))/4
+ return {
+ 'gridColor': '#cccccc',
+ 'gridThickness': 1,
+ 'interval': ytick,
+ 'minimum': 0,
+ 'maximum': ymax,
+ 'valueFormatString': "''",
+ }
+
+ def charts(self, label, tasks):
+ return [
+ {
+ 'axisY': self._axisY(tasks=tasks, stat=stat),
+ 'data': [
+ {
+ 'type': 'line',
+ 'markerType': 'none',
+ 'dataPoints': self._datapoints(
+ label=uuid, task=task, series=task.series[stat]),
+ }
+ for uuid, task in tasks.iteritems()
+ ],
+ 'title': {
+ 'text': '{}: {} {}'.format(label, stat[0], stat[1]),
+ },
+ 'zoomEnabled': True,
+ }
+ for stat in (('cpu', 'user+sys__rate'),
+ ('mem', 'rss'),
+ ('net:eth0', 'tx+rx__rate'),
+ ('net:keep0', 'tx+rx__rate'))]
+
+ def _datapoints(self, label, task, series):
+ points = [
+ {'x': pt[0].total_seconds(), 'y': pt[1]}
+ for pt in series]
+ if len(points) > 0:
+ points[-1]['markerType'] = 'cross'
+ points[-1]['markerSize'] = 12
+ return points
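+
+# Usage sketch (assuming `summarizers` is a list of Summarizer objects whose
+# run() has already been called):
+#
+#   page = ChartJS('my pipeline', summarizers).html()
+#   with open('report.html', 'w') as f:
+#       f.write(page)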
import argparse
+import gzip
+import logging
+import sys
+
+from crunchstat_summary import logger, summarizer
class ArgumentParser(argparse.ArgumentParser):
src = self.add_mutually_exclusive_group()
src.add_argument(
'--job', type=str, metavar='UUID',
- help='Look up the specified job and read its log data from Keep')
+ help='Look up the specified job and read its log data from Keep'
+ ' (or from the Arvados event log, if the job is still running)')
+ src.add_argument(
+ '--pipeline-instance', type=str, metavar='UUID',
+ help='Summarize each component of the given pipeline instance')
src.add_argument(
'--log-file', type=str,
help='Read log data from a regular file')
+ self.add_argument(
+ '--skip-child-jobs', action='store_true',
+ help='Do not include stats from child jobs')
+ self.add_argument(
+ '--format', type=str, choices=('html', 'text'), default='text',
+ help='Report format')
+ self.add_argument(
+ '--verbose', '-v', action='count', default=0,
+ help='Log more information (once for progress, twice for debug)')
+
+
+class Command(object):
+ def __init__(self, args):
+ self.args = args
+ logger.setLevel(logging.WARNING - 10 * args.verbose)
+
+ def run(self):
+ kwargs = {
+ 'skip_child_jobs': self.args.skip_child_jobs,
+ }
+ if self.args.pipeline_instance:
+ self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+ elif self.args.job:
+ self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+ elif self.args.log_file:
+ if self.args.log_file.endswith('.gz'):
+ fh = gzip.open(self.args.log_file)
+ else:
+ fh = open(self.args.log_file)
+ self.summer = summarizer.Summarizer(fh, **kwargs)
+ else:
+ self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
+ return self.summer.run()
+
+ def report(self):
+ if self.args.format == 'html':
+ return self.summer.html_report()
+ elif self.args.format == 'text':
+ return self.summer.text_report()
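+
+# Example invocations (sketch; the console-script name is an assumption, the
+# options are the ones defined in ArgumentParser above):
+#
+#   crunchstat-summary --job zzzzz-8i9sb-xxxxxxxxxxxxxxx --format html > report.html
+#   crunchstat-summary --pipeline-instance zzzzz-d1hrv-xxxxxxxxxxxxxxx -v
+#   crunchstat-summary --log-file crunchstat.txt.gz --skip-child-jobs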
--- /dev/null
+from __future__ import print_function
+
+import arvados
+import Queue
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+ def __init__(self, collection_id):
+ logger.debug('load collection %s', collection_id)
+ collection = arvados.collection.CollectionReader(collection_id)
+ filenames = [filename for filename in collection]
+ if len(filenames) != 1:
+ raise ValueError(
+ "collection {} has {} files; need exactly one".format(
+ collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+ self._label = "{}/{}".format(collection_id, filenames[0])
+
+ def __str__(self):
+ return self._label
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ EOF = None
+
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self.job_uuid = job_uuid
+
+ def __str__(self):
+ return self.job_uuid
+
+ def _get_all_pages(self):
+ got = 0
+ last_id = 0
+ filters = [
+ ['object_uuid', '=', self.job_uuid],
+ ['event_type', '=', 'stderr']]
+ try:
+ while True:
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=filters + [['id','>',str(last_id)]],
+ select=['id', 'properties'],
+ ).execute(num_retries=2)
+ got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self.job_uuid, got,
+ got + page['items_available'] - len(page['items']))
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._queue.put(line+'\n')
+ last_id = i['id']
+ if (len(page['items']) == 0 or
+ len(page['items']) >= page['items_available']):
+ break
+ finally:
+ self._queue.put(self.EOF)
+
+ def __iter__(self):
+ self._queue = Queue.Queue()
+ self._thread = threading.Thread(target=self._get_all_pages)
+ self._thread.daemon = True
+ self._thread.start()
+ return self
+
+ def next(self):
+ line = self._queue.get()
+ if line is self.EOF:
+ self._thread.join()
+ raise StopIteration
+ return line
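+
+# Usage sketch: both readers yield log lines, so either can serve as the
+# logdata argument of crunchstat_summary.summarizer.Summarizer, e.g.
+# (the UUID is a placeholder):
+#
+#   reader = LiveLogReader('zzzzz-8i9sb-xxxxxxxxxxxxxxx')
+#   summ = crunchstat_summary.summarizer.Summarizer(reader, label=str(reader))
+#   summ.run()
+#   print(summ.text_report())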
import arvados
import collections
+import crunchstat_summary.chartjs
+import crunchstat_summary.reader
+import datetime
import functools
-import gzip
+import itertools
+import math
import re
import sys
+import threading
+
+from arvados.api import OrderedJsonModel
+from crunchstat_summary import logger
+
+# Recommend memory constraints that are this multiple of an integral
+# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
+# that have amounts like 7.5 GiB according to the kernel.)
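+# (That is, recommendations come out as values like 0.95, 1.9, 3.8 or 7.6 GiB
+# rather than 1, 2, 4 or 8 GiB.)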
+AVAILABLE_RAM_RATIO = 0.95
+
+
+class Task(object):
+ def __init__(self):
+ self.starttime = None
+ self.series = collections.defaultdict(list)
class Summarizer(object):
- def __init__(self, args):
- self.args = args
+ def __init__(self, logdata, label=None, skip_child_jobs=False):
+ self._logdata = logdata
+
+ self.label = label
+ self.starttime = None
+ self.finishtime = None
+ self._skip_child_jobs = skip_child_jobs
- def run(self):
# stats_max: {category: {stat: val}}
self.stats_max = collections.defaultdict(
- functools.partial(collections.defaultdict,
- lambda: float('-Inf')))
+ functools.partial(collections.defaultdict, lambda: 0))
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
- for line in self._logdata():
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
+
+ self.seq_to_uuid = {}
+ self.tasks = collections.defaultdict(Task)
+
+ # We won't bother recommending new runtime constraints if the
+ # constraints given when running the job are known to us and
+ # are already suitable. If applicable, the subclass
+ # constructor will overwrite this with something useful.
+ self.existing_constraints = {}
+
+ logger.debug("%s: logdata %s", self.label, logdata)
+
+ def run(self):
+ logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+ for line in self._logdata:
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
if m:
- task_id = m.group('seq')
+ seq = int(m.group('seq'))
+ uuid = m.group('task_uuid')
+ self.seq_to_uuid[seq] = uuid
+ logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+ continue
+
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
+ if m:
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
elapsed = int(m.group('elapsed'))
self.task_stats[task_id]['time'] = {'elapsed': elapsed}
if elapsed > self.stats_max['time']['elapsed']:
self.stats_max['time']['elapsed'] = elapsed
continue
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+ if m:
+ uuid = m.group('uuid')
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
+ self.label, uuid)
+ continue
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = JobSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
+ continue
+
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
+
+ if self.label is None:
+ self.label = m.group('job_uuid')
+ logger.debug('%s: using job uuid as label', self.label)
if m.group('category').endswith(':'):
- # "notice:" etc.
+ # "stderr crunchstat: notice: ..."
+ continue
+ elif m.group('category') in ('error', 'caught'):
+ continue
+ elif m.group('category') == 'read':
+ # "stderr crunchstat: read /proc/1234/net/dev: ..."
+ # (crunchstat formatting fixed, but old logs still say this)
continue
- task_id = m.group('seq')
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ task = self.tasks[task_id]
+
+ # Use the first and last crunchstat timestamps as
+ # approximations of starttime and finishtime.
+ timestamp = datetime.datetime.strptime(
+ m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+ if not task.starttime:
+ task.starttime = timestamp
+ logger.debug('%s: task %s starttime %s',
+ self.label, task_id, timestamp)
+ task.finishtime = timestamp
+
+ if not self.starttime:
+ self.starttime = timestamp
+ self.finishtime = timestamp
+
this_interval_s = None
for group in ['current', 'interval']:
if not m.group(group):
words = m.group(group).split(' ')
stats = {}
for val, stat in zip(words[::2], words[1::2]):
- if '.' in val:
- stats[stat] = float(val)
- else:
- stats[stat] = int(val)
+ try:
+ if '.' in val:
+ stats[stat] = float(val)
+ else:
+ stats[stat] = int(val)
+ except ValueError as e:
+ raise ValueError(
+ 'Error parsing {} stat in "{}": {!r}'.format(
+ stat, line, e))
if 'user' in stats or 'sys' in stats:
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
this_interval_s = val
continue
elif not (this_interval_s > 0):
- print("BUG? interval stat given with duration {!r}".
- format(this_interval_s),
- file=sys.stderr)
+ logger.error(
+ "BUG? interval stat given with duration {!r}".
+ format(this_interval_s))
continue
else:
stat = stat + '__rate'
val = val / this_interval_s
+ if stat in ['user+sys__rate', 'tx+rx__rate']:
+ task.series[category, stat].append(
+ (timestamp - self.starttime, val))
else:
+ if stat in ['rss']:
+ task.series[category, stat].append(
+ (timestamp - self.starttime, val))
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
+ logger.debug('%s: done parsing', self.label)
- def report(self):
- return "\n".join(self._report_gen()) + "\n"
-
- def _report_gen(self):
- job_tot = collections.defaultdict(
+ self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
for task_id, task_stat in self.task_stats.iteritems():
for category, stat_last in task_stat.iteritems():
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
- job_tot[category][stat] += val
+ self.job_tot[category][stat] += val
+ logger.debug('%s: done totals', self.label)
+
+ def long_label(self):
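+ """Return the label, adding an elapsed-time suffix when finishtime is known.
+
+ For example, a 3725-second run renders as "<label> -- elapsed time 1h2m5s".
+ """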
+ label = self.label
+ if self.finishtime:
+ label += ' -- elapsed time '
+ s = (self.finishtime - self.starttime).total_seconds()
+ if s > 86400:
+ label += '{}d'.format(int(s/86400))
+ if s > 3600:
+ label += '{}h'.format(int(s/3600) % 24)
+ if s > 60:
+ label += '{}m'.format(int(s/60) % 60)
+ label += '{}s'.format(int(s) % 60)
+ return label
+
+ def text_report(self):
+ if not self.tasks:
+ return "(no report generated)\n"
+ return "\n".join(itertools.chain(
+ self._text_report_gen(),
+ self._recommend_gen())) + "\n"
+
+ def html_report(self):
+ return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
+
+ def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
for category, stat_max in sorted(self.stats_max.iteritems()):
for stat, val in sorted(stat_max.iteritems()):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
val = self._format(val)
- tot = self._format(job_tot[category].get(stat, '-'))
+ tot = self._format(self.job_tot[category].get(stat, '-'))
yield "\t".join([category, stat, str(val), max_rate, tot])
for args in (
+ ('Number of tasks: {}',
+ len(self.tasks),
+ None),
('Max CPU time spent by a single task: {}s',
self.stats_max['cpu']['user+sys'],
None),
self.stats_max['cpu']['user+sys__rate'],
lambda x: x * 100),
('Overall CPU usage: {}%',
- job_tot['cpu']['user+sys'] / job_tot['time']['elapsed'],
+ self.job_tot['cpu']['user+sys'] /
+ self.job_tot['time']['elapsed']
+ if self.job_tot['time']['elapsed'] > 0 else 0,
lambda x: x * 100),
('Max memory used by a single task: {}GB',
self.stats_max['mem']['rss'],
lambda x: x / 1e9),
('Max network traffic in a single task: {}GB',
- self.stats_max['net:eth0']['tx+rx'],
+ self.stats_max['net:eth0']['tx+rx'] +
+ self.stats_max['net:keep0']['tx+rx'],
lambda x: x / 1e9),
('Max network speed in a single interval: {}MB/s',
- self.stats_max['net:eth0']['tx+rx__rate'],
- lambda x: x / 1e6)):
+ self.stats_max['net:eth0']['tx+rx__rate'] +
+ self.stats_max['net:keep0']['tx+rx__rate'],
+ lambda x: x / 1e6),
+ ('Keep cache miss rate {}%',
+ (float(self.job_tot['keepcache']['miss']) /
+ float(self.job_tot['keepcalls']['get']))
+ if self.job_tot['keepcalls']['get'] > 0 else 0,
+ lambda x: x * 100.0),
+ ('Keep cache utilization {}%',
+ (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
+ if self.job_tot['net:keep0']['rx'] > 0 else 0,
+ lambda x: x * 100.0)):
format_string, val, transform = args
if val == float('-Inf'):
continue
val = transform(val)
yield "# "+format_string.format(self._format(val))
+ def _recommend_gen(self):
+ return itertools.chain(
+ self._recommend_cpu(),
+ self._recommend_ram(),
+ self._recommend_keep_cache())
+
+ def _recommend_cpu(self):
+ """Recommend asking for 4 cores if max CPU usage was 333%"""
+
+ cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
+ if cpu_max_rate == float('-Inf'):
+ logger.warning('%s: no CPU usage data', self.label)
+ return
+ used_cores = max(1, int(math.ceil(cpu_max_rate)))
+ asked_cores = self.existing_constraints.get('min_cores_per_node')
+ if asked_cores is None or used_cores < asked_cores:
+ yield (
+ '#!! {} max CPU usage was {}% -- '
+ 'try runtime_constraints "min_cores_per_node":{}'
+ ).format(
+ self.label,
+ int(math.ceil(cpu_max_rate*100)),
+ int(used_cores))
+
+ def _recommend_ram(self):
+ """Recommend an economical RAM constraint for this job.
+
+ Nodes that are advertised as "8 gibibytes" actually have what
+ we might call "8 nearlygibs" of memory available for jobs.
+ Here, we calculate a whole number of nearlygibs that would
+ have sufficed to run the job, then recommend requesting a node
+ with that number of nearlygibs (expressed as mebibytes).
+
+ Requesting a node with "nearly 8 gibibytes" is our best hope
+ of getting a node that actually has nearly 8 gibibytes
+ available. If the node manager is smart enough to account for
+ the discrepancy itself when choosing/creating a node, we'll
+ get an 8 GiB node with nearly 8 GiB available. Otherwise, the
+ advertised size of the next-size-smaller node (say, 6 GiB)
+ will be too low to satisfy our request, so we will effectively
+ get rounded up to 8 GiB.
+
+ For example, if we need 7500 MiB, we can ask for 7500 MiB, and
+ we will generally get a node that is advertised as "8 GiB" and
+ has at least 7500 MiB available. However, asking for 8192 MiB
+ would either result in an unnecessarily expensive 12 GiB node
+ (if node manager knows about the discrepancy), or an 8 GiB
+ node which has less than 8192 MiB available and is therefore
+ considered by crunch-dispatch to be too small to meet our
+ constraint.
+
+ When node manager learns how to predict the available memory
+ for each node type such that crunch-dispatch always agrees
+ that a node is big enough to run the job it was brought up
+ for, all this will be unnecessary. We'll just ask for exactly
+ the memory we want -- even if that happens to be 8192 MiB.
+ """
+
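+ # Worked example, assuming AVAILABLE_RAM_RATIO is 0.95 (the value implied
+ # by the 334 MiB -> 972 MiB recommendation in the test fixture below):
+ # a max RSS of 349814784 bytes is 334 MiB; ceil(334/0.95/1024) = 1
+ # nearlygib, so we suggest "min_ram_mb_per_node": int(1*0.95*1024) = 972.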
+ used_bytes = self.stats_max['mem']['rss']
+ if used_bytes == float('-Inf'):
+ logger.warning('%s: no memory usage data', self.label)
+ return
+ used_mib = math.ceil(float(used_bytes) / 1048576)
+ asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+
+ nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
+ if asked_mib is None or (
+ math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+ yield (
+ '#!! {} max RSS was {} MiB -- '
+ 'try runtime_constraints "min_ram_mb_per_node":{}'
+ ).format(
+ self.label,
+ int(used_mib),
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+
+ def _recommend_keep_cache(self):
+ """Recommend increasing keep cache if utilization < 80%"""
+ if self.job_tot['net:keep0']['rx'] == 0:
+ return
+ utilization = (float(self.job_tot['blkio:0:0']['read']) /
+ float(self.job_tot['net:keep0']['rx']))
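+ # A low ratio of bytes the task read (blkio:0:0 read) to bytes fetched
+ # from Keep (net:keep0 rx) suggests blocks were re-fetched because the
+ # cache was too small, so we suggest doubling the cache size.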
+ asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
+
+ if utilization < 0.8:
+ yield (
+ '#!! {} Keep cache utilization was {:.2f}% -- '
+ 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
+ ).format(
+ self.label,
+ utilization * 100.0,
+ asked_mib*2)
+
+
def _format(self, val):
"""Return a string representation of a stat.
else:
return '{}'.format(val)
- def _logdata(self):
- if self.args.log_file:
- if self.args.log_file.endswith('.gz'):
- return gzip.open(self.args.log_file)
- else:
- return open(self.args.log_file)
- elif self.args.job:
- arv = arvados.api('v1')
- job = arv.jobs().get(uuid=self.args.job).execute()
- if not job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.args.job))
- collection = arvados.collection.CollectionReader(job['log'])
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- job.log, len(filenames)))
- return collection.open(filenames[0])
+
+class CollectionSummarizer(Summarizer):
+ def __init__(self, collection_id, **kwargs):
+ super(CollectionSummarizer, self).__init__(
+ crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
+ self.label = collection_id
+
+
+class JobSummarizer(Summarizer):
+ def __init__(self, job, **kwargs):
+ arv = arvados.api('v1')
+ if isinstance(job, basestring):
+ self.job = arv.jobs().get(uuid=job).execute()
else:
- return sys.stdin
+ self.job = job
+ rdr = None
+ if self.job.get('log'):
+ try:
+ rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+ except arvados.errors.NotFoundError as e:
+ logger.warning("Trying event logs after failing to read "
+ "log collection %s: %s", self.job['log'], e)
+ else:
+ label = self.job['uuid']
+ if rdr is None:
+ rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
+ label = self.job['uuid'] + ' (partial)'
+ super(JobSummarizer, self).__init__(rdr, **kwargs)
+ self.label = label
+ self.existing_constraints = self.job.get('runtime_constraints', {})
+
+
+class PipelineSummarizer(object):
+ def __init__(self, pipeline_instance_uuid, **kwargs):
+ arv = arvados.api('v1', model=OrderedJsonModel())
+ instance = arv.pipeline_instances().get(
+ uuid=pipeline_instance_uuid).execute()
+ self.summarizers = collections.OrderedDict()
+ for cname, component in instance['components'].iteritems():
+ if 'job' not in component:
+ logger.warning(
+ "%s: skipping component with no job assigned", cname)
+ else:
+ logger.info(
+ "%s: job %s", cname, component['job']['uuid'])
+ summarizer = JobSummarizer(component['job'], **kwargs)
+ summarizer.label = '{} {}'.format(
+ cname, component['job']['uuid'])
+ self.summarizers[cname] = summarizer
+ self.label = pipeline_instance_uuid
+
+ def run(self):
+ threads = []
+ for summarizer in self.summarizers.itervalues():
+ t = threading.Thread(target=summarizer.run)
+ t.daemon = True
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+
+ def text_report(self):
+ txt = ''
+ for cname, summarizer in self.summarizers.iteritems():
+ txt += '### Summary for {} ({})\n'.format(
+ cname, summarizer.job['uuid'])
+ txt += summarizer.text_report()
+ txt += '\n'
+ return txt
+
+ def html_report(self):
+ return crunchstat_summary.chartjs.ChartJS(
+ self.label, self.summarizers.itervalues()).html()
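For context, these summarizer classes are what the crunchstat-summary script (declared in setup.py below) drives. A rough usage sketch, assuming a configured Arvados API host/token and using a placeholder job UUID, might look like:

    import crunchstat_summary.summarizer as summarizer

    s = summarizer.JobSummarizer('zzzzz-8i9sb-000000000000000')  # placeholder UUID
    s.run()                                 # parse the log collection (or live event log)
    print(s.text_report())                  # tab-separated stats plus "#!!" recommendations
    open('report.html', 'w').write(s.html_report())  # Chart.js-based HTML page

The tests further down exercise the same code paths through crunchstat_summary.command.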
download_url="https://github.com/curoverse/arvados.git",
license='GNU Affero General Public License, version 3.0',
packages=['crunchstat_summary'],
+ include_package_data=True,
scripts=[
'bin/crunchstat-summary'
],
'arvados-python-client',
],
test_suite='tests',
+ tests_require=['pbr<1.7.0', 'mock>=1.0'],
zip_safe=False,
cmdclass={'egg_info': tagger},
)
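With test_suite='tests' and the added tests_require, the suite can be run through setuptools, e.g. with "python setup.py test".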
--- /dev/null
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr old error message:
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: read /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr new error message:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: error reading /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr cancelled job:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: caught signal: interrupt
category metric task_max task_max_rate job_total
-blkio:0:0 read 0 0.00 0
-blkio:0:0 write 0 0.00 0
+blkio:0:0 read 0 0 0
+blkio:0:0 write 0 0 0
cpu cpus 8 - -
cpu sys 1.92 0.04 1.92
cpu user 3.83 0.09 3.83
cpu user+sys 5.75 0.13 5.75
-fuseops read 0 0.00 0
-fuseops write 0 0.00 0
-keepcache hit 0 0.00 0
-keepcache miss 0 0.00 0
-keepcalls get 0 0.00 0
-keepcalls put 0 0.00 0
+fuseops read 0 0 0
+fuseops write 0 0 0
+keepcache hit 0 0 0
+keepcache miss 0 0 0
+keepcalls get 0 0 0
+keepcalls put 0 0 0
mem cache 1678139392 - -
mem pgmajfault 0 - 0
mem rss 349814784 - -
net:eth0 rx 1754364530 41658344.87 1754364530
net:eth0 tx 38837956 920817.97 38837956
net:eth0 tx+rx 1793202486 42579162.83 1793202486
-net:keep0 rx 0 0.00 0
-net:keep0 tx 0 0.00 0
-net:keep0 tx+rx 0 0.00 0
+net:keep0 rx 0 0 0
+net:keep0 tx 0 0 0
+net:keep0 tx+rx 0 0 0
time elapsed 80 - 80
+# Number of tasks: 1
# Max CPU time spent by a single task: 5.75s
# Max CPU usage in a single interval: 13.00%
# Overall CPU usage: 7.19%
# Max memory used by a single task: 0.35GB
# Max network traffic in a single task: 1.79GB
# Max network speed in a single interval: 42.58MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
category metric task_max task_max_rate job_total
cpu cpus 8 - -
-cpu sys 0.00 - 0.00
-cpu user 0.00 - 0.00
-cpu user+sys 0.00 - 0.00
+cpu sys 0 - 0.00
+cpu user 0 - 0.00
+cpu user+sys 0 - 0.00
mem cache 12288 - -
mem pgmajfault 0 - 0
mem rss 856064 - -
net:eth0 tx 90 - 90
net:eth0 tx+rx 180 - 180
time elapsed 2 - 4
-# Max CPU time spent by a single task: 0.00s
+# Number of tasks: 2
+# Max CPU time spent by a single task: 0s
+# Max CPU usage in a single interval: 0%
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
category metric task_max task_max_rate job_total
cpu cpus 8 - -
-cpu sys 0.00 - 0.00
-cpu user 0.00 - 0.00
-cpu user+sys 0.00 - 0.00
+cpu sys 0 - 0.00
+cpu user 0 - 0.00
+cpu user+sys 0 - 0.00
mem cache 8192 - -
mem pgmajfault 0 - 0
mem rss 450560 - -
net:eth0 tx 90 - 90
net:eth0 tx+rx 180 - 180
time elapsed 2 - 3
-# Max CPU time spent by a single task: 0.00s
+# Number of tasks: 2
+# Max CPU time spent by a single task: 0s
+# Max CPU usage in a single interval: 0%
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+import arvados
+import collections
import crunchstat_summary.command
import crunchstat_summary.summarizer
import difflib
import glob
+import gzip
+import mock
import os
import unittest
+TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
-class ExampleLogsTestCase(unittest.TestCase):
+
+class ReportDiff(unittest.TestCase):
+ def diff_known_report(self, logfile, cmd):
+ expectfile = logfile+'.report'
+ expect = open(expectfile).readlines()
+ self.diff_report(cmd, expect, expectfile=expectfile)
+
+ def diff_report(self, cmd, expect, expectfile=None):
+ got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
+ self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
+ expect, got, fromfile=expectfile, tofile="(generated)")))
+
+
+class SummarizeFile(ReportDiff):
def test_example_files(self):
- dirname = os.path.dirname(os.path.abspath(__file__))
- for fnm in glob.glob(os.path.join(dirname, '*.txt.gz')):
- logfile = os.path.join(dirname, fnm)
+ for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+ logfile = os.path.join(TESTS_DIR, fnm)
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--log-file', logfile])
- summarizer = crunchstat_summary.summarizer.Summarizer(args)
- summarizer.run()
- got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
- expectfile = logfile+'.report'
- expect = open(expectfile).readlines()
- self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
- expect, got, fromfile=expectfile, tofile="(generated)")))
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(logfile, cmd)
+
+
+class HTMLFromFile(ReportDiff):
+ def test_example_files(self):
+ # Note we don't test the output content at all yet; we're
+ # mainly just verifying the --format=html option isn't ignored
+ # and the HTML code path doesn't crash.
+ for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+ logfile = os.path.join(TESTS_DIR, fnm)
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--format=html', '--log-file', logfile])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+
+
+class SummarizeEdgeCases(unittest.TestCase):
+ def test_error_messages(self):
+ logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+ s = crunchstat_summary.summarizer.Summarizer(logfile)
+ s.run()
+
+
+class SummarizeJob(ReportDiff):
+ fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
+ fake_log_id = 'fake-log-collection-id'
+ fake_job = {
+ 'uuid': fake_job_uuid,
+ 'log': fake_log_id,
+ }
+ logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_job_report(self, mock_api, mock_cr):
+ mock_api().jobs().get().execute.return_value = self.fake_job
+ mock_cr().__iter__.return_value = ['fake-logfile.txt']
+ mock_cr().open.return_value = gzip.open(self.logfile)
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--job', self.fake_job_uuid])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(self.logfile, cmd)
+ mock_api().jobs().get.assert_called_with(uuid=self.fake_job_uuid)
+ mock_cr.assert_called_with(self.fake_log_id)
+ mock_cr().open.assert_called_with('fake-logfile.txt')
+
+
+class SummarizePipeline(ReportDiff):
+ fake_instance = {
+ 'uuid': 'zzzzz-d1hrv-i3e77t9z5y8j9cc',
+ 'owner_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+ 'components': collections.OrderedDict([
+ ['foo', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000000',
+ 'log': 'fake-log-pdh-0',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 900,
+ 'min_cores_per_node': 1,
+ },
+ },
+ }],
+ ['bar', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000001',
+ 'log': 'fake-log-pdh-1',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 900,
+ 'min_cores_per_node': 1,
+ },
+ },
+ }],
+ ['no-job-assigned', {}],
+ ['unfinished-job', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-xxxxxxxxxxxxxxx',
+ },
+ }],
+ ['baz', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000002',
+ 'log': 'fake-log-pdh-2',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 900,
+ 'min_cores_per_node': 1,
+ },
+ },
+ }]]),
+ }
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_pipeline(self, mock_api, mock_cr):
+ logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+ mock_api().pipeline_instances().get().execute. \
+ return_value = self.fake_instance
+ mock_cr().__iter__.return_value = ['fake-logfile.txt']
+ mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--pipeline-instance', self.fake_instance['uuid']])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+
+ job_report = [
+ line for line in open(logfile+'.report').readlines()
+ if not line.startswith('#!! ')]
+ expect = (
+ ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
+ job_report + ['\n'] +
+ ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
+ job_report + ['\n'] +
+ ['### Summary for unfinished-job (zzzzz-8i9sb-xxxxxxxxxxxxxxx)\n',
+ '(no report generated)\n',
+ '\n'] +
+ ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
+ job_report)
+ self.diff_report(cmd, expect)
+ mock_cr.assert_has_calls(
+ [
+ mock.call('fake-log-pdh-0'),
+ mock.call('fake-log-pdh-1'),
+ mock.call('fake-log-pdh-2'),
+ ], any_order=True)
+ mock_cr().open.assert_called_with('fake-logfile.txt')