end
expose_action :combine_selected_files_into_collection do
- link_uuids, coll_ids = params["selection"].partition do |sel_s|
- ArvadosBase::resource_class_for_uuid(sel_s) == Link
- end
-
- unless link_uuids.empty?
- Link.select([:head_uuid]).where(uuid: link_uuids).each do |link|
- if ArvadosBase::resource_class_for_uuid(link.head_uuid) == Collection
- coll_ids << link.head_uuid
- end
- end
- end
-
- uuids = []
- pdhs = []
- source_paths = Hash.new { |hash, key| hash[key] = [] }
- coll_ids.each do |coll_id|
- if m = CollectionsHelper.match(coll_id)
- key = m[1] + m[2]
- pdhs << key
- source_paths[key] << m[4]
- elsif m = CollectionsHelper.match_uuid_with_optional_filepath(coll_id)
- key = m[1]
- uuids << key
- source_paths[key] << m[4]
- end
- end
-
- unless pdhs.empty?
- Collection.where(portable_data_hash: pdhs.uniq).
- select([:uuid, :portable_data_hash]).each do |coll|
- unless source_paths[coll.portable_data_hash].empty?
- uuids << coll.uuid
- source_paths[coll.uuid] = source_paths.delete(coll.portable_data_hash)
- end
- end
- end
+ uuids, source_paths = selected_collection_files params
new_coll = Arv::Collection.new
Collection.where(uuid: uuids.uniq).
end
end
+ # helper method to get the uuids and source file paths of the selected collection files
+ helper_method :selected_collection_files
+ def selected_collection_files params
+ link_uuids, coll_ids = params["selection"].partition do |sel_s|
+ ArvadosBase::resource_class_for_uuid(sel_s) == Link
+ end
+
+ unless link_uuids.empty?
+ Link.select([:head_uuid]).where(uuid: link_uuids).each do |link|
+ if ArvadosBase::resource_class_for_uuid(link.head_uuid) == Collection
+ coll_ids << link.head_uuid
+ end
+ end
+ end
+
+ uuids = []
+ pdhs = []
+ source_paths = Hash.new { |hash, key| hash[key] = [] }
+ coll_ids.each do |coll_id|
+ if m = CollectionsHelper.match(coll_id)
+ key = m[1] + m[2]
+ pdhs << key
+ source_paths[key] << m[4]
+ elsif m = CollectionsHelper.match_uuid_with_optional_filepath(coll_id)
+ key = m[1]
+ uuids << key
+ source_paths[key] << m[4]
+ end
+ end
+
+ unless pdhs.empty?
+ Collection.where(portable_data_hash: pdhs.uniq).
+ select([:uuid, :portable_data_hash]).each do |coll|
+ unless source_paths[coll.portable_data_hash].empty?
+ uuids << coll.uuid
+ source_paths[coll.uuid] = source_paths.delete(coll.portable_data_hash)
+ end
+ end
+ end
+
+ [uuids, source_paths]
+ end
+
def wiselinks_layout
'body'
end
require "arvados/keep"
+require "arvados/collection"
require "uri"
class CollectionsController < ApplicationController
sharing_popup
end
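+ # Remove the files selected in params["selection"] from this collection's manifest.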
+ def remove_selected_files
+ uuids, source_paths = selected_collection_files params
+
+ arv_coll = Arv::Collection.new(@object.manifest_text)
+ source_paths[uuids[0]].each do |p|
+ arv_coll.rm "."+p
+ end
+
+ if @object.update_attributes manifest_text: arv_coll.manifest_text
+ show
+ else
+ self.render_error status: 422
+ end
+ end
+
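+ # Handle file renames submitted as "rename-file-path:<old path>" => <new path>;
+ # any other update falls through to the default behavior.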
+ def update
+ updated_attr = params[:collection].each.select {|a| a[0].andand.start_with? 'rename-file-path:'}
+
+ if updated_attr.size > 0
+ # Is it a file rename?
+ file_path = updated_attr[0][0].split('rename-file-path:')[-1]
+
+ new_file_path = updated_attr[0][1]
+ if new_file_path.start_with?('./')
+ # looks good
+ elsif new_file_path.start_with?('/')
+ new_file_path = '.' + new_file_path
+ else
+ new_file_path = './' + new_file_path
+ end
+
+ arv_coll = Arv::Collection.new(@object.manifest_text)
+
+ if arv_coll.exist?(new_file_path)
+ @errors = 'Duplicate file path. Please use a different name.'
+ self.render_error status: 422
+ else
+ arv_coll.rename "./"+file_path, new_file_path
+
+ if @object.update_attributes manifest_text: arv_coll.manifest_text
+ show
+ else
+ self.render_error status: 422
+ end
+ end
+ else
+ # Not a file rename; use default
+ super
+ end
+ end
+
protected
def find_usable_token(token_list)
"data-placement" => "bottom",
"data-type" => input_type,
"data-title" => "Edit #{attr.to_s.gsub '_', ' '}",
- "data-name" => attr,
+ "data-name" => htmloptions['selection_name'] || attr,
"data-object-uuid" => object.uuid,
"data-toggle" => "manual",
- "data-value" => attrvalue,
+ "data-value" => htmloptions['data-value'] || attrvalue,
"id" => span_id,
:class => "editable #{is_textile?( object, attr ) ? 'editable-textile' : ''}"
}.merge(htmloptions).merge(ajax_options)
end
def editable_attributes
- %w(name description manifest_text)
+ %w(name description manifest_text filename)
end
def provenance
'data-selection-action' => 'combine-collections',
'data-toggle' => 'dropdown'
%></li>
+ <% if object.editable? %>
+ <li><%= link_to "Remove selected files", '#',
+ method: :post,
+ 'data-href' => url_for(controller: 'collections', action: :remove_selected_files),
+ 'data-selection-param-name' => 'selection[]',
+ 'data-selection-action' => 'remove-selected-files',
+ 'data-toggle' => 'dropdown'
+ %></li>
+ <% end %>
</ul>
</div>
<div class="btn-group btn-group-sm">
} %>
<span> </span>
<% end %>
+
+ <% if object.editable? %>
+ <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate', title: "Remove #{file_path}") do %>
+ <i class="fa fa-fw fa-trash-o"></i>
+ <% end %>
+ <% end %>
<% if CollectionsHelper::is_image(filename) %>
- <i class="fa fa-fw fa-bar-chart-o"></i> <%= filename %></div>
+ <i class="fa fa-fw fa-bar-chart-o"></i>
+ <% if object.editable? %>
+ <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+ <% else %>
+ <%= filename %>
+ <% end %>
+ </div>
<div class="collection_files_inline">
<%= link_to(image_tag("#{url_for object}/#{file_path}"),
link_params.merge(disposition: 'inline'),
</div>
</div>
<% else %>
- <i class="fa fa-fw fa-file" href="<%=object.uuid%>/<%=file_path%>" ></i> <%= filename %></div>
+ <% if object.editable? %>
+ <i class="fa fa-fw fa-file"></i><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+ <% else %>
+ <i class="fa fa-fw fa-file" href="<%=object.uuid%>/<%=file_path%>" ></i> <%= filename %>
+ <% end %>
+ </div>
</div>
<% end %>
</li>
post 'share', :on => :member
post 'unshare', :on => :member
get 'choose', on: :collection
+ post 'remove_selected_files', on: :member
end
get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
format: false)
collection = api_fixture('collections')['foo_file']
get :show, {id: collection['uuid']}, session_for(:active)
assert_includes @response.body, collection['name']
- assert_match /href="#{collection['uuid']}\/foo" ><\/i> foo</, @response.body
+ assert_match /not authorized to manage collection sharing links/, @response.body
end
test "No Upload tab on non-writable collection" do
assert_equal "https://download.example/c=#{id.sub '+', '-'}/_/w%20a%20z?api_token=#{tok}", @response.redirect_url
end
end
+
+ test "remove selected files from collection" do
+ use_token :active
+
+ # create a new collection to test; using existing collections will cause other tests to fail,
+ # and resetting fixtures after each test makes this test file take almost 4 times as long to run.
+ manifest_text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2\n./dir1 d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2\n"
+
+ collection = Collection.create(manifest_text: manifest_text)
+ assert_includes(collection['manifest_text'], "0:0:file1")
+
+ # now remove all files named 'file1' from the collection
+ post :remove_selected_files, {
+ id: collection['uuid'],
+ selection: ["#{collection['uuid']}/file1",
+ "#{collection['uuid']}/dir1/file1"],
+ format: :json
+ }, session_for(:active)
+ assert_response :success
+
+ # verify no 'file1' in the updated collection
+ collection = Collection.select([:uuid, :manifest_text]).where(uuid: collection['uuid']).first
+ assert_not_includes(collection['manifest_text'], "0:0:file1")
+ assert_includes(collection['manifest_text'], "0:0:file2") # but other files still exist
+ end
+
+ test "remove all files from a subdir of a collection" do
+ use_token :active
+
+ # create a new collection to test
+ manifest_text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2\n./dir1 d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2\n"
+
+ collection = Collection.create(manifest_text: manifest_text)
+ assert_includes(collection['manifest_text'], "0:0:file1")
+
+ # now remove all files from "dir1" subdir of the collection
+ post :remove_selected_files, {
+ id: collection['uuid'],
+ selection: ["#{collection['uuid']}/dir1/file1",
+ "#{collection['uuid']}/dir1/file2"],
+ format: :json
+ }, session_for(:active)
+ assert_response :success
+
+ # verify that "./dir1" no longer exists in this collection's manifest text
+ collection = Collection.select([:uuid, :manifest_text]).where(uuid: collection['uuid']).first
+ assert_match /. d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:file1 0:0:file2\n$/, collection['manifest_text']
+ assert_not_includes(collection['manifest_text'], 'dir1')
+ end
+
+ test "rename file in a collection" do
+ use_token :active
+
+ # create a new collection to test
+ manifest_text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2\n./dir1 d41d8cd98f00b204e9800998ecf8427e+0 0:0:dir1file1 0:0:dir1file2\n"
+
+ collection = Collection.create(manifest_text: manifest_text)
+ assert_includes(collection['manifest_text'], "0:0:file1")
+
+ # rename 'file1' as 'file1renamed' and verify
+ post :update, {
+ id: collection['uuid'],
+ collection: {
+ 'rename-file-path:file1' => 'file1renamed'
+ },
+ format: :json
+ }, session_for(:active)
+ assert_response :success
+
+ collection = Collection.select([:uuid, :manifest_text]).where(uuid: collection['uuid']).first
+ assert_match /. d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:file1renamed 0:0:file2\n.\/dir1 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file1 0:0:dir1file2\n$/, collection['manifest_text']
+
+ # now rename 'file2' such that it is moved into 'dir1'
+ @test_counter = 0
+ post :update, {
+ id: collection['uuid'],
+ collection: {
+ 'rename-file-path:file2' => 'dir1/file2'
+ },
+ format: :json
+ }, session_for(:active)
+ assert_response :success
+
+ collection = Collection.select([:uuid, :manifest_text]).where(uuid: collection['uuid']).first
+ assert_match /. d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:file1renamed\n.\/dir1 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file1 0:0:dir1file2 0:0:file2\n$/, collection['manifest_text']
+
+ # now rename 'dir1/dir1file1' such that it is moved into a new subdir
+ @test_counter = 0
+ post :update, {
+ id: collection['uuid'],
+ collection: {
+ 'rename-file-path:dir1/dir1file1' => 'dir2/dir3/dir1file1moved'
+ },
+ format: :json
+ }, session_for(:active)
+ assert_response :success
+
+ collection = Collection.select([:uuid, :manifest_text]).where(uuid: collection['uuid']).first
+ assert_match /. d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:file1renamed\n.\/dir1 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file2 0:0:file2\n.\/dir2\/dir3 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file1moved\n$/, collection['manifest_text']
+ end
+
+ test "renaming file with a duplicate name in same stream not allowed" do
+ use_token :active
+
+ # rename 'file2' as 'file1' and expect error
+ post :update, {
+ id: 'zzzzz-4zz18-pyw8yp9g3pr7irn',
+ collection: {
+ 'rename-file-path:file2' => 'file1'
+ },
+ format: :json
+ }, session_for(:active)
+ assert_response 422
+ assert_includes json_response['errors'], 'Duplicate file path'
+ end
+
+ test "renaming file with a duplicate name as another stream not allowed" do
+ use_token :active
+
+ # rename 'file1' as 'dir1/file1' and expect error
+ post :update, {
+ id: 'zzzzz-4zz18-pyw8yp9g3pr7irn',
+ collection: {
+ 'rename-file-path:file1' => 'dir1/file1'
+ },
+ format: :json
+ }, session_for(:active)
+ assert_response 422
+ assert_includes json_response['errors'], 'Duplicate file path'
+ end
end
# Make sure we're not still on the old collection page.
refute_match(%r{/collections/#{col['uuid']}}, page.current_url)
end
+
+ test "remove a file from collection using checkbox and dropdown option" do
+ visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
+ assert(page.has_text?('file1'), 'file not found - file1')
+
+ # remove first file
+ input_files = page.all('input[type=checkbox]')
+ input_files[0].click
+
+ click_button 'Selection...'
+ within('.selection-action-container') do
+ click_link 'Remove selected files'
+ end
+
+ assert(page.has_no_text?('file1'), 'file found - file1')
+ assert(page.has_text?('file2'), 'file not found - file2')
+ end
+
+ test "remove a file in collection using trash icon" do
+ need_selenium 'to confirm remove'
+
+ visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
+ assert(page.has_text?('file1'), 'file not found - file1')
+
+ first('.fa-trash-o').click
+ page.driver.browser.switch_to.alert.accept
+
+ assert(page.has_no_text?('file1'), 'file found - file1')
+ assert(page.has_text?('file2'), 'file not found - file2')
+ end
+
+ test "rename a file in collection" do
+ visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
+
+ within('.collection_files') do
+ first('.fa-pencil').click
+ find('.editable-input input').set('file1renamed')
+ find('.editable-submit').click
+ end
+
+ assert(page.has_text?('file1renamed'), 'file not found - file1renamed')
+ end
+
+ test "remove/rename file options not presented if user cannot update a collection" do
+ # visit a publicly accessible collection as 'spectator'
+ visit page_with_token('spectator', '/collections/zzzzz-4zz18-uukreo9rbgwsujr')
+
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li', text: 'Create new collection with selected files'
+ assert_no_selector 'li', text: 'Remove selected files'
+ end
+
+ within('.collection_files') do
+ assert(page.has_text?('GNU_General_Public_License'), 'file not found - GNU_General_Public_License')
+ assert_nil first('.fa-pencil')
+ assert_nil first('.fa-trash-o')
+ end
+ end
end
debian8,ubuntu1204,ubuntu1404,centos7|llfuse|0.41.1|3|python|amd64
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pycurl|7.19.5.3|3|python|amd64
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pyyaml|3.12|2|python|amd64
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rdflib|4.2.1|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rdflib|4.2.2|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|mistune|0.7.3|2|python|all
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|typing|3.5.3.0|2|python|all
centos7|daemon|2.1.1|2|python|all
centos7|pbr|0.11.1|2|python|all
centos7|pyparsing|2.1.10|2|python|all
-centos7|sparqlwrapper|1.8.0|2|python|all
-centos7|html5lib|0.9999999|2|python|all
centos7|keepalive|0.5|2|python|all
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
# Install RVM
RUN apt-get update && \
- DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates && \
+ DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates g++ && \
gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
- install/install-postgresql.html.textile.liquid
- install/install-sso.html.textile.liquid
- install/install-api-server.html.textile.liquid
+ - install/install-ws.html.textile.liquid
- install/install-arv-git-httpd.html.textile.liquid
- install/install-workbench-app.html.textile.liquid
- install/install-shell-server.html.textile.liquid
|cwd|string|Initial working directory, given as an absolute path (in the container) or a path relative to the WORKDIR given in the image's Dockerfile.|Required.|
|command|array of strings|Command to execute in the container.|Required. e.g., @["echo","hello"]@|
|output_path|string|Path to a directory or file inside the container that should be preserved as the container's output when it finishes. This path must be, or be inside, one of the mount targets. For best performance, point output_path to a writable collection mount. Also, see "Pre-populate output using Mount points":#pre-populate-output for details regarding optional output pre-population using mount points.|Required.|
+|output_name|string|Desired name for the output collection. If null, a name will be assigned automatically.||
+|output_ttl|integer|Desired lifetime for the output collection, in seconds. If zero, the output collection will not be deleted automatically.||
|priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
|expires_at|datetime|After this time, priority is considered to be zero.|Not yet implemented.|
|use_existing|boolean|If possible, use an existing (non-failed) container to satisfy the request instead of creating a new one.|Default is true|
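
+For example, a client might commit a container request that names its output collection and gives it a one-day lifetime. This is an abbreviated, illustrative request body (a real request also needs attributes such as @mounts@ and @runtime_constraints@):
+
+<notextile>
+<pre><code>{
+  "name": "hello world request",
+  "command": ["echo", "hello"],
+  "container_image": "arvados/jobs",
+  "cwd": "/tmp",
+  "output_path": "/out",
+  "output_name": "hello world output",
+  "output_ttl": 86400,
+  "priority": 1,
+  "state": "Committed"
+}
+</code></pre>
+</notextile>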
{% include 'notebox_end' %}
+h3. CrunchRunCommand: Using host networking for containers
+
+Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups. This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net". If you are experiencing this problem, as a workaround you can disable Docker's use of network namespaces across the cluster. Be aware that this reduces container isolation, which may be a security risk.
+
+<notextile>
+<pre><code class="userinput">Client:
+ APIHost: <b>zzzzz.arvadosapi.com</b>
+ AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
+CrunchRunCommand:
+- <b>crunch-run</b>
+- <b>"-container-enable-networking=always"</b>
+- <b>"-container-network-mode=host"</b>
+</code></pre>
+</notextile>
+
h3. MinRetryPeriod: Rate-limit repeated attempts to start containers
If SLURM is unable to run a container, the dispatcher will submit it again after the next PollPeriod. If PollPeriod is very short, this can be excessive. If MinRetryPeriod is set, the dispatcher will avoid submitting the same container to SLURM more than once in the given time span.
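
+For example, to wait at least 30 seconds (an illustrative value) before resubmitting a given container:
+
+<notextile>
+<pre><code class="userinput">MinRetryPeriod: <b>30s</b>
+</code></pre>
+</notextile>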
</code></pre>
</notextile>
-h2(#set_up). Set up Web servers
+h2(#set_up). Set up Nginx and Passenger
-For best performance, we recommend you use Nginx as your Web server front-end, with a Passenger backend for the main API server and a Puma backend for API server Websockets. To do that:
+The Nginx server will serve API requests using Passenger. It will also be used to proxy SSL requests to other services which are covered later in this guide.
-<notextile>
-<ol>
-<li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
-
-<li><p>Install runit to supervise the Puma daemon. {% include 'install_runit' %}<notextile></p></li>
-
-<li><p>Install the script below as the run script for the Puma service, modifying it as directed by the comments.</p>
-
-<pre><code>#!/bin/bash
-
-set -e
-exec 2>&1
-
-# Uncomment the line below if you're using RVM.
-#source /etc/profile.d/rvm.sh
-
-envdir="`pwd`/env"
-mkdir -p "$envdir"
-echo ws-only > "$envdir/ARVADOS_WEBSOCKETS"
-
-cd /var/www/arvados-api/current
-echo "Starting puma in `pwd`"
-
-# Change arguments below to match your deployment, "webserver-user" and
-# "webserver-group" should be changed to the user and group of the web server
-# process. This is typically "www-data:www-data" on Debian systems by default,
-# other systems may use different defaults such the name of the web server
-# software (for example, "nginx:nginx").
-exec chpst -m 1073741824 -u webserver-user:webserver-group -e "$envdir" \
- bundle exec puma -t 0:512 -e production -b tcp://127.0.0.1:8100
-</code></pre>
-</li>
+First, "Install Nginx and Phusion Passenger":https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html.
-<li><p>Edit the http section of your Nginx configuration to run the Passenger server, and act as a front-end for both it and Puma. You might add a block like the following, adding SSL and logging parameters to taste:</p>
+Edit the http section of your Nginx configuration to run the Passenger server, and serve SSL requests. Add a block like the following, adding SSL and logging parameters to taste:
+<notextile>
<pre><code>server {
listen 127.0.0.1:8000;
server_name localhost-api;
server 127.0.0.1:8000 fail_timeout=10s;
}
-upstream websockets {
- # The address below must match the one specified in puma's -b option.
- server 127.0.0.1:8100 fail_timeout=10s;
-}
-
proxy_http_version 1.1;
# When Keep clients request a list of Keep services from the API server, the
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
-
-server {
- listen <span class="userinput">[your public IP address]</span>:443 ssl;
- server_name ws.<span class="userinput">uuid_prefix.your.domain</span>;
-
- ssl on;
- ssl_certificate <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
- ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
-
- index index.html index.htm index.php;
-
- location / {
- proxy_pass http://websockets;
- proxy_redirect off;
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
-
- proxy_set_header Upgrade $http_upgrade;
- proxy_set_header Connection "upgrade";
- proxy_set_header Host $host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- }
-}
</code></pre>
-</li>
+</notextile>
-<li><p>Restart Nginx:</p>
+Restart Nginx to apply the new configuration.
+<notextile>
<pre><code>~$ <span class="userinput">sudo nginx -s reload</span>
</code></pre>
-
-</li>
-
-</ol>
</notextile>
h2. Prepare the API server deployment
{% include 'notebox_begin' %}
You can safely ignore the following messages if they appear while this command runs:
-<pre>Don't run Bundler as root. Bundler can ask for sudo if it is needed, and installing your bundle as root will
-break this application for all non-root users on this machine.</pre>
-<pre>fatal: Not a git repository (or any of the parent directories): .git</pre>
+
+<notextile><pre>Don't run Bundler as root. Bundler can ask for sudo if it is needed, and installing your bundle as root will
+break this application for all non-root users on this machine.</pre></notextile>
+
+<notextile><pre>fatal: Not a git repository (or any of the parent directories): .git</pre></notextile>
{% include 'notebox_end' %}
title: Install the websocket server
...
-{% include 'notebox_begin_warning' %}
-
-This websocket server is an alternative to the puma server that comes with the API server. It is available as an *experimental pre-release* and is not recommended for production sites.
-
-{% include 'notebox_end' %}
-
-The arvados-ws server provides event notifications to websocket clients. It can be installed anywhere with access to Postgres database and the Arvados API server, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for additional information.
+The arvados-ws server provides event notifications to websocket clients. It can be installed anywhere with access to the PostgreSQL database and the Arvados API server, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/ws for additional information.
By convention, we use the following hostname for the websocket service.
}
</pre></notextile>
-If Nginx is already configured to proxy @ws@ requests to puma, move that configuration out of the way or change its @server_name@ so it doesn't conflict.
+{% include 'notebox_begin' %}
+If you are upgrading a cluster where Nginx is configured to proxy @ws@ requests to puma, change the @server_name@ value in the old configuration block so it doesn't conflict. When the new configuration is working, delete the old Nginx configuration sections (i.e., the "upstream websockets" block, and the "server" block that references @http://websockets@), and disable/remove the runit or systemd files for the puma server.
+{% include 'notebox_end' %}
h3. Update API server configuration
[ -d $CGROUP ] || mkdir $CGROUP
if mountpoint -q $CGROUP ; then
- break
+ true
else
mount -n -t tmpfs -o uid=0,gid=0,mode=0755 cgroup $CGROUP
fi
# Systemd and OpenRC (and possibly others) both create such a
# cgroup. To avoid the aforementioned bug, we symlink "foo" to
# "name=foo". This shouldn't have any adverse effect.
- echo $SUBSYS | grep -q ^name= && {
- NAME=$(echo $SUBSYS | sed s/^name=//)
- ln -s $SUBSYS $CGROUP/$NAME
- }
+ #echo $SUBSYS | grep -q ^name= && {
+ # NAME=$(echo $SUBSYS | sed s/^name=//)
+ # ln -s $SUBSYS $CGROUP/$NAME
+ #}
# Likewise, on at least one system, it has been reported that
# systemd would mount the CPU and CPU accounting controllers
read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
-if ! docker daemon --storage-driver=overlay $DOCKER_DAEMON_ARGS ; then
- docker daemon $DOCKER_DAEMON_ARGS
-fi
+exec docker daemon --storage-driver=vfs $DOCKER_DAEMON_ARGS
set -e
+function start_docker {
+ /root/dnd.sh &
+ for i in $(seq 1 10) ; do
+ if docker version >/dev/null 2>/dev/null ; then
+ return
+ fi
+ sleep 1
+ done
+ false
+}
+
+function kill_docker {
+ if test -f /var/run/docker.pid ; then
+ kill $(cat /var/run/docker.pid)
+ fi
+ for i in $(seq 1 10) ; do
+ if ! docker version >/dev/null 2>/dev/null ; then
+ return
+ fi
+ sleep 1
+ done
+ false
+}
+
function cleanup {
- kill $(cat /var/run/docker.pid)
- sleep 1
+ kill_docker
rm -rf /var/lib/docker/*
+ rm -rf /root/.cache/arvados/docker/*
}
trap cleanup EXIT
-/root/dnd.sh &
-sleep 2
+start_docker
image_tar_keepref=$1
image_id=$2
docker images -a
-kill $(cat /var/run/docker.pid)
-sleep 1
+kill_docker
cd /root/pkgs
-dpkg -i libltdl7_2.4.2-1.11+b1_amd64.deb docker-engine_1.13.1-0~debian-jessie_amd64.deb
+dpkg -i libltdl7_2.4.2-1.11+b1_amd64.deb docker-engine_1.13.1-0~debian-jessie_amd64.deb
-/root/dnd.sh &
-sleep 2
+start_docker
docker images -a
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
- s.add_runtime_dependency 'json', '~> 1.7', '>= 1.7.7'
+ s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
s.add_runtime_dependency 'trollop', '~> 2.0'
s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
s.add_runtime_dependency 'oj', '~> 2.0', '>= 2.0.3'
import hashlib
import copy
import json
+import re
from functools import partial
import pkg_resources # part of setuptools
def add_arv_hints():
cache = {}
+ cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+ cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
cache["http://arvados.org/cwl"] = res.read()
res.close()
def check_exists(self, url):
try:
+ if url.startswith("http://arvados.org/cwl"):
+ return True
if url.startswith("keep:"):
return self.fsaccess.exists(url)
if url.startswith("arvwf:"):
from functools import partial
import logging
import json
-import re
import subprocess
from StringIO import StringIO
logger = logging.getLogger('arvados.cwl-runner')
-cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
-
def trim_listing(obj):
"""Remove 'listing' field from Directory objects that are keep references.
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170213175853',
- 'schema-salad==2.2.20170208112505',
+ 'cwltool==1.0.20170224141733',
+ 'schema-salad==2.2.20170222151604',
+ 'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170112173420',
'setuptools'
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
- 'owner_uuid': None,
+ 'replication_desired': None,
'name': 'submit_tool.cwl dependencies',
}), ensure_unique_name=True),
- mock.call().execute(),
+ mock.call().execute(num_retries=4),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
- 'owner_uuid': None,
+ 'replication_desired': None,
'name': 'submit_wf.cwl input',
}), ensure_unique_name=True),
- mock.call().execute()])
+ mock.call().execute(num_retries=4)])
arvdock.assert_has_calls([
mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
- 'owner_uuid': None,
+ 'replication_desired': None,
'name': 'submit_tool.cwl dependencies',
}), ensure_unique_name=True),
- mock.call().execute(),
+ mock.call().execute(num_retries=4),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
- 'owner_uuid': None,
+ 'replication_desired': None,
'name': 'submit_wf.cwl input',
}), ensure_unique_name=True),
- mock.call().execute()])
+ mock.call().execute(num_retries=4)])
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
"git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
+ "log"
"math/rand"
"net"
"net/http"
+ "os"
+ "regexp"
"strings"
"time"
)
// log.Printf to DebugPrintf.
var DebugPrintf = func(string, ...interface{}) {}
+func init() {
+ var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+ if matchTrue.MatchString(os.Getenv("ARVADOS_DEBUG")) {
+ DebugPrintf = log.Printf
+ }
+}
+
type keepService struct {
Uuid string `json:"uuid"`
Hostname string `json:"service_host"`
import config
import errors
import util
+import cache
_logger = logging.getLogger('arvados.api')
try:
util.mkdir_dash_p(path)
except OSError:
- path = None
- return path
+ return None
+ return cache.SafeHTTPCache(path, max_age=60*60*24*2)
def api(version=None, cache=True, host=None, token=None, insecure=False, **kwargs):
"""Return an apiclient Resources object for an Arvados instance.
--- /dev/null
+import errno
+import md5
+import os
+import tempfile
+import time
+
+class SafeHTTPCache(object):
+ """Thread-safe replacement for httplib2.FileCache"""
+
+ def __init__(self, path=None, max_age=None):
+ self._dir = path
+ if max_age is not None:
+ try:
+ self._clean(threshold=time.time() - max_age)
+ except:
+ pass
+
+ def _clean(self, threshold=0):
+ for ent in os.listdir(self._dir):
+ fnm = os.path.join(self._dir, ent)
+ if os.path.isdir(fnm) or not fnm.endswith('.tmp'):
+ continue
+ stat = os.lstat(fnm)
+ if stat.st_mtime < threshold:
+ try:
+ os.unlink(fnm)
+ except OSError as err:
+ if err.errno != errno.ENOENT:
+ raise
+
+ def __str__(self):
+ return self._dir
+
+ def _filename(self, url):
+ return os.path.join(self._dir, md5.new(url).hexdigest()+'.tmp')
+
+ def get(self, url):
+ filename = self._filename(url)
+ try:
+ with open(filename, 'rb') as f:
+ return f.read()
+ except (IOError, OSError):
+ return None
+
+ def set(self, url, content):
+ try:
+ fd, tempname = tempfile.mkstemp(dir=self._dir)
+ except:
+ return None
+ try:
+ try:
+ f = os.fdopen(fd, 'w')
+ except:
+ os.close(fd)
+ raise
+ try:
+ f.write(content)
+ finally:
+ f.close()
+ os.rename(tempname, self._filename(url))
+ tempname = None
+ finally:
+ if tempname:
+ os.unlink(tempname)
+
+ def delete(self, url):
+ try:
+ os.unlink(self._filename(url))
+ except OSError as err:
+ if err.errno != errno.ENOENT:
+ raise
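+
+# Usage sketch (illustrative, not part of the original module): SafeHTTPCache
+# implements the get/set/delete interface that httplib2 expects of a cache
+# object, so it can stand in for httplib2.FileCache. The cache directory
+# below is hypothetical.
+#
+#   import httplib2
+#   cache = SafeHTTPCache('/tmp/http-cache', max_age=60*60*24*2)
+#   http = httplib2.Http(cache=cache)
+#   response, content = http.request('https://example.com/')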
'--print-unmigrated', action='store_true',
default=False, help="Print list of images needing migration.")
+ migrate19_parser.add_argument('--tempdir', help="Set temporary directory")
+
migrate19_parser.add_argument('infile', nargs='?', type=argparse.FileType('r'),
default=None, help="List of images to be migrated")
args = migrate19_parser.parse_args(arguments)
+ if args.tempdir:
+ tempfile.tempdir = args.tempdir
+
only_migrate = None
if args.infile:
only_migrate = set()
items = arvados.util.list_all(api_client.collections().list,
filters=[["uuid", "in", [img["collection"] for img in old_images]]],
- select=["uuid", "portable_data_hash"])
- uuid_to_pdh = {i["uuid"]: i["portable_data_hash"] for i in items}
+ select=["uuid", "portable_data_hash", "manifest_text", "owner_uuid"])
+ uuid_to_collection = {i["uuid"]: i for i in items}
need_migrate = {}
+ biggest = 0
for img in old_images:
- pdh = uuid_to_pdh[img["collection"]]
+ i = uuid_to_collection[img["collection"]]
+ pdh = i["portable_data_hash"]
if pdh not in already_migrated and (only_migrate is None or pdh in only_migrate):
need_migrate[pdh] = img
+ with CollectionReader(i["manifest_text"]) as c:
+ if c.values()[0].size() > biggest:
+ biggest = c.values()[0].size()
if args.print_unmigrated:
only_migrate = set()
logger.info("Already migrated %i images", len(already_migrated))
logger.info("Need to migrate %i images", len(need_migrate))
+ logger.info("Using tempdir %s", tempfile.gettempdir())
+ logger.info("Biggest image is about %i MiB, tempdir needs at least %i MiB free", biggest/(2**20), (biggest*2)/(2**20))
if args.dry_run:
return
failures = []
count = 1
for old_image in need_migrate.values():
- if uuid_to_pdh[old_image["collection"]] in already_migrated:
+ if uuid_to_collection[old_image["collection"]]["portable_data_hash"] in already_migrated:
continue
- logger.info("[%i/%i] Migrating %s:%s (%s)", count, len(need_migrate), old_image["repo"], old_image["tag"], old_image["collection"])
+ oldcol = CollectionReader(uuid_to_collection[old_image["collection"]]["manifest_text"])
+ tarfile = oldcol.keys()[0]
+
+ logger.info("[%i/%i] Migrating %s:%s (%s) (%i MiB)", count, len(need_migrate), old_image["repo"],
+ old_image["tag"], old_image["collection"], oldcol.values()[0].size()/(2**20))
count += 1
start = time.time()
- oldcol = CollectionReader(old_image["collection"])
- tarfile = oldcol.keys()[0]
-
varlibdocker = tempfile.mkdtemp()
+ dockercache = tempfile.mkdtemp()
try:
with tempfile.NamedTemporaryFile() as envfile:
envfile.write("ARVADOS_API_HOST=%s\n" % (os.environ["ARVADOS_API_HOST"]))
"--rm",
"--env-file", envfile.name,
"--volume", "%s:/var/lib/docker" % varlibdocker,
+ "--volume", "%s:/root/.cache/arvados/docker" % dockercache,
"arvados/migrate-docker19",
"/root/migrate.sh",
"%s/%s" % (old_image["collection"], tarfile),
tarfile[0:40],
old_image["repo"],
old_image["tag"],
- oldcol.api_response()["owner_uuid"]]
+ uuid_to_collection[old_image["collection"]]["owner_uuid"]]
proc = subprocess.Popen(dockercmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
failures.append(old_image["collection"])
finally:
shutil.rmtree(varlibdocker)
+ shutil.rmtree(dockercache)
logger.info("Successfully migrated %i images", len(success))
if failures:
import subprocess
import logging
import sys
+import errno
import arvados.commands._util as arv_cmd
+import arvados.collection
from arvados._version import __version__
# original parameter string).
def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
absfn = os.path.abspath(fn)
- if os.path.exists(absfn):
+ try:
st = os.stat(absfn)
- if stat.S_ISREG(st.st_mode):
- sp = os.path.split(absfn)
- (pdh, branch) = is_in_collection(sp[0], sp[1])
- if pdh:
+ sp = os.path.split(absfn)
+ (pdh, branch) = is_in_collection(sp[0], sp[1])
+ if pdh:
+ if stat.S_ISREG(st.st_mode):
return ArvFile(prefix, fnPattern % (pdh, branch))
- else:
- # trim leading '/' for path prefix test later
- return UploadFile(prefix, absfn[1:])
- if stat.S_ISDIR(st.st_mode):
- sp = os.path.split(absfn)
- (pdh, branch) = is_in_collection(sp[0], sp[1])
- if pdh:
+ elif stat.S_ISDIR(st.st_mode):
return ArvFile(prefix, dirPattern % (pdh, branch))
+ else:
+ raise Exception("%s is not a regular file or directory" % absfn)
+ else:
+ # trim leading '/' for path prefix test later
+ return UploadFile(prefix, absfn[1:])
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ pass
+ else:
+ raise
return prefix+fn
-def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
+def write_file(collection, pathprefix, fn):
+ with open(os.path.join(pathprefix, fn)) as src:
+ dst = collection.open(fn, "w")
+ r = src.read(1024*128)
+ while r:
+ dst.write(r)
+ r = src.read(1024*128)
+ dst.close(flush=False)
+
+def uploadfiles(files, api, dry_run=False, num_retries=0,
+ project=None,
+ fnPattern="$(file %s/%s)",
+ name=None):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
# until all file paths no longer have a common parent.
for c in files:
c.fn = c.fn[len(pathstep):]
- orgdir = os.getcwd()
- os.chdir(pathprefix)
-
logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
if dry_run:
pdh = "$(input)"
else:
files = sorted(files, key=lambda x: x.fn)
- collection = arvados.CollectionWriter(api, num_retries=num_retries)
- stream = None
+ collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
+ prev = ""
for f in files:
- sp = os.path.split(f.fn)
- if sp[0] != stream:
- stream = sp[0]
- collection.start_new_stream(stream)
- collection.write_file(f.fn, sp[1])
+ localpath = os.path.join(pathprefix, f.fn)
+ if prev and localpath.startswith(prev+"/"):
+ # If this path is inside an already uploaded subdirectory,
+ # don't redundantly re-upload it.
+ # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
+ # skip it because it starts with "/tmp/foo/"
+ continue
+ prev = localpath
+ if os.path.isfile(localpath):
+ write_file(collection, pathprefix, f.fn)
+ elif os.path.isdir(localpath):
+ for root, dirs, iterfiles in os.walk(localpath):
+ root = root[len(pathprefix):]
+ for src in iterfiles:
+ write_file(collection, pathprefix, os.path.join(root, src))
filters=[["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", name+"%"]]
if project:
filters.append(["owner_uuid", "=", project])
- exists = api.collections().list(filters=filters).execute(num_retries=num_retries)
+ exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
if exists["items"]:
item = exists["items"][0]
- logger.info("Using collection %s", item["uuid"])
+ pdh = item["portable_data_hash"]
+ logger.info("Using collection %s (%s)", pdh, item["uuid"])
else:
- body = {"owner_uuid": project, "manifest_text": collection.manifest_text()}
- if name is not None:
- body["name"] = name
- item = api.collections().create(body=body, ensure_unique_name=True).execute()
- logger.info("Uploaded to %s", item["uuid"])
-
- pdh = item["portable_data_hash"]
+ collection.save_new(name=name, owner_uuid=project, ensure_unique_name=True)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
for c in files:
c.keepref = "%s/%s" % (pdh, c.fn)
c.fn = fnPattern % (pdh, c.fn)
- os.chdir(orgdir)
-
def main(arguments=None):
args = arvrun_parser.parse_args(arguments)
'pycurl >=7.19.5.1, <7.21.5',
'python-gflags<3.0',
'setuptools',
- 'ws4py',
+ 'ws4py<0.4',
'ruamel.yaml==0.13.7'
],
test_suite='tests',
# This will clear cached docs that belong to other processes (like
# concurrent test suites) even if they're still running. They should
# be able to tolerate that.
- for fn in glob.glob(os.path.join(arvados.http_cache('discovery'),
- '*,arvados,v1,rest,*')):
+ for fn in glob.glob(os.path.join(
+ str(arvados.http_cache('discovery')),
+ '*,arvados,v1,rest,*')):
os.unlink(fn)
pid_file = _pidfile('api')
--- /dev/null
+from __future__ import print_function
+
+import md5
+import mock
+import os
+import random
+import shutil
+import sys
+import tempfile
+import threading
+import unittest
+
+import arvados.cache
+import arvados
+import run_test_server
+
+
+def _random(n):
+ return bytearray(random.getrandbits(8) for _ in xrange(n))
+
+
+class CacheTestThread(threading.Thread):
+ def __init__(self, dir):
+ super(CacheTestThread, self).__init__()
+ self._dir = dir
+
+ def run(self):
+ c = arvados.cache.SafeHTTPCache(self._dir)
+ url = 'http://example.com/foo'
+ self.ok = True
+ for x in range(16):
+ try:
+ data_in = _random(128)
+ data_in = md5.new(data_in).hexdigest() + "\n" + str(data_in)
+ c.set(url, data_in)
+ data_out = c.get(url)
+ digest, content = data_out.split("\n", 1)
+ if digest != md5.new(content).hexdigest():
+ self.ok = False
+ except Exception as err:
+ self.ok = False
+ print("cache failed: {}".format(err), file=sys.stderr)
+
+
+class CacheTest(unittest.TestCase):
+ def setUp(self):
+ self._dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self._dir)
+
+ def test_cache_create_error(self):
+ _, filename = tempfile.mkstemp()
+ home_was = os.environ['HOME']
+ os.environ['HOME'] = filename
+ try:
+ c = arvados.http_cache('test')
+ self.assertEqual(None, c)
+ finally:
+ os.environ['HOME'] = home_was
+ os.unlink(filename)
+
+ def test_cache_crud(self):
+ c = arvados.cache.SafeHTTPCache(self._dir, max_age=0)
+ url = 'https://example.com/foo?bar=baz'
+ data1 = _random(256)
+ data2 = _random(128)
+ self.assertEqual(None, c.get(url))
+ c.delete(url)
+ c.set(url, data1)
+ self.assertEqual(data1, c.get(url))
+ c.delete(url)
+ self.assertEqual(None, c.get(url))
+ c.set(url, data1)
+ c.set(url, data2)
+ self.assertEqual(data2, c.get(url))
+
+ def test_cache_threads(self):
+ threads = []
+ for _ in range(64):
+ t = CacheTestThread(dir=self._dir)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+ self.assertTrue(t.ok)
+
+
+class CacheIntegrationTest(run_test_server.TestCaseWithServers):
+ MAIN_SERVER = {}
+
+ def test_cache_used_by_default_client(self):
+ with mock.patch('arvados.cache.SafeHTTPCache.get') as getter:
+ arvados.api('v1')._rootDesc.get('foobar')
+ getter.assert_called()
s.add_dependency('google-api-client', '>= 0.7', '< 0.8.9')
# work around undeclared dependency on i18n in some activesupport 3.x.x:
s.add_dependency('i18n', '~> 0')
- s.add_dependency('json', '~> 1.7', '>= 1.7.7')
+ s.add_dependency('json', '>= 1.7.7', '<3')
s.add_runtime_dependency('jwt', '<2', '>= 0.1.5')
s.homepage =
'https://arvados.org'
end
end
end
+
+# Work around Rails3+PostgreSQL9.5 incompatibility (pg_dump used to
+# accept -i as a no-op, but now it's not accepted at all).
+module Kernel
+ alias_method :orig_backtick, :`
+ def `(*args) #`#` sorry, parsers
+ args[0].sub!(/\Apg_dump -i /, 'pg_dump ') rescue nil
+ orig_backtick(*args)
+ end
+end
attr_writer :resource_attrs
- MAX_UNIQUE_NAME_ATTEMPTS = 10
-
begin
rescue_from(Exception,
ArvadosModel::PermissionDeniedError,
def create
@object = model_class.new resource_attrs
- if @object.respond_to? :name and params[:ensure_unique_name]
- # Record the original name. See below.
- name_stem = @object.name
- retries = MAX_UNIQUE_NAME_ATTEMPTS
+ if @object.respond_to?(:name) && params[:ensure_unique_name]
+ @object.save_with_unique_name!
else
- retries = 0
- end
-
- begin
@object.save!
- rescue ActiveRecord::RecordNotUnique => rn
- raise unless retries > 0
- retries -= 1
-
- # Dig into the error to determine if it is specifically calling out a
- # (owner_uuid, name) uniqueness violation. In this specific case, and
- # the client requested a unique name with ensure_unique_name==true,
- # update the name field and try to save again. Loop as necessary to
- # discover a unique name. It is necessary to handle name choosing at
- # this level (as opposed to the client) to ensure that record creation
- # never fails due to a race condition.
- raise unless rn.original_exception.is_a? PG::UniqueViolation
-
- # Unfortunately ActiveRecord doesn't abstract out any of the
- # necessary information to figure out if this the error is actually
- # the specific case where we want to apply the ensure_unique_name
- # behavior, so the following code is specialized to Postgres.
- err = rn.original_exception
- detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
- raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
-
- @object.uuid = nil
-
- new_name = "#{name_stem} (#{db_current_time.utc.iso8601(3)})"
- if new_name == @object.name
- # If the database is fast enough to do two attempts in the
- # same millisecond, we need to wait to ensure we try a
- # different timestamp on each attempt.
- sleep 0.002
- new_name = "#{name_stem} (#{db_current_time.utc.iso8601(3)})"
- end
- @object.name = new_name
- retry
end
+
show
end
@objects = model_class.where('last_ping_at >= ?', db_current_time - 1.hours)
end
super
- job_uuids = @objects.map { |n| n[:job_uuid] }.compact
- assoc_jobs = readable_job_uuids(job_uuids)
- @objects.each do |node|
- node.job_readable = assoc_jobs.include?(node[:job_uuid])
+ if @select.nil? or @select.include? 'job_uuid'
+ job_uuids = @objects.map { |n| n[:job_uuid] }.compact
+ assoc_jobs = readable_job_uuids(job_uuids)
+ @objects.each do |node|
+ node.job_readable = assoc_jobs.include?(node[:job_uuid])
+ end
end
end
permission_link_classes: ['permission', 'resources'])
end
+ def save_with_unique_name!
+ uuid_was = uuid
+ name_was = name
+ max_retries = 2
+ transaction do
+ conn = ActiveRecord::Base.connection
+ conn.exec_query 'SAVEPOINT save_with_unique_name'
+ begin
+ save!
+ rescue ActiveRecord::RecordNotUnique => rn
+ raise if max_retries == 0
+ max_retries -= 1
+
+ conn.exec_query 'ROLLBACK TO SAVEPOINT save_with_unique_name'
+
+ # Dig into the error to determine if it is specifically calling out a
+ # (owner_uuid, name) uniqueness violation. In this specific case, and
+ # the client requested a unique name with ensure_unique_name==true,
+ # update the name field and try to save again. Loop as necessary to
+ # discover a unique name. It is necessary to handle name choosing at
+ # this level (as opposed to the client) to ensure that record creation
+ # never fails due to a race condition.
+ err = rn.original_exception
+ raise unless err.is_a?(PG::UniqueViolation)
+
+ # Unfortunately ActiveRecord doesn't abstract out any of the
+ # necessary information to figure out if this the error is actually
+ # the specific case where we want to apply the ensure_unique_name
+ # behavior, so the following code is specialized to Postgres.
+ detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
+ raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
+
+ new_name = "#{name_was} (#{db_current_time.utc.iso8601(3)})"
+ if new_name == name
+ # If the database is fast enough to do two attempts in the
+ # same millisecond, we need to wait to ensure we try a
+ # different timestamp on each attempt.
+ sleep 0.002
+ new_name = "#{name_was} (#{db_current_time.utc.iso8601(3)})"
+ end
+
+ self[:name] = new_name
+ self[:uuid] = nil if uuid_was.nil? && !uuid.nil?
+ conn.exec_query 'SAVEPOINT save_with_unique_name'
+ retry
+ ensure
+ conn.exec_query 'RELEASE SAVEPOINT save_with_unique_name'
+ end
+ end
+ end
+
def logged_attributes
attributes.except(*Rails.configuration.unlogged_attributes)
end
raise PermissionDeniedError
end
- # Verify "write" permission on old owner
- # default fail unless one of:
- # owner_uuid did not change
- # previous owner_uuid is nil
- # current user is the old owner
- # current user is this object
- # current user can_write old owner
- unless !owner_uuid_changed? or
- owner_uuid_was.nil? or
- current_user.uuid == self.owner_uuid_was or
- current_user.uuid == self.uuid or
- current_user.can? write: self.owner_uuid_was
- logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write old owner_uuid #{owner_uuid_was}"
- errors.add :owner_uuid, "cannot be changed without write permission on old owner"
- raise PermissionDeniedError
- end
-
- # Verify "write" permission on new owner
- # default fail unless one of:
- # current_user is this object
- # current user can_write new owner, or this object if owner unchanged
- if new_record? or owner_uuid_changed? or is_a?(ApiClientAuthorization)
- write_target = owner_uuid
+ if new_record? || owner_uuid_changed?
+ # Permission on owner_uuid_was is needed to move an existing
+ # object away from its previous owner (which implies permission
+ # to modify this object itself, so we don't need to check that
+ # separately). Permission on the new owner_uuid is also needed.
+ [['old', owner_uuid_was],
+ ['new', owner_uuid]
+ ].each do |which, check_uuid|
+ if check_uuid.nil?
+ # old_owner_uuid is nil? New record, no need to check.
+ elsif !current_user.can?(write: check_uuid)
+ logger.warn "User #{current_user.uuid} tried to set ownership of #{self.class.to_s} #{self.uuid} but does not have permission to write #{which} owner_uuid #{check_uuid}"
+ errors.add :owner_uuid, "cannot be set or changed without write permission on #{which} owner"
+ raise PermissionDeniedError
+ end
+ end
else
- write_target = uuid
- end
- unless current_user == self or current_user.can? write: write_target
- logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write new owner_uuid #{owner_uuid}"
- errors.add :owner_uuid, "cannot be changed without write permission on new owner"
- raise PermissionDeniedError
+ # If the object already existed and we're not changing
+ # owner_uuid, we only need write permission on the object
+ # itself.
+ if !current_user.can?(write: self.uuid)
+ logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{self.uuid} without write permission"
+ errors.add :uuid, "is not writable"
+ raise PermissionDeniedError
+ end
end
true
false
elsif portable_data_hash[0..31] != computed_pdh[0..31]
errors.add(:portable_data_hash,
- "does not match computed hash #{computed_pdh}")
+ "'#{portable_data_hash}' does not match computed hash '#{computed_pdh}'")
false
else
# Ignore the client-provided size part: always store
true
end
- # If trash_at is updated without touching delete_at, automatically
- # update delete_at to a sensible value.
def default_trash_interval
if trash_at_changed? && !delete_at_changed?
+ # If trash_at is updated without touching delete_at,
+ # automatically update delete_at to a sensible value.
if trash_at.nil?
self.delete_at = nil
else
self.delete_at = trash_at + Rails.configuration.default_trash_lifetime.seconds
end
+ elsif !trash_at || !delete_at || trash_at > delete_at
+ # Not trash, or bogus arguments? Just validate in
+ # validate_trash_and_delete_timing.
+ elsif delete_at_changed? && delete_at >= trash_at
+ # Fix delete_at if needed, so it's not earlier than the expiry
+ # time on any permission tokens that might have been given out.
+
+ # In any case there are no signatures expiring after now+TTL.
+ # Also, if the existing trash_at time has already passed, we
+ # know we haven't given out any signatures since then.
+ earliest_delete = [
+ @validation_timestamp,
+ trash_at_was,
+ ].compact.min + Rails.configuration.blob_signature_ttl.seconds
+
+ # The previous value of delete_at is also an upper bound on the
+ # longest-lived permission token. For example, if TTL=14,
+ # trash_at_was=now-7, delete_at_was=now+7, then it is safe to
+ # set trash_at=now+6, delete_at=now+8.
+ earliest_delete = [earliest_delete, delete_at_was].compact.min
+
+ # If delete_at is too soon, use the earliest possible time.
+ if delete_at < earliest_delete
+ self.delete_at = earliest_delete
+ end
end
end
def validate_trash_and_delete_timing
if trash_at.nil? != delete_at.nil?
errors.add :delete_at, "must be set if trash_at is set, and must be nil otherwise"
- end
-
- earliest_delete = ([@validation_timestamp, trash_at_was].compact.min +
- Rails.configuration.blob_signature_ttl.seconds)
- if delete_at && delete_at < earliest_delete
- errors.add :delete_at, "#{delete_at} is too soon: earliest allowed is #{earliest_delete}"
- end
-
- if delete_at && delete_at < trash_at
+ elsif delete_at && delete_at < trash_at
errors.add :delete_at, "must not be earlier than trash_at"
end
-
true
end
end
end
end
+ # Create a new container (or find an existing one) to satisfy the
+ # given container request.
+ def self.resolve(req)
+ c_attrs = {
+ command: req.command,
+ cwd: req.cwd,
+ environment: req.environment,
+ output_path: req.output_path,
+ container_image: resolve_container_image(req.container_image),
+ mounts: resolve_mounts(req.mounts),
+ runtime_constraints: resolve_runtime_constraints(req.runtime_constraints),
+ scheduling_parameters: req.scheduling_parameters,
+ }
+ act_as_system_user do
+ if req.use_existing && (reusable = find_reusable(c_attrs))
+ reusable
+ else
+ Container.create!(c_attrs)
+ end
+ end
+ end
+
+ # Return a runtime_constraints hash that complies with requested but
+ # is suitable for saving in a container record, i.e., has specific
+ # values instead of ranges.
+ #
+ # Doing this as a step separate from other resolutions, like "git
+ # revision range to commit hash", makes sense only when there is no
+ # opportunity to reuse an existing container (e.g., container reuse
+ # is not implemented yet, or we have already found that no existing
+ # containers are suitable).
+ def self.resolve_runtime_constraints(runtime_constraints)
+ rc = {}
+ defaults = {
+ 'keep_cache_ram' =>
+ Rails.configuration.container_default_keep_cache_ram,
+ }
+ defaults.merge(runtime_constraints).each do |k, v|
+ if v.is_a? Array
+ rc[k] = v[0]
+ else
+ rc[k] = v
+ end
+ end
+ rc
+ end
+
+ # Return a mounts hash suitable for a Container, i.e., with every
+ # readonly collection UUID resolved to a PDH.
+ def self.resolve_mounts(mounts)
+ c_mounts = {}
+ mounts.each do |k, mount|
+ mount = mount.dup
+ c_mounts[k] = mount
+ if mount['kind'] != 'collection'
+ next
+ end
+ if (uuid = mount.delete 'uuid')
+ c = Collection.
+ readable_by(current_user).
+ where(uuid: uuid).
+ select(:portable_data_hash).
+ first
+ if !c
+ raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
+ end
+ if mount['portable_data_hash'].nil?
+ # PDH not supplied by client
+ mount['portable_data_hash'] = c.portable_data_hash
+ elsif mount['portable_data_hash'] != c.portable_data_hash
+ # UUID and PDH supplied by client, but they don't agree
+ raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{mount['portable_data_hash'].inspect} in request"
+ end
+ end
+ end
+ return c_mounts
+ end
+
+ # Return a container_image PDH suitable for a Container.
+ def self.resolve_container_image(container_image)
+ coll = Collection.for_latest_docker_image(container_image)
+ if !coll
+ raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
+ end
+ coll.portable_data_hash
+ end
+
def self.find_reusable(attrs)
candidates = Container.
where_serialized(:command, attrs[:command]).
where('cwd = ?', attrs[:cwd]).
where_serialized(:environment, attrs[:environment]).
where('output_path = ?', attrs[:output_path]).
- where('container_image = ?', attrs[:container_image]).
- where_serialized(:mounts, attrs[:mounts]).
- where_serialized(:runtime_constraints, attrs[:runtime_constraints])
+ where('container_image = ?', resolve_container_image(attrs[:container_image])).
+ where_serialized(:mounts, resolve_mounts(attrs[:mounts])).
+ where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]))
# Check for Completed candidates whose output and log are both readable.
select_readable_pdh = Collection.
before_validation :validate_scheduling_parameters
before_validation :set_container
validates :command, :container_image, :output_path, :cwd, :presence => true
+ validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validate :validate_state_change
- validate :validate_change
+ validate :check_update_whitelist
after_save :update_priority
after_save :finalize_if_needed
before_create :set_requesting_container_uuid
t.add :output_name
t.add :output_path
t.add :output_uuid
+ t.add :output_ttl
t.add :priority
t.add :properties
t.add :requesting_container_uuid
Committed => [Final]
}
+ AttrsPermittedAlways = [:owner_uuid, :state, :name, :description]
+ AttrsPermittedBeforeCommit = [:command, :container_count_max,
+ :container_image, :cwd, :environment, :filters, :mounts,
+ :output_path, :priority, :properties, :requesting_container_uuid,
+ :runtime_constraints, :state, :container_uuid, :use_existing,
+ :scheduling_parameters, :output_name, :output_ttl]
+
def state_transitions
State_transitions
end
['output', 'log'].each do |out_type|
pdh = c.send(out_type)
next if pdh.nil?
- if self.output_name and out_type == 'output'
- coll_name = self.output_name
- else
- coll_name = "Container #{out_type} for request #{uuid}"
+ coll_name = "Container #{out_type} for request #{uuid}"
+ trash_at = nil
+ if out_type == 'output'
+ if self.output_name
+ coll_name = self.output_name
+ end
+ if self.output_ttl > 0
+ trash_at = db_current_time + self.output_ttl
+ end
end
manifest = Collection.unscoped do
Collection.where(portable_data_hash: pdh).first.manifest_text
end
- begin
- coll = Collection.create!(owner_uuid: owner_uuid,
- manifest_text: manifest,
- portable_data_hash: pdh,
- name: coll_name,
- properties: {
- 'type' => out_type,
- 'container_request' => uuid,
- })
- rescue ActiveRecord::RecordNotUnique => rn
- # In case this is executed as part of a transaction: When a Postgres exception happens,
- # the following statements on the same transaction become invalid, so a rollback is
- # needed. One example are Unit Tests, every test is enclosed inside a transaction so
- # that the database can be reverted before every new test starts.
- # See: http://api.rubyonrails.org/classes/ActiveRecord/Transactions/ClassMethods.html#module-ActiveRecord::Transactions::ClassMethods-label-Exception+handling+and+rolling+back
- ActiveRecord::Base.connection.execute 'ROLLBACK'
- raise unless out_type == 'output' and self.output_name
- # Postgres specific unique name check. See ApplicationController#create for
- # a detailed explanation.
- raise unless rn.original_exception.is_a? PG::UniqueViolation
- err = rn.original_exception
- detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
- raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
- # Output collection name collision detected: append a timestamp.
- coll_name = "#{self.output_name} #{Time.now.getgm.strftime('%FT%TZ')}"
- retry
- end
+
+ coll = Collection.new(owner_uuid: owner_uuid,
+ manifest_text: manifest,
+ portable_data_hash: pdh,
+ name: coll_name,
+ trash_at: trash_at,
+ delete_at: trash_at,
+ properties: {
+ 'type' => out_type,
+ 'container_request' => uuid,
+ })
+ coll.save_with_unique_name!
if out_type == 'output'
out_coll = coll.uuid
else
self.cwd ||= "."
self.container_count_max ||= Rails.configuration.container_count_max
self.scheduling_parameters ||= {}
- end
-
- # Create a new container (or find an existing one) to satisfy this
- # request.
- def resolve
- c_mounts = mounts_for_container
- c_runtime_constraints = runtime_constraints_for_container
- c_container_image = container_image_for_container
- c = act_as_system_user do
- c_attrs = {command: self.command,
- cwd: self.cwd,
- environment: self.environment,
- output_path: self.output_path,
- container_image: c_container_image,
- mounts: c_mounts,
- runtime_constraints: c_runtime_constraints}
-
- reusable = self.use_existing ? Container.find_reusable(c_attrs) : nil
- if not reusable.nil?
- reusable
- else
- c_attrs[:scheduling_parameters] = self.scheduling_parameters
- Container.create!(c_attrs)
- end
- end
- self.container_uuid = c.uuid
- end
-
- # Return a runtime_constraints hash that complies with
- # self.runtime_constraints but is suitable for saving in a container
- # record, i.e., has specific values instead of ranges.
- #
- # Doing this as a step separate from other resolutions, like "git
- # revision range to commit hash", makes sense only when there is no
- # opportunity to reuse an existing container (e.g., container reuse
- # is not implemented yet, or we have already found that no existing
- # containers are suitable).
- def runtime_constraints_for_container
- rc = {}
- runtime_constraints.each do |k, v|
- if v.is_a? Array
- rc[k] = v[0]
- else
- rc[k] = v
- end
- end
- rc
- end
-
- # Return a mounts hash suitable for a Container, i.e., with every
- # readonly collection UUID resolved to a PDH.
- def mounts_for_container
- c_mounts = {}
- mounts.each do |k, mount|
- mount = mount.dup
- c_mounts[k] = mount
- if mount['kind'] != 'collection'
- next
- end
- if (uuid = mount.delete 'uuid')
- c = Collection.
- readable_by(current_user).
- where(uuid: uuid).
- select(:portable_data_hash).
- first
- if !c
- raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
- end
- if mount['portable_data_hash'].nil?
- # PDH not supplied by client
- mount['portable_data_hash'] = c.portable_data_hash
- elsif mount['portable_data_hash'] != c.portable_data_hash
- # UUID and PDH supplied by client, but they don't agree
- raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
- end
- end
- end
- return c_mounts
- end
-
- # Return a container_image PDH suitable for a Container.
- def container_image_for_container
- coll = Collection.for_latest_docker_image(container_image)
- if !coll
- raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
- end
- coll.portable_data_hash
+ self.output_ttl ||= 0
end
def set_container
return false
end
if state_changed? and state == Committed and container_uuid.nil?
- resolve
+ self.container_uuid = Container.resolve(self).uuid
end
if self.container_uuid != self.container_uuid_was
if self.container_count_changed?
def validate_runtime_constraints
case self.state
when Committed
- ['vcpus', 'ram'].each do |k|
- if not (runtime_constraints.include? k and
- runtime_constraints[k].is_a? Integer and
- runtime_constraints[k] > 0)
- errors.add :runtime_constraints, "#{k} must be a positive integer"
+ [['vcpus', true],
+ ['ram', true],
+ ['keep_cache_ram', false]].each do |k, required|
+ if !required && !runtime_constraints.include?(k)
+ next
+ end
+ v = runtime_constraints[k]
+ unless (v.is_a?(Integer) && v > 0)
+ errors.add(:runtime_constraints,
+ "[#{k}]=#{v.inspect} must be a positive integer")
end
- end
-
- if runtime_constraints.include? 'keep_cache_ram' and
- (!runtime_constraints['keep_cache_ram'].is_a?(Integer) or
- runtime_constraints['keep_cache_ram'] <= 0)
- errors.add :runtime_constraints, "keep_cache_ram must be a positive integer"
- elsif !runtime_constraints.include? 'keep_cache_ram'
- runtime_constraints['keep_cache_ram'] = Rails.configuration.container_default_keep_cache_ram
end
end
end
end
end
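Illustrative sketch, not part of the patch, assuming cr is a ContainerRequest in state Committed: vcpus and ram must be positive integers, while keep_cache_ram is optional at this stage and is defaulted later by Container.resolve_runtime_constraints.

    cr.runtime_constraints = {'vcpus' => 1, 'ram' => 123}              # valid
    cr.runtime_constraints = {'vcpus' => 1, 'ram' => 123,
                              'keep_cache_ram' => 256 << 20}           # valid
    cr.runtime_constraints = {'vcpus' => 1, 'ram' => 123,
                              'keep_cache_ram' => -1}                  # invalid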
- def validate_change
- permitted = [:owner_uuid]
+ def check_update_whitelist
+ permitted = AttrsPermittedAlways.dup
- case self.state
- when Uncommitted
- # Permit updating most fields
- permitted.push :command, :container_count_max,
- :container_image, :cwd, :description, :environment,
- :filters, :mounts, :name, :output_path, :priority,
- :properties, :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :use_existing, :scheduling_parameters,
- :output_name
+ if self.new_record? || self.state_was == Uncommitted
+ # Allow create-and-commit in a single operation.
+ permitted.push *AttrsPermittedBeforeCommit
+ end
+ case self.state
when Committed
- if container_uuid.nil?
- errors.add :container_uuid, "has not been resolved to a container."
- end
+ permitted.push :priority, :container_count_max, :container_uuid
- if priority.nil?
- errors.add :priority, "cannot be nil"
+ if self.container_uuid.nil?
+ self.errors.add :container_uuid, "has not been resolved to a container."
end
- # Can update priority, container count, name and description
- permitted.push :priority, :container_count, :container_count_max, :container_uuid,
- :name, :description
+ if self.priority.nil?
+ self.errors.add :priority, "cannot be nil"
+ end
- if self.state_changed?
- # Allow create-and-commit in a single operation.
- permitted.push :command, :container_image, :cwd, :description, :environment,
- :filters, :mounts, :name, :output_path, :properties,
- :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :use_existing, :scheduling_parameters,
- :output_name
+ # Allow container count to increment by 1
+ if (self.container_uuid &&
+ self.container_uuid != self.container_uuid_was &&
+ self.container_count == 1 + (self.container_count_was || 0))
+ permitted.push :container_count
end
when Final
- if not current_user.andand.is_admin and not (self.name_changed? || self.description_changed?)
- errors.add :state, "of container request can only be set to Final by system."
+ if self.state_changed? and not current_user.andand.is_admin
+ self.errors.add :state, "of container request can only be set to Final by system."
end
- if self.state_changed? || self.name_changed? || self.description_changed? || self.output_uuid_changed? || self.log_uuid_changed?
- permitted.push :state, :name, :description, :output_uuid, :log_uuid
- else
- errors.add :state, "does not allow updates"
+ if self.state_was == Committed
+ permitted.push :output_uuid, :log_uuid
end
- else
- errors.add :state, "invalid value"
end
- check_update_whitelist permitted
+ super(permitted)
end
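Illustrative sketch, not part of the patch, of the whitelist's effect on a ContainerRequest cr (the model tests later in this branch give the authoritative matrix):

    cr.update_attributes(name: 'alt name')  # permitted in any state
    cr.update_attributes(priority: 123)     # permitted while Committed
    cr.update_attributes(mounts: {})        # rejected once Committed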
def update_priority
+require 'audit_logs'
+
class Log < ArvadosModel
include HasUuid
include KindAndEtag
serialize :properties, Hash
before_validation :set_default_event_at
after_save :send_notify
+ after_commit { AuditLogs.tidy_in_background }
api_accessible :user, extend: :common do |t|
t.add :id
def send_notify
connection.execute "NOTIFY logs, '#{self.id}'"
end
-
end
# crunchstat logs from the logs table.
clean_container_log_rows_after: <%= 30.days %>
+ # Time to keep audit logs, in seconds. (An audit log is a row added
+ # to the "logs" table in the PostgreSQL database each time an
+ # Arvados object is created, modified, or deleted.)
+ #
+ # Currently, websocket event notifications rely on audit logs, so
+ # this should not be set lower than 300 (5 minutes).
+ max_audit_log_age: 1209600
+
+ # Maximum number of log rows to delete in a single SQL transaction.
+ #
+ # If max_audit_log_delete_batch is 0, log entries will never be
+ # deleted by Arvados. Cleanup can be done by an external process
+ # without affecting any Arvados system processes, as long as very
+ # recent (<5 minutes old) logs are not deleted.
+ #
+ # 100000 is a reasonable batch size for most sites.
+ max_audit_log_delete_batch: 0
+
# The maximum number of compute nodes that can be in use simultaneously
# If this limit is reduced, any existing nodes with slot number >= new limit
# will not be counted against the new limit. In other words, the new limit
--- /dev/null
+class AddPortableDataHashIndexToCollections < ActiveRecord::Migration
+ def change
+ add_index :collections, :portable_data_hash
+ end
+end
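Illustrative sketch, not part of the patch: this index backs the portable_data_hash lookups used elsewhere in the branch (container reuse, output collection naming). current_user and pdh are assumed to be in scope here.

    Collection.readable_by(current_user).
      where(portable_data_hash: pdh).
      select(:uuid, :portable_data_hash)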
--- /dev/null
+class AddOutputTtlToContainerRequests < ActiveRecord::Migration
+ def change
+ add_column :container_requests, :output_ttl, :integer, default: 0, null: false
+ end
+end
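Illustrative sketch, not part of the patch: a request committed with a nonzero output_ttl produces an output collection scheduled for trashing one TTL after finalization. The attribute values below are hypothetical.

    cr = ContainerRequest.create!(command: ['true'],
                                  container_image: 'arvados/jobs',
                                  output_path: '/out',
                                  output_ttl: 86400)
    # After the container finishes, the output collection's trash_at is
    # approximately db_current_time + 86400 seconds.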
scheduling_parameters text,
output_uuid character varying(255),
log_uuid character varying(255),
- output_name character varying(255) DEFAULT NULL::character varying
+ output_name character varying(255) DEFAULT NULL::character varying,
+ output_ttl integer DEFAULT 0 NOT NULL
);
CREATE UNIQUE INDEX index_collections_on_owner_uuid_and_name ON collections USING btree (owner_uuid, name) WHERE (is_trashed = false);
+--
+-- Name: index_collections_on_portable_data_hash; Type: INDEX; Schema: public; Owner: -; Tablespace:
+--
+
+CREATE INDEX index_collections_on_portable_data_hash ON collections USING btree (portable_data_hash);
+
+
--
-- Name: index_collections_on_trash_at; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
INSERT INTO schema_migrations (version) VALUES ('20170216170823');
-INSERT INTO schema_migrations (version) VALUES ('20170301225558');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20170301225558');
+
+INSERT INTO schema_migrations (version) VALUES ('20170328215436');
+
+INSERT INTO schema_migrations (version) VALUES ('20170330012505');
\ No newline at end of file
--- /dev/null
+require 'current_api_client'
+require 'db_current_time'
+
+module AuditLogs
+ extend CurrentApiClient
+ extend DbCurrentTime
+
+ def self.delete_old(max_age:, max_batch:)
+ act_as_system_user do
+ if !File.owned?(Rails.root.join('tmp'))
+ Rails.logger.warn("AuditLogs: not owner of #{Rails.root}/tmp, skipping")
+ return
+ end
+ lockfile = Rails.root.join('tmp', 'audit_logs.lock')
+ File.open(lockfile, File::RDWR|File::CREAT, 0600) do |f|
+ return unless f.flock(File::LOCK_NB|File::LOCK_EX)
+
+ sql = "select clock_timestamp() - interval '#{'%.9f' % max_age} seconds'"
+ threshold = ActiveRecord::Base.connection.select_value(sql).to_time.utc
+ Rails.logger.info "AuditLogs: deleting logs older than #{threshold}"
+
+ did_total = 0
+ loop do
+ sql = Log.unscoped.
+ select(:id).
+ order(:created_at).
+ where('event_type in (?)', ['create', 'update', 'destroy', 'delete']).
+ where('created_at < ?', threshold).
+ limit(max_batch).
+ to_sql
+ did = Log.unscoped.where("id in (#{sql})").delete_all
+ did_total += did
+
+ Rails.logger.info "AuditLogs: deleted batch of #{did}"
+ break if did == 0
+ end
+ Rails.logger.info "AuditLogs: deleted total #{did_total}"
+ end
+ end
+ end
+
+ def self.tidy_in_background
+ max_age = Rails.configuration.max_audit_log_age
+ max_batch = Rails.configuration.max_audit_log_delete_batch
+ return if max_age <= 0 || max_batch <= 0
+
+ exp = (max_age/14).seconds
+ need = false
+ Rails.cache.fetch('AuditLogs', expires_in: exp) do
+ need = true
+ end
+ return if !need
+
+ Thread.new do
+ Thread.current.abort_on_exception = false
+ begin
+ delete_old(max_age: max_age, max_batch: max_batch)
+ rescue => e
+ Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
+ ensure
+ ActiveRecord::Base.connection.close
+ end
+ end
+ end
+end
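Illustrative sketch, not part of the patch: delete_old can also be invoked directly with the configured limits; tidy_in_background runs the same call in a new thread, throttled to at most once per max_age/14 seconds via the 'AuditLogs' Rails cache entry.

    AuditLogs.delete_old(
      max_age: Rails.configuration.max_audit_log_age,
      max_batch: Rails.configuration.max_audit_log_delete_batch)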
jobrecord = Job.find_by_uuid(job_done.uuid)
if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+ $stderr.puts("dispatch: job #{jobrecord.uuid} was interrupted by node failure")
# Only this crunch-dispatch process can retry the job:
# it's already locked, and there's no way to put it back in the
# Queued state. Put it in our internal todo list unless the job
# has failed this way excessively.
@job_retry_counts[jobrecord.uuid] += 1
exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+ do_what_next = "give up now"
if exit_tempfail
@todo_job_retries[jobrecord.uuid] = jobrecord
- else
- $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+ do_what_next = "re-attempt"
end
+ $stderr.puts("dispatch: job #{jobrecord.uuid} has been interrupted " +
+ "#{@job_retry_counts[jobrecord.uuid]}x, will #{do_what_next}")
end
if !exit_tempfail
# The following four collections are used to test combining collections with repeated filenames
collection_with_repeated_filenames_and_contents_in_two_dirs_1:
uuid: zzzzz-4zz18-duplicatenames1
- portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ portable_data_hash: f3a67fad3a19c31c658982fb8158fa58+144
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
created_at: 2014-02-03T17:22:54Z
modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
collection_with_repeated_filenames_and_contents_in_two_dirs_2:
uuid: zzzzz-4zz18-duplicatenames2
- portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ portable_data_hash: f3a67fad3a19c31c658982fb8158fa58+144
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
created_at: 2014-02-03T17:22:54Z
modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
name: collection_not_readable_by_active
+collection_to_remove_and_rename_files:
+ uuid: zzzzz-4zz18-a21ux3541sxa8sf
+ portable_data_hash: 80cf6dd2cf079dd13f272ec4245cb4a8+48
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-02-03T17:22:54Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ modified_at: 2014-02-03T17:22:54Z
+ updated_at: 2014-02-03T17:22:54Z
+ manifest_text: ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1 0:0:file2\n"
+ name: collection to remove and rename files
+
# Test Helper trims the rest of the file
object_uuid: zzzzz-2x53u-382brsig8rp3667 # repository foo
object_owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
event_at: <%= 2.minute.ago.to_s(:db) %>
+ event_type: update
admin_changes_specimen: # admin changes specimen owned_by_spectator
id: 3
object_uuid: zzzzz-2x53u-3b0xxwzlbzxq5yr # specimen owned_by_spectator
object_owner_uuid: zzzzz-tpzed-l1s2piq4t4mps8r # spectator user
event_at: <%= 3.minute.ago.to_s(:db) %>
+ event_type: update
system_adds_foo_file: # foo collection added, readable by active through link
id: 4
object_uuid: zzzzz-4zz18-znfnqtbbv4spc3w # foo file
object_owner_uuid: zzzzz-tpzed-000000000000000 # system user
event_at: <%= 4.minute.ago.to_s(:db) %>
+ event_type: create
system_adds_baz: # baz collection added, readable by active and spectator through group 'all users' group membership
id: 5
object_uuid: zzzzz-4zz18-y9vne9npefyxh8g # baz file
object_owner_uuid: zzzzz-tpzed-000000000000000 # system user
event_at: <%= 5.minute.ago.to_s(:db) %>
+ event_type: create
log_owned_by_active:
id: 6
authorize_with :inactive
get :index
assert_response :success
- node_items = JSON.parse(@response.body)['items']
- assert_equal 0, node_items.size
+ assert_equal 0, json_response['items'].size
+ assert_equal 0, json_response['items_available']
end
# active user sees non-secret attributes of up and recently-up nodes
authorize_with :active
get :index
assert_response :success
- node_items = JSON.parse(@response.body)['items']
- assert_not_equal 0, node_items.size
+ assert_operator 0, :<, json_response['items_available']
+ node_items = json_response['items']
+ assert_operator 0, :<, node_items.size
found_busy_node = false
node_items.each do |node|
assert_nil node['info'].andand['ping_secret']
authorize_with user
get :index, {select: ['domain']}
assert_response :success
+ assert_operator 0, :<, json_response['items_available']
end
end
include UsersTestHelper
setup do
- @all_links_at_start = Link.all
+ @initial_link_count = Link.count
@vm_uuid = virtual_machines(:testvm).uuid
end
assert_nil created['identity_url'], 'expected no identity_url'
# arvados#user, repo link and link add user to 'All users' group
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
created['uuid'], created['email'], 'arvados#user', false, 'User'
assert_equal response_object['email'], 'foo@example.com', 'expected given email'
# four extra links; system_group, login, group and repo perms
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
end
test "setup user with fake vm and expect error" do
assert_equal response_object['email'], 'foo@example.com', 'expected given email'
# five extra links; system_group, login, group, vm, repo
- verify_num_links @all_links_at_start, 5
+ verify_links_added 5
end
test "setup user with valid email, no vm and no repo as input" do
assert_equal response_object['email'], 'foo@example.com', 'expected given email'
# three extra links; system_group, login, and group
- verify_num_links @all_links_at_start, 3
+ verify_links_added 3
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
response_object['uuid'], response_object['email'], 'arvados#user', false, 'User'
'expecting first name'
# five extra links; system_group, login, group, repo and vm
- verify_num_links @all_links_at_start, 5
+ verify_links_added 5
end
test "setup user with an existing user email and check different object is created" do
'expected different uuid after create operation'
assert_equal inactive_user['email'], response_object['email'], 'expected given email'
# system_group, openid, group, and repo. No vm link.
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
end
test "setup user with openid prefix" do
# verify links
# four new links: system_group, arvados#user, repo, and 'All users' group.
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
created['uuid'], created['email'], 'arvados#user', false, 'User'
# five new links: system_group, arvados#user, repo, vm and 'All
# users' group link
- verify_num_links @all_links_at_start, 5
+ verify_links_added 5
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
created['uuid'], created['email'], 'arvados#user', false, 'User'
"admin's filtered index did not return inactive user")
end
- def verify_num_links (original_links, expected_additional_links)
- assert_equal expected_additional_links, Link.all.size-original_links.size,
- "Expected #{expected_additional_links.inspect} more links"
+ def verify_links_added more
+ assert_equal @initial_link_count+more, Link.count,
+ "Started with #{@initial_link_count} links, expected #{more} more"
end
def find_obj_in_resp (response_items, object_type, head_kind=nil)
end
end
+ now = Time.now
[['trash-to-delete interval negative',
:collection_owned_by_active,
- {trash_at: Time.now+2.weeks, delete_at: Time.now},
+ {trash_at: now+2.weeks, delete_at: now},
{state: :invalid}],
- ['trash-to-delete interval too short',
+ ['now-to-delete interval short',
:collection_owned_by_active,
- {trash_at: Time.now+3.days, delete_at: Time.now+7.days},
- {state: :invalid}],
+ {trash_at: now+3.days, delete_at: now+7.days},
+ {state: :trash_future}],
+ ['now-to-delete interval short, trash=delete',
+ :collection_owned_by_active,
+ {trash_at: now+3.days, delete_at: now+3.days},
+ {state: :trash_future}],
['trash-to-delete interval ok',
:collection_owned_by_active,
- {trash_at: Time.now, delete_at: Time.now+15.days},
+ {trash_at: now, delete_at: now+15.days},
{state: :trash_now}],
['trash-to-delete interval short, but far enough in future',
:collection_owned_by_active,
- {trash_at: Time.now+13.days, delete_at: Time.now+15.days},
+ {trash_at: now+13.days, delete_at: now+15.days},
{state: :trash_future}],
['trash by setting is_trashed bool',
:collection_owned_by_active,
{state: :trash_now}],
['trash in future by setting just trash_at',
:collection_owned_by_active,
- {trash_at: Time.now+1.week},
+ {trash_at: now+1.week},
{state: :trash_future}],
['trash in future by setting trash_at and delete_at',
:collection_owned_by_active,
- {trash_at: Time.now+1.week, delete_at: Time.now+4.weeks},
+ {trash_at: now+1.week, delete_at: now+4.weeks},
{state: :trash_future}],
['untrash by clearing is_trashed bool',
:expired_collection,
end
updates_ok = c.update_attributes(updates)
expect_valid = expect[:state] != :invalid
- assert_equal updates_ok, expect_valid, c.errors.full_messages.to_s
+ assert_equal expect_valid, updates_ok, c.errors.full_messages.to_s
case expect[:state]
when :invalid
refute c.valid?
class ContainerRequestTest < ActiveSupport::TestCase
include DockerMigrationHelper
+ include DbCurrentTime
def create_minimal_req! attrs={}
defaults = {
cr.reload
+ assert_equal({"vcpus" => 2, "ram" => 30}, cr.runtime_constraints)
+
assert_not_nil cr.container_uuid
c = Container.find_by_uuid cr.container_uuid
assert_not_nil c
lambda { |resolved| resolved["ram"] == 1234234234 }],
].each do |rc, okfunc|
test "resolve runtime constraint range #{rc} to values" do
- cr = ContainerRequest.new(runtime_constraints: rc)
- resolved = cr.send :runtime_constraints_for_container
+ resolved = Container.resolve_runtime_constraints(rc)
assert(okfunc.call(resolved),
"container runtime_constraints was #{resolved.inspect}")
end
].each do |mounts, okfunc|
test "resolve mounts #{mounts.inspect} to values" do
set_user_from_auth :active
- cr = ContainerRequest.new(mounts: mounts)
- resolved = cr.send :mounts_for_container
+ resolved = Container.resolve_mounts(mounts)
assert(okfunc.call(resolved),
- "mounts_for_container returned #{resolved.inspect}")
+ "Container.resolve_mounts returned #{resolved.inspect}")
end
end
"path" => "/foo",
},
}
- cr = ContainerRequest.new(mounts: m)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send :mounts_for_container
+ Container.resolve_mounts(m)
end
end
"path" => "/foo",
},
}
- cr = ContainerRequest.new(mounts: m)
assert_raises(ArgumentError) do
- cr.send :mounts_for_container
+ Container.resolve_mounts(m)
end
end
'arvados/apitestfixture',
'd8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678',
].each do |tag|
- test "container_image_for_container(#{tag.inspect})" do
+ test "Container.resolve_container_image(#{tag.inspect})" do
set_user_from_auth :active
- cr = ContainerRequest.new(container_image: tag)
- resolved = cr.send :container_image_for_container
+ resolved = Container.resolve_container_image(tag)
assert_equal resolved, collections(:docker_image).portable_data_hash
end
end
- test "container_image_for_container(pdh)" do
+ test "Container.resolve_container_image(pdh)" do
set_user_from_auth :active
[[:docker_image, 'v1'], [:docker_image_1_12, 'v2']].each do |coll, ver|
Rails.configuration.docker_image_formats = [ver]
pdh = collections(coll).portable_data_hash
- cr = ContainerRequest.new(container_image: pdh)
- resolved = cr.send :container_image_for_container
+ resolved = Container.resolve_container_image(pdh)
assert_equal resolved, pdh
end
end
].each do |img|
test "container_image_for_container(#{img.inspect}) => 422" do
set_user_from_auth :active
- cr = ContainerRequest.new(container_image: img)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send :container_image_for_container
+ Container.resolve_container_image(img)
end
end
end
set_user_from_auth :active
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image).portable_data_hash)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image_1_12).portable_data_hash)
cr = create_minimal_req!(command: ["true", "2"],
container_image: links(:docker_image_collection_tag).name)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image_1_12).portable_data_hash)
end
set_user_from_auth :active
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image).portable_data_hash)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image).portable_data_hash)
cr = create_minimal_req!(command: ["true", "2"],
container_image: links(:docker_image_collection_tag).name)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image).portable_data_hash)
end
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image_1_12).portable_data_hash)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send(:container_image_for_container)
+ Container.resolve_container_image(cr.container_image)
end
end
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image).portable_data_hash)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send(:container_image_for_container)
+ Container.resolve_container_image(cr.container_image)
end
cr = create_minimal_req!(command: ["true", "2"],
container_image: links(:docker_image_collection_tag).name)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send(:container_image_for_container)
+ Container.resolve_container_image(cr.container_image)
end
end
command: ["echo", "hello"],
output_path: "test",
runtime_constraints: {"vcpus" => 4,
- "ram" => 12000000000,
- "keep_cache_ram" => 268435456},
+ "ram" => 12000000000},
mounts: {"test" => {"kind" => "json"}}}
set_user_from_auth :active
cr1 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Committed,
test "Output collection name setting using output_name with name collision resolution" do
set_user_from_auth :active
- output_name = collections(:foo_file).name
+ output_name = 'unimaginative name'
+ Collection.create!(name: output_name)
cr = create_minimal_req!(priority: 1,
state: ContainerRequest::Committed,
output_name: output_name)
- act_as_system_user do
- c = Container.find_by_uuid(cr.container_uuid)
- c.update_attributes!(state: Container::Locked)
- c.update_attributes!(state: Container::Running)
- c.update_attributes!(state: Container::Complete,
- exit_code: 0,
- output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
- log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
- end
- cr.save
+ run_container(cr)
+ cr.reload
assert_equal ContainerRequest::Final, cr.state
output_coll = Collection.find_by_uuid(cr.output_uuid)
# Make sure the resulting output collection name includes the original
# name plus the date
assert_not_equal output_name, output_coll.name,
- "It shouldn't exist more than one collection with the same owner and name '${output_name}'"
+ "more than one collection with the same owner and name"
assert output_coll.name.include?(output_name),
"New name should include original name"
- assert_match /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z/, output_coll.name,
+ assert_match /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/, output_coll.name,
"New name should include ISO8601 date"
end
- test "Finalize committed request when reusing a finished container" do
- set_user_from_auth :active
- cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
- cr.reload
- assert_equal ContainerRequest::Committed, cr.state
+ [[0, :check_output_ttl_0],
+ [1, :check_output_ttl_1s],
+ [365*86400, :check_output_ttl_1y],
+ ].each do |ttl, checker|
+ test "output_ttl=#{ttl}" do
+ act_as_user users(:active) do
+ cr = create_minimal_req!(priority: 1,
+ state: ContainerRequest::Committed,
+ output_name: 'foo',
+ output_ttl: ttl)
+ run_container(cr)
+ cr.reload
+ output = Collection.find_by_uuid(cr.output_uuid)
+ send(checker, db_current_time, output.trash_at, output.delete_at)
+ end
+ end
+ end
+
+ def check_output_ttl_0(now, trash, delete)
+ assert_nil(trash)
+ assert_nil(delete)
+ end
+
+ def check_output_ttl_1s(now, trash, delete)
+ assert_not_nil(trash)
+ assert_not_nil(delete)
+ assert_in_delta(trash, now + 1.second, 10)
+ assert_in_delta(delete, now + Rails.configuration.blob_signature_ttl.second, 10)
+ end
+
+ def check_output_ttl_1y(now, trash, delete)
+ year = (86400*365).second
+ assert_not_nil(trash)
+ assert_not_nil(delete)
+ assert_in_delta(trash, now + year, 10)
+ assert_in_delta(delete, now + year, 10)
+ end
+
+ def run_container(cr)
act_as_system_user do
c = Container.find_by_uuid(cr.container_uuid)
c.update_attributes!(state: Container::Locked)
exit_code: 0,
output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+ c
end
+ end
+
+ test "Finalize committed request when reusing a finished container" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+ cr.reload
+ assert_equal ContainerRequest::Committed, cr.state
+ run_container(cr)
cr.reload
assert_equal ContainerRequest::Final, cr.state
assert_equal ContainerRequest::Final, cr3.state
end
- [
- [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => 100}, ContainerRequest::Committed, 100],
- [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Uncommitted],
- [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Committed],
- [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => -1}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
- [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => '123'}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
- ].each do |rc, state, expected|
- test "create container request with #{rc} in state #{state} and verify keep_cache_ram #{expected}" do
- common_attrs = {cwd: "test",
- priority: 1,
- command: ["echo", "hello"],
- output_path: "test",
- runtime_constraints: rc,
- mounts: {"test" => {"kind" => "json"}}}
- set_user_from_auth :active
-
- if expected == ActiveRecord::RecordInvalid
- assert_raises(ActiveRecord::RecordInvalid) do
- create_minimal_req!(common_attrs.merge({state: state}))
- end
- else
- cr = create_minimal_req!(common_attrs.merge({state: state}))
- expected = Rails.configuration.container_default_keep_cache_ram if state == ContainerRequest::Committed and expected.nil?
- assert_equal expected, cr.runtime_constraints['keep_cache_ram']
- end
- end
- end
-
[
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
end
end
end
+
+ [['Committed', true, {name: "foobar", priority: 123}],
+ ['Committed', false, {container_count: 2}],
+ ['Committed', false, {container_count: 0}],
+ ['Committed', false, {container_count: nil}],
+ ['Final', false, {state: ContainerRequest::Committed, name: "foobar"}],
+ ['Final', false, {name: "foobar", priority: 123}],
+ ['Final', false, {name: "foobar", output_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+ ['Final', false, {name: "foobar", log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+ ['Final', false, {log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+ ['Final', false, {priority: 123}],
+ ['Final', false, {mounts: {}}],
+ ['Final', false, {container_count: 2}],
+ ['Final', true, {name: "foobar"}],
+ ['Final', true, {name: "foobar", description: "baz"}],
+ ].each do |state, permitted, updates|
+ test "state=#{state} can#{'not' if !permitted} update #{updates.inspect}" do
+ act_as_user users(:active) do
+ cr = create_minimal_req!(priority: 1,
+ state: "Committed",
+ container_count_max: 1)
+ case state
+ when 'Committed'
+ # already done
+ when 'Final'
+ act_as_system_user do
+ Container.find_by_uuid(cr.container_uuid).
+ update_attributes!(state: Container::Cancelled)
+ end
+ cr.reload
+ else
+ raise 'broken test case'
+ end
+ assert_equal state, cr.state
+ if permitted
+ assert cr.update_attributes!(updates)
+ else
+ assert_raises(ActiveRecord::RecordInvalid) do
+ cr.update_attributes!(updates)
+ end
+ end
+ end
+ end
+ end
end
runtime_constraints: {"vcpus" => 1, "ram" => 1},
}
- REUSABLE_COMMON_ATTRS = {container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
- cwd: "test",
- command: ["echo", "hello"],
- output_path: "test",
- runtime_constraints: {"vcpus" => 4,
- "ram" => 12000000000},
- mounts: {"test" => {"kind" => "json"}},
- environment: {"var" => 'val'}}
+ REUSABLE_COMMON_ATTRS = {
+ container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
+ cwd: "test",
+ command: ["echo", "hello"],
+ output_path: "test",
+ runtime_constraints: {
+ "ram" => 12000000000,
+ "vcpus" => 4,
+ },
+ mounts: {
+ "test" => {"kind" => "json"},
+ },
+ environment: {
+ "var" => "val",
+ },
+ }
def minimal_new attrs={}
cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
test "Container serialized hash attributes sorted before save" do
env = {"C" => 3, "B" => 2, "A" => 1}
m = {"F" => {"kind" => 3}, "E" => {"kind" => 2}, "D" => {"kind" => 1}}
- rc = {"vcpus" => 1, "ram" => 1}
+ rc = {"vcpus" => 1, "ram" => 1, "keep_cache_ram" => 1}
c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
assert_equal c.environment.to_json, Container.deep_sort_hash(env).to_json
assert_equal c.mounts.to_json, Container.deep_sort_hash(m).to_json
log: 'ea10d51bcf88862dbcc36eb292017dfd+45',
}
- set_user_from_auth :dispatch1
-
- c_output1 = Container.create common_attrs
- c_output2 = Container.create common_attrs
- assert_not_equal c_output1.uuid, c_output2.uuid
-
cr = ContainerRequest.new common_attrs
+ cr.use_existing = false
cr.state = ContainerRequest::Committed
- cr.container_uuid = c_output1.uuid
cr.save!
+ c_output1 = Container.where(uuid: cr.container_uuid).first
cr = ContainerRequest.new common_attrs
+ cr.use_existing = false
cr.state = ContainerRequest::Committed
- cr.container_uuid = c_output2.uuid
cr.save!
+ c_output2 = Container.where(uuid: cr.container_uuid).first
+
+ assert_not_equal c_output1.uuid, c_output2.uuid
+
+ set_user_from_auth :dispatch1
out1 = '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'
log1 = collections(:real_log_collection).portable_data_hash
c_output2.update_attributes!({state: Container::Running})
c_output2.update_attributes!(completed_attrs.merge({log: log1, output: out2}))
- reused = Container.find_reusable(common_attrs)
- assert_not_nil reused
- assert_equal reused.uuid, c_output1.uuid
+ reused = Container.resolve(ContainerRequest.new(common_attrs))
+ assert_equal c_output1.uuid, reused.uuid
end
test "find_reusable method should select running container by start date" do
require 'test_helper'
+require 'audit_logs'
class LogTest < ActiveSupport::TestCase
include CurrentApiClient
end
end
end
+
+ def assert_no_logs_deleted
+ logs_before = Log.unscoped.all.count
+ yield
+ assert_equal logs_before, Log.unscoped.all.count
+ end
+
+ def remaining_audit_logs
+ Log.unscoped.where('event_type in (?)', %w(create update destroy delete))
+ end
+
+ # Default settings should not delete anything -- some sites rely on
+ # the original "keep everything forever" behavior.
+ test 'retain old audit logs with default settings' do
+ assert_no_logs_deleted do
+ AuditLogs.delete_old(
+ max_age: Rails.configuration.max_audit_log_age,
+ max_batch: Rails.configuration.max_audit_log_delete_batch)
+ end
+ end
+
+ # Batch size 0 should retain all logs -- even if max_age is very
+ # short, and even if the default settings (and associated test) have
+ # changed.
+ test 'retain old audit logs with max_audit_log_delete_batch=0' do
+ assert_no_logs_deleted do
+ AuditLogs.delete_old(max_age: 1, max_batch: 0)
+ end
+ end
+
+ # We recommend a more conservative age of 5 minutes for production,
+ # but 3 minutes suits our test data better (and is still a value that
+ # would be expected to work correctly in production).
+ test 'delete old audit logs with production settings' do
+ initial_log_count = Log.unscoped.all.count
+ AuditLogs.delete_old(max_age: 180, max_batch: 100000)
+ assert_operator remaining_audit_logs.count, :<, initial_log_count
+ end
+
+ test 'delete all audit logs in multiple batches' do
+ AuditLogs.delete_old(max_age: 0.00001, max_batch: 2)
+ assert_equal [], remaining_audit_logs.collect(&:uuid)
+ end
+
+ test 'delete old audit logs in thread' do
+ begin
+ Rails.configuration.max_audit_log_age = 20
+ Rails.configuration.max_audit_log_delete_batch = 100000
+ Rails.cache.delete 'AuditLogs'
+ initial_log_count = Log.unscoped.all.count + 1
+ act_as_system_user do
+ Log.create!()
+ initial_log_count += 1
+ end
+ deadline = Time.now + 10
+ while remaining_audit_logs.count == initial_log_count
+ if Time.now > deadline
+ raise "timed out"
+ end
+ sleep 0.1
+ end
+ assert_operator remaining_audit_logs.count, :<, initial_log_count
+ ensure
+ # The test framework rolls back our transactions, but that
+ # doesn't undo the deletes we did from separate threads.
+ ActiveRecord::Base.connection.exec_query 'ROLLBACK'
+ Thread.new do
+ begin
+ dc = DatabaseController.new
+ dc.define_singleton_method :render do |*args| end
+ dc.reset
+ ensure
+ ActiveRecord::Base.connection.close
+ end
+ end.join
+ end
+ end
end
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/git-httpd/git-httpd.yml
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=notify
ExecStart=/usr/bin/arvados-git-httpd
Restart=always
+RestartSec=1
[Install]
WantedBy=multi-user.target
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=notify
ExecStart=/usr/bin/crunch-dispatch-slurm
Restart=always
+RestartSec=1
[Install]
WantedBy=multi-user.target
package main
import (
+ "context"
"encoding/json"
"errors"
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/curoverse/dockerclient"
+
+ dockertypes "github.com/docker/docker/api/types"
+ dockercontainer "github.com/docker/docker/api/types/container"
+ dockernetwork "github.com/docker/docker/api/types/network"
+ dockerclient "github.com/docker/docker/client"
)
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
type ThinDockerClient interface {
- StopContainer(id string, timeout int) error
- InspectImage(id string) (*dockerclient.ImageInfo, error)
- LoadImage(reader io.Reader) error
- CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
- StartContainer(id string, config *dockerclient.HostConfig) error
- AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
- Wait(id string) <-chan dockerclient.WaitResult
- RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+ ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
+ ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+ networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
+ ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
+ ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+ ContainerWait(ctx context.Context, container string) (int64, error)
+ ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+ ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+ ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that forwards each request to a real dockerclient.Client.
+type ThinDockerClientProxy struct {
+ Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+ return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+ networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+ return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+ return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+ return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
+ return proxy.Docker.ContainerWait(ctx, container)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+ return proxy.Docker.ImageRemove(ctx, image, options)
}
// ContainerRunner is the main stateful struct used for a single execution of a
ArvClient IArvadosClient
Kc IKeepClient
arvados.Container
- dockerclient.ContainerConfig
- dockerclient.HostConfig
+ ContainerConfig dockercontainer.Config
+ dockercontainer.HostConfig
token string
ContainerID string
ExitCode *int
cStateLock sync.Mutex
cStarted bool // StartContainer() succeeded
cCancelled bool // StopContainer() invoked
+
+ enableNetwork string // one of "default" or "always"
+ networkMode string // passed through to HostConfig.NetworkMode
}
// SetupSignals sets up signal handling to gracefully terminate the underlying
}
runner.cCancelled = true
if runner.cStarted {
- err := runner.Docker.StopContainer(runner.ContainerID, 10)
+ // The old API's bare "10" meant seconds; ContainerStop takes a
+ // Duration, so time.Duration(10) would be 10ns. Use 10 seconds.
+ timeout := 10 * time.Second
+ err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
if err != nil {
log.Printf("StopContainer failed: %s", err)
}
runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
- _, err = runner.Docker.InspectImage(imageID)
+ _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
if err != nil {
runner.CrunchLog.Print("Loading Docker image from keep")
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
- err = runner.Docker.LoadImage(readCloser)
+ response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
}
+ response.Body.Close()
} else {
runner.CrunchLog.Print("Docker image is available")
}
runner.CrunchLog.Print("Attaching container streams")
- var containerReader io.Reader
- containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
- &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
+ response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
+ dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
}
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
- go runner.ProcessDockerAttach(containerReader)
+ go runner.ProcessDockerAttach(response.Reader)
return nil
}
for k, v := range runner.Container.Environment {
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
+
+ runner.HostConfig = dockercontainer.HostConfig{
+ Binds: runner.Binds,
+ Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ LogConfig: dockercontainer.LogConfig{
+ Type: "none",
+ },
+ }
+
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
tok, err := runner.ContainerToken()
if err != nil {
"ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
"ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
)
- runner.ContainerConfig.NetworkDisabled = false
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
} else {
- runner.ContainerConfig.NetworkDisabled = true
+ if runner.enableNetwork == "always" {
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
+ } else {
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
+ }
}
- var err error
- runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
+ createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
- runner.HostConfig = dockerclient.HostConfig{
- Binds: runner.Binds,
- CgroupParent: runner.setCgroupParent,
- LogConfig: dockerclient.LogConfig{
- Type: "none",
- },
- }
+ runner.ContainerID = createdBody.ID
return runner.AttachStreams()
}
if runner.cCancelled {
return ErrCancelled
}
- err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
+ err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
+ dockertypes.ContainerStartOptions{})
if err != nil {
return fmt.Errorf("could not start container: %v", err)
}
func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- waitDocker := runner.Docker.Wait(runner.ContainerID)
+ waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ if err != nil {
+ return fmt.Errorf("container wait: %v", err)
+ }
+
+ runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
+ code := int(waitDocker)
+ runner.ExitCode = &code
+
waitMount := runner.ArvMountExit
- for waitDocker != nil {
- select {
- case err := <-waitMount:
- runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
- waitMount = nil
- runner.stop()
- case wr := <-waitDocker:
- if wr.Error != nil {
- return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
- }
- runner.ExitCode = &wr.ExitCode
- waitDocker = nil
- }
+ select {
+ case err := <-waitMount:
+ runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+ waitMount = nil
+ runner.stop()
+ default:
}
// wait for stdout/stderr to complete
cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+ enableNetwork := flag.String("container-enable-networking", "default",
+ `Specify whether networking should be enabled for the container. One of 'default' or 'always':
+ default: only enable networking if the container requests it.
+ always: containers always have networking enabled
+ `)
+ networkMode := flag.String("container-network-mode", "default",
+ `Set networking mode for the container. Corresponds to Docker network mode (--net).
+ `)
flag.Parse()
containerId := flag.Arg(0)
}
kc.Retries = 4
- var docker *dockerclient.DockerClient
- docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ var docker *dockerclient.Client
+ // API version 1.21 corresponds to Docker 1.9, which is currently the
+ // minimum version we want to support.
+ docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
if err != nil {
log.Fatalf("%s: %v", containerId, err)
}
- cr := NewContainerRunner(api, kc, docker, containerId)
+ dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+ cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
+ cr.enableNetwork = *enableNetwork
+ cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
p := findCgroup(*cgroupParentSubsystem)
cr.setCgroupParent = p
package main
import (
+ "bufio"
"bytes"
+ "context"
"crypto/md5"
"encoding/json"
"errors"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/curoverse/dockerclient"
+
+ dockertypes "github.com/docker/docker/api/types"
+ dockercontainer "github.com/docker/docker/api/types/container"
+ dockernetwork "github.com/docker/docker/api/types/network"
. "gopkg.in/check.v1"
)
logReader io.ReadCloser
logWriter io.WriteCloser
fn func(t *TestDockerClient)
- finish chan dockerclient.WaitResult
+ finish int
stop chan bool
cwd string
env []string
api *ArvTestClient
}
-func NewTestDockerClient() *TestDockerClient {
+func NewTestDockerClient(exitCode int) *TestDockerClient {
t := &TestDockerClient{}
t.logReader, t.logWriter = io.Pipe()
- t.finish = make(chan dockerclient.WaitResult)
+ t.finish = exitCode
t.stop = make(chan bool)
t.cwd = "/"
return t
}
-func (t *TestDockerClient) StopContainer(id string, timeout int) error {
- t.stop <- true
- return nil
+func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+ return dockertypes.HijackedResponse{Reader: bufio.NewReader(t.logReader)}, nil
}
-func (t *TestDockerClient) InspectImage(id string) (*dockerclient.ImageInfo, error) {
- if t.imageLoaded == id {
- return &dockerclient.ImageInfo{}, nil
- } else {
- return nil, errors.New("")
- }
-}
-
-func (t *TestDockerClient) LoadImage(reader io.Reader) error {
- _, err := io.Copy(ioutil.Discard, reader)
- if err != nil {
- return err
- } else {
- t.imageLoaded = hwImageId
- return nil
- }
-}
-
-func (t *TestDockerClient) CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error) {
+func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
if config.WorkingDir != "" {
t.cwd = config.WorkingDir
}
t.env = config.Env
- return "abcde", nil
+ return dockercontainer.ContainerCreateCreatedBody{ID: "abcde"}, nil
}
-func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostConfig) error {
- if id == "abcde" {
+func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+ if container == "abcde" {
go t.fn(t)
return nil
} else {
}
}
-func (t *TestDockerClient) AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error) {
- return t.logReader, nil
+func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+ t.stop <- true
+ return nil
+}
+
+func (t *TestDockerClient) ContainerWait(ctx context.Context, container string) (int64, error) {
+ return int64(t.finish), nil
+}
+
+func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ if t.imageLoaded == image {
+ return dockertypes.ImageInspect{}, nil, nil
+ } else {
+ return dockertypes.ImageInspect{}, nil, errors.New("")
+ }
}
-func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
- return t.finish
+func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ _, err := io.Copy(ioutil.Discard, input)
+ if err != nil {
+ return dockertypes.ImageLoadResponse{}, err
+ } else {
+ t.imageLoaded = hwImageId
+ return dockertypes.ImageLoadResponse{Body: ioutil.NopCloser(input)}, nil
+ }
}
-func (*TestDockerClient) RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error) {
+func (*TestDockerClient) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
return nil, nil
}
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- _, err := cr.Docker.RemoveImage(hwImageId, true)
+ _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
- _, err = cr.Docker.InspectImage(hwImageId)
+ _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
c.Check(err, NotNil)
cr.Container.ContainerImage = hwPDH
c.Check(err, IsNil)
defer func() {
- cr.Docker.RemoveImage(hwImageId, true)
+ cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
}()
c.Check(kc.Called, Equals, true)
c.Check(cr.ContainerConfig.Image, Equals, hwImageId)
- _, err = cr.Docker.InspectImage(hwImageId)
+ _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
c.Check(err, IsNil)
// (2) Test using image that's already loaded
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
	// (4) Keep read error
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
}
func (s *TestSuite) TestRunContainer(c *C) {
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
docker.fn = func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "Hello world\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
}
cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
// Used by the TestFullRun*() tests below to DRY up boilerplate setup for a full
// dress rehearsal of the Run() function, starting from a JSON container record.
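// The exitCode argument is the wait status the stub docker client will report
// when the test's fn finishes.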
-func FullRunHelper(c *C, record string, extraMounts []string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(exitCode)
docker.fn = fn
- docker.RemoveImage(hwImageId, true)
+ docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
docker.api = api
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "hello world\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
- }`, nil, func(t *TestDockerClient) {
+ }`, nil, 0, func(t *TestDockerClient) {
time.Sleep(time.Second)
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
- }`, nil, func(t *TestDockerClient) {
- time.Sleep(time.Second)
- t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
- })
+ }`, nil, 0,
+ func(t *TestDockerClient) {
+ time.Sleep(time.Second)
+ t.logWriter.Close()
+ })
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
- }`, nil, func(t *TestDockerClient) {
- time.Sleep(time.Second)
- t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
- })
+ }`, nil, 0,
+ func(t *TestDockerClient) {
+ time.Sleep(time.Second)
+ t.logWriter.Close()
+ })
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 1, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "hello\n"))
t.logWriter.Write(dockerLog(2, "world\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 1}
})
final := api.CalledWith("container.state", "Complete")
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
docker.fn = func(t *TestDockerClient) {
<-t.stop
t.logWriter.Write(dockerLog(1, "foo\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
}
- docker.RemoveImage(hwImageId, true)
+ docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api := &ArvTestClient{Container: rec}
cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"runtime_constraints": {}
}`
- api, _, _ := FullRunHelper(c, helperRecord, nil, func(t *TestDockerClient) {
+ api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
err = json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
docker.fn = fn
- docker.RemoveImage(hwImageId, true)
+ docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {"API": true}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {"API": true}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+ api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
}
- api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+ api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(runner.Binds, DeepEquals, []string{realtemp + "/2:/tmp",
"b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+ api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/docker-cleaner/docker-cleaner.json
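+# A start limit interval of 0 disables restart rate-limiting; the directive
+# was renamed in systemd 230, so both spellings are given.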
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=simple
import arvados.commands._util as arv_cmd
from arvados_fuse import crunchstat
from arvados_fuse import *
+from arvados_fuse.unmount import unmount
from arvados_fuse._version import __version__
class ArgumentParser(argparse.ArgumentParser):
self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
+ unmount = self.add_mutually_exclusive_group()
+ unmount.add_argument('--unmount', action='store_true', default=False,
+ help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit.")
+ unmount.add_argument('--unmount-all', action='store_true', default=False,
+ help="Forcefully unmount every fuse mount at or below the specified mountpoint and exit.")
+ unmount.add_argument('--replace', action='store_true', default=False,
+ help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
self.add_argument('--unmount-timeout',
type=float, default=2.0,
help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
class Mount(object):
def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
+ self.daemon = False
self.logger = logger
self.args = args
self.listen_for_events = False
exit(1)
def __enter__(self):
+ if self.args.replace:
+ unmount(path=self.args.mountpoint,
+ timeout=self.args.unmount_timeout)
llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
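+        # Daemonize only after llfuse.init() has attached the mount
+        # (self.daemon is set by _run_standalone when --foreground is not
+        # given); files_preserve keeps every fd from 3 up open across the
+        # fork.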
+ if self.daemon:
+ daemon.DaemonContext(
+ working_directory=os.path.dirname(self.args.mountpoint),
+ files_preserve=range(
+ 3, resource.getrlimit(resource.RLIMIT_NOFILE)[1])
+ ).open()
if self.listen_for_events and not self.args.disable_event_listening:
self.operations.listen_for_events()
self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
self.args.unmount_timeout)
def run(self):
- if self.args.exec_args:
+ if self.args.unmount or self.args.unmount_all:
+ unmount(path=self.args.mountpoint,
+ timeout=self.args.unmount_timeout,
+ recursive=self.args.unmount_all)
+ elif self.args.exec_args:
self._run_exec()
else:
self._run_standalone()
def _run_standalone(self):
try:
- llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-
- if not self.args.foreground:
- self.daemon_ctx = daemon.DaemonContext(
- working_directory=os.path.dirname(self.args.mountpoint),
- files_preserve=range(
- 3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
- self.daemon_ctx.open()
-
- # Subscribe to change events from API server
- if self.listen_for_events and not self.args.disable_event_listening:
- self.operations.listen_for_events()
-
- self._llfuse_main()
+ self.daemon = not self.args.foreground
+ with self:
+ self.llfuse_thread.join(timeout=None)
except Exception as e:
self.logger.exception('arv-mount: exception during mount: %s', e)
exit(getattr(e, 'errno', 1))
--- /dev/null
+import collections
+import errno
+import os
+import subprocess
+import time
+
+
+MountInfo = collections.namedtuple(
+ 'MountInfo', ['is_fuse', 'major', 'minor', 'mnttype', 'path'])
+
+
+def mountinfo():
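+    """Parse /proc/self/mountinfo into a list of MountInfo tuples.
+
+    Each tuple records whether the mount is a fuse mount, the device
+    major/minor numbers, the filesystem type, and the mount path.
+    """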
+ mi = []
+ with open('/proc/self/mountinfo') as f:
+ for m in f.readlines():
+ mntid, pmntid, dev, root, path, extra = m.split(" ", 5)
+ mnttype = extra.split(" - ")[1].split(" ", 1)[0]
+ major, minor = dev.split(":")
+ mi.append(MountInfo(
+ is_fuse=(mnttype == "fuse" or mnttype.startswith("fuse.")),
+ major=major,
+ minor=minor,
+ mnttype=mnttype,
+ path=path,
+ ))
+ return mi
+
+
+def unmount(path, timeout=10, recursive=False):
+ """Unmount the fuse mount at path.
+
+ Unmounting is done by writing 1 to the "abort" control file in
+ sysfs to kill the fuse driver process, then executing "fusermount
+ -u -z" to detach the mount point, and repeating these steps until
+ the mount is no longer listed in /proc/self/mountinfo.
+
+ This procedure should enable a non-root user to reliably unmount
+ their own fuse filesystem without risk of deadlock.
+
+ Returns True if unmounting was successful, False if it wasn't a
+ fuse mount at all. Raises an exception if it cannot be unmounted.
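+
+    A brief usage sketch (paths are illustrative):
+
+        unmount('/home/me/keep', timeout=10)     # detach one fuse mount
+        unmount('/home/me/mnt', recursive=True)  # detach a whole subtree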
+ """
+
+ path = os.path.realpath(path)
+
+ if recursive:
+ paths = []
+ for m in mountinfo():
+ if m.path == path or m.path.startswith(path+"/"):
+ paths.append(m.path)
+ if not m.is_fuse:
+ raise Exception(
+ "cannot unmount {}: non-fuse mountpoint {}".format(
+ path, m))
+ for path in sorted(paths, key=len, reverse=True):
+ unmount(path, timeout=timeout, recursive=False)
+ return len(paths) > 0
+
+ was_mounted = False
+ attempted = False
+ if timeout is None:
+ deadline = None
+ else:
+ deadline = time.time() + timeout
+
+ while True:
+ mounted = False
+ for m in mountinfo():
+ if m.is_fuse:
+ try:
+ if os.path.realpath(m.path) == path:
+ was_mounted = True
+ mounted = True
+ break
+ except OSError:
+ continue
+ if not mounted:
+ return was_mounted
+
+ if attempted:
+ delay = 1
+ if deadline:
+ delay = min(delay, deadline - time.time())
+ if delay <= 0:
+ raise Exception("timed out")
+ time.sleep(delay)
+
+ try:
+ with open('/sys/fs/fuse/connections/{}/abort'.format(m.minor),
+ 'w') as f:
+ f.write("1")
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+
+ attempted = True
+ try:
+ subprocess.check_call(["fusermount", "-u", "-z", path])
+ except subprocess.CalledProcessError:
+ pass
--- /dev/null
+import subprocess
+
+from integration_test import IntegrationTest
+
+
+class CrunchstatTest(IntegrationTest):
+ def test_crunchstat(self):
+ output = subprocess.check_output(
+ ['./bin/arv-mount',
+ '--crunchstat-interval', '1',
+ self.mnt,
+ '--exec', 'echo', 'ok'])
+ self.assertEqual("ok\n", output)
-import arvados
-import arvados.safeapi
-import arvados_fuse as fuse
-import glob
import json
import llfuse
+import logging
+import mock
import os
-import shutil
import subprocess
-import sys
-import tempfile
-import threading
import time
import unittest
-import logging
-import multiprocessing
+
+import arvados
+import arvados_fuse as fuse
import run_test_server
-import mock
-import re
from mount_test_base import MountTestBase
--- /dev/null
+import os
+import subprocess
+import time
+
+from integration_test import IntegrationTest
+
+class UnmountTest(IntegrationTest):
+ def setUp(self):
+ super(UnmountTest, self).setUp()
+ self.tmp = self.mnt
+ self.to_delete = []
+
+ def tearDown(self):
+ for d in self.to_delete:
+ os.rmdir(d)
+ super(UnmountTest, self).tearDown()
+
+ def test_replace(self):
+ subprocess.check_call(
+ ['./bin/arv-mount', '--subtype', 'test', '--replace',
+ self.mnt])
+ subprocess.check_call(
+ ['./bin/arv-mount', '--subtype', 'test', '--replace',
+ '--unmount-timeout', '10',
+ self.mnt])
+ subprocess.check_call(
+ ['./bin/arv-mount', '--subtype', 'test', '--replace',
+ '--unmount-timeout', '10',
+ self.mnt,
+ '--exec', 'true'])
+ for m in subprocess.check_output(['mount']).splitlines():
+ self.assertNotIn(' '+self.mnt+' ', m)
+
+ def _mounted(self, mounts):
+ all_mounts = subprocess.check_output(['mount', '-t', 'fuse.test'])
+ return [m for m in mounts
+ if ' '+m+' ' in all_mounts]
+
+ def test_unmount_children(self):
+ for d in ['foo', 'foo/bar', 'bar']:
+ mnt = self.tmp+'/'+d
+ os.mkdir(mnt)
+ self.to_delete.insert(0, mnt)
+ mounts = []
+ for d in ['bar', 'foo/bar']:
+ mnt = self.tmp+'/'+d
+ mounts.append(mnt)
+ subprocess.check_call(
+ ['./bin/arv-mount', '--subtype', 'test', mnt])
+
+ # Wait for mounts to attach
+ deadline = time.time() + 10
+ while self._mounted(mounts) != mounts:
+ time.sleep(0.1)
+ self.assertLess(time.time(), deadline)
+
+ self.assertEqual(mounts, self._mounted(mounts))
+ subprocess.check_call(['./bin/arv-mount', '--unmount', self.tmp])
+ self.assertEqual(mounts, self._mounted(mounts))
+ subprocess.check_call(['./bin/arv-mount', '--unmount-all', self.tmp])
+ self.assertEqual([], self._mounted(mounts))
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/keep-balance/keep-balance.yml
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=simple
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/keep-web/keep-web.yml
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=notify
ExecStart=/usr/bin/keep-web
Restart=always
+RestartSec=1
[Install]
WantedBy=multi-user.target
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/keepproxy/keepproxy.yml
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=notify
ExecStart=/usr/bin/keepproxy
Restart=always
+RestartSec=1
[Install]
WantedBy=multi-user.target
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/keepstore/keepstore.yml
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=notify
ExecStart=/usr/bin/keepstore
Restart=always
+RestartSec=1
[Install]
WantedBy=multi-user.target
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
- if not self.cloud_node.size:
- self.cloud_node.size = self.cloud_size
+
+        # The information included in the node size object we get from
+        # libcloud is inconsistent between cloud providers. Replace the
+        # libcloud NodeSize object with a compatible CloudSizeWrapper
+        # object, which merges the size info reported by the cloud with
+        # the size information from the configuration file.
+ self.cloud_node.size = self.cloud_size
+
self._logger.info("Cloud node %s created.", self.cloud_node.id)
self._later.update_arvados_node_properties()
if self.arvados_node is None:
return 'unpaired'
- # This node is indicated as non-functioning by the cloud
- if self._cloud.broken(self.cloud_node):
- return 'down'
-
state = self.arvados_node['crunch_worker_state']
# If state information is not available because it is missing or the
return (isinstance(exception, cls.CLOUD_ERRORS) or
type(exception) is Exception)
+ def destroy_node(self, cloud_node):
+ try:
+ return self.real.destroy_node(cloud_node)
+ except self.CLOUD_ERRORS as destroy_error:
+            # Sometimes the destroy node request succeeds on the cloud side,
+            # but the API call times out and raises an exception instead of
+            # reporting success. When that happens we get a noisy stack
+            # trace. Check whether the node is still on the node list: if it
+            # is gone, we can declare victory.
+ try:
+ self.search_for_now(cloud_node.id, 'list_nodes')
+ except ValueError:
+ # If we catch ValueError, that means search_for_now didn't find
+ # it, which means destroy_node actually succeeded.
+ return True
+ # The node is still on the list. Re-raise.
+ raise
+
# Now that we've defined all our own methods, delegate generic, public
# attributes of libcloud drivers that we haven't defined ourselves.
def _delegate_to_real(attr_name):
raise
def sync_node(self, cloud_node, arvados_node):
+ # Update the cloud node record to ensure we have the correct metadata
+ # fingerprint.
+ cloud_node = self.real.ex_get_node(cloud_node.name, cloud_node.extra['zone'])
+
# We can't store the FQDN on the name attribute or anything like it,
# because (a) names are static throughout the node's life (so FQDN
# isn't available because we don't know it at node creation time) and
self._find_metadata(metadata_items, 'hostname')['value'] = hostname
except KeyError:
metadata_items.append({'key': 'hostname', 'value': hostname})
- response = self.real.connection.async_request(
- '/zones/{}/instances/{}/setMetadata'.format(
- cloud_node.extra['zone'].name, cloud_node.name),
- method='POST', data=metadata_req)
- if not response.success():
- raise Exception("setMetadata error: {}".format(response.error))
+
+ self.real.ex_set_node_metadata(cloud_node, metadata_items)
@classmethod
def node_fqdn(cls, node):
def try_pairing(self):
for record in self.cloud_nodes.unpaired():
for arv_rec in self.arvados_nodes.unpaired():
- if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+ if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
self._pair_nodes(record, arv_rec.arvados_node)
break
elif (nodes_wanted < 0) and self.booting:
self._later.stop_booting_node(size)
except Exception as e:
- self._logger.exception("while calculating nodes wanted for size %s", size)
+ self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
- if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
- self._begin_node_shutdown(node_actor, cancellable=True)
- elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
- # Node is unpaired, which means it probably exceeded its booting
- # grace period without a ping, so shut it down so we can boot a new
- # node in its place.
- self._begin_node_shutdown(node_actor, cancellable=False)
- elif node_actor.in_state('down').get():
- # Node is down and unlikely to come back.
- self._begin_node_shutdown(node_actor, cancellable=False)
+ try:
+ if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
+ self._begin_node_shutdown(node_actor, cancellable=True)
+ elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
+ # Node is unpaired, which means it probably exceeded its booting
+ # grace period without a ping, so shut it down so we can boot a new
+ # node in its place.
+ self._begin_node_shutdown(node_actor, cancellable=False)
+ elif node_actor.in_state('down').get():
+ # Node is down and unlikely to come back.
+ self._begin_node_shutdown(node_actor, cancellable=False)
+ except pykka.ActorDeadError as e:
+ # The monitor actor sends shutdown suggestions every time the
+ # node's state is updated, and these go into the daemon actor's
+ # message queue. It's possible that the node has already been shut
+ # down (which shuts down the node monitor actor). In that case,
+ # this message is stale and we'll get ActorDeadError when we try to
+ # access node_actor. Log the error.
+ self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
def node_finished_shutdown(self, shutdown_actor):
try:
cloud_node = testutil.cloud_node_mock(
2, metadata=start_metadata.copy(),
zone=testutil.cloud_object_mock('testzone'))
+ self.driver_mock().ex_get_node.return_value = cloud_node
driver = self.new_driver()
driver.sync_node(cloud_node, arv_node)
- args, kwargs = self.driver_mock().connection.async_request.call_args
- self.assertEqual('/zones/testzone/instances/2/setMetadata', args[0])
- for key in ['kind', 'fingerprint']:
- self.assertEqual(start_metadata[key], kwargs['data'][key])
+ args, kwargs = self.driver_mock().ex_set_node_metadata.call_args
+ self.assertEqual(cloud_node, args[0])
plain_metadata['hostname'] = 'compute1.zzzzz.arvadosapi.com'
self.assertEqual(
plain_metadata,
- {item['key']: item['value'] for item in kwargs['data']['items']})
+ {item['key']: item['value'] for item in args[1]})
def test_sync_node_updates_hostname_tag(self):
self.check_sync_node_updates_hostname_tag(
arv_node = testutil.arvados_node_mock(8)
cloud_node = testutil.cloud_node_mock(
9, metadata={}, zone=testutil.cloud_object_mock('failzone'))
- mock_response = self.driver_mock().connection.async_request()
- mock_response.success.return_value = False
- mock_response.error = 'sync error test'
+        self.driver_mock().ex_set_node_metadata.side_effect = (Exception('sync error test'),)
driver = self.new_driver()
with self.assertRaises(Exception) as err_check:
driver.sync_node(cloud_node, arv_node)
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
self.assertEqual(0, self.alive_monitor_count())
- def test_broken_node_not_counted(self):
- size = testutil.MockSize(8)
- cloud_node = testutil.cloud_node_mock(8, size=size)
- wishlist = [size]
- self.make_daemon([cloud_node], [testutil.arvados_node_mock(8)],
- wishlist, avail_sizes=[(size, {"cores":1})])
- self.assertEqual(1, self.alive_monitor_count())
- self.assertFalse(self.node_setup.start.called)
- monitor = self.monitor_list()[0].proxy()
- shutdown_proxy = self.node_shutdown.start().proxy
- shutdown_proxy().cloud_node.get.return_value = cloud_node
- shutdown_proxy().success.get.return_value = False
- self.cloud_factory().broken.return_value = True
- self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
- self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
- self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
- self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
-
def test_nodes_shutting_down_replaced_below_max_nodes(self):
size = testutil.MockSize(6)
cloud_node = testutil.cloud_node_mock(6, size=size)
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/ws/ws.yml
+# systemd<230
+StartLimitInterval=0
+# systemd>=230
+StartLimitIntervalSec=0
[Service]
Type=notify
ExecStart=/usr/bin/arvados-ws
Restart=always
+RestartSec=1
[Install]
WantedBy=multi-user.target