pipelines = PipelineInstance.limit(lim).order(["created_at desc"])
crs = ContainerRequest.limit(lim).order(["created_at desc"]).filter([["requesting_container_uuid", "=", nil]])
- cr_uuids = crs.results.collect { |c| c.container_uuid }
- containers = Container.order(["created_at desc"]).results if cr_uuids.any?
-
procs = {}
pipelines.results.each { |pi| procs[pi] = pi.created_at }
- containers.each { |c| procs[c] = c.created_at } if !containers.nil?
+ crs.results.each { |c| procs[c] = c.created_at }
Hash[procs.sort_by {|key, value| value}].keys.reverse.first(lim)
end
class ContainerRequestsController < ApplicationController
+ def show_pane_list
+ %w(Status Log Advanced)
+ end
+
+ def cancel
+ @object.update_attributes! priority: 0
+ if params[:return_to]
+ redirect_to params[:return_to]
+ else
+ redirect_to @object
+ end
+ end
end
class ContainersController < ApplicationController
+ def show_pane_list
+ %w(Status Log Advanced)
+ end
end
class Container < ArvadosBase
+ def self.creatable?
+ false
+ end
+
def work_unit(label=nil)
ContainerWorkUnit.new(self, label)
end
class ContainerRequest < ArvadosBase
+ def self.creatable?
+ false
+ end
+ def textile_attributes
+ [ 'description' ]
+ end
+
+ def work_unit(label=nil)
+ ContainerWorkUnit.new(self, label)
+ end
end
class ContainerWorkUnit < ProxyWorkUnit
+ attr_accessor :container
+
+ def initialize proxied, label
+ super
+ if @proxied.is_a?(ContainerRequest)
+ container_uuid = get(:container_uuid)
+ if container_uuid
+ @container = Container.where(uuid: container_uuid).first
+ end
+ end
+ end
+
def children
return self.my_children if self.my_children
+ container_uuid = nil
+ container_uuid = if @proxied.is_a?(Container) then uuid else get(:container_uuid) end
+
items = []
+ if container_uuid
+ reqs = ContainerRequest.where(requesting_container_uuid: container_uuid).results
+ reqs.each do |cr|
+ items << cr.work_unit(cr.name || 'this container')
+ end
+ end
- crs = {}
- reqs = ContainerRequest.where(requesting_container_uuid: uuid).results
- reqs.each { |cr| crs[cr.container_uuid] = cr.name }
+ self.my_children = items
+ end
- containers = Container.where(uuid: crs.keys).results
- containers.each do |c|
- items << c.work_unit(crs[c.uuid])
+ def title
+ "container"
+ end
+
+ def uri
+ uuid = get(:uuid)
+
+ return nil unless uuid
+
+ if @proxied.class.respond_to? :table_name
+ "/#{@proxied.class.table_name}/#{uuid}"
+ else
+ resource_class = ArvadosBase.resource_class_for_uuid(uuid)
+ "#{resource_class.table_name}/#{uuid}" if resource_class
end
+ end
- self.my_children = items
+ def can_cancel?
+ @proxied.is_a?(ContainerRequest) && state_label.in?(["Queued", "Locked", "Running"]) && priority > 0
+ end
+
+ def container_uuid
+ get(:container_uuid)
+ end
+
+ # For the following properties, use the value from @container if it exists
+ # This applies to a ContainerRequest with container_uuid
+
+ def started_at
+ t = get_combined(:started_at)
+ t = Time.parse(t) if (t.is_a? String)
+ t
+ end
+
+ def modified_at
+ t = get_combined(:modified_at)
+ t = Time.parse(t) if (t.is_a? String)
+ t
+ end
+
+ def finished_at
+ t = get_combined(:finished_at)
+ t = Time.parse(t) if (t.is_a? String)
+ t
+ end
+
+ def state_label
+ get_combined(:state)
end
def docker_image
- get(:container_image)
+ get_combined(:container_image)
end
def runtime_constraints
- get(:runtime_constraints)
+ get_combined(:runtime_constraints)
end
def priority
- get(:priority)
+ get_combined(:priority)
end
def log_collection
- get(:log)
+ get_combined(:log)
end
def outputs
items = []
- items << get(:output) if get(:output)
+ items << get_combined(:output) if get_combined(:output)
items
end
- def uri
- uuid = get(:uuid)
- "/containers/#{uuid}"
+ def command
+ get_combined(:command)
end
- def title
- "container"
+ def cwd
+ get_combined(:cwd)
end
- def can_cancel?
- true
+ def environment
+ env = get_combined(:environment)
+ env = nil if env.andand.empty?
+ env
+ end
+
+ def mounts
+ mnt = get_combined(:mounts)
+ mnt = nil if mnt.andand.empty?
+ mnt
+ end
+
+ def output_path
+ get_combined(:output_path)
+ end
+
+ # End combined properties
+
+ protected
+ def get_combined key
+ get(key, @container) || get(key, @proxied)
end
end
end
def state_bootstrap_class
- state = get(:state)
+ state = state_label
case state
when 'Complete'
'success'
end
def success?
- state = get(:state)
+ state = state_label
if state == 'Complete'
true
elsif state == 'Failed' or state == 'Cancelled'
end
def progress
- state = get(:state)
+ state = state_label
if state == 'Complete'
return 1.0
elsif state == 'Failed' or state == 'Cancelled'
protected
- def get key
- if @proxied.respond_to? key
- @proxied.send(key)
- elsif @proxied.is_a?(Hash)
- @proxied[key]
+ def get key, obj=@proxied
+ if obj.respond_to? key
+ obj.send(key)
+ elsif obj.is_a?(Hash)
+ obj[key]
end
end
end
def is_failed?
# is this work unit in failed state?
end
+
+ def command
+ # command to execute
+ end
+
+ def cwd
+ # initial working directory
+ end
+
+ def environment
+ # environment variables
+ end
+
+ def mounts
+ # mounts
+ end
+
+ def output_path
+ # path to a directory or file to save output
+ end
+
+ def container_uuid
+ # container_uuid of a container_request
+ end
end
--- /dev/null
+<%= render(partial: 'work_unit/show_status', locals: {current_obj: @object, name: @object[:name] || 'this container'}) %>
--- /dev/null
+<%= render(partial: 'work_unit/show_status', locals: {current_obj: @object, name: @object[:name] || 'this container'}) %>
+
+<div class="panel panel-default">
+ <div class="panel-heading">
+ <span class="panel-title">Container requests</span>
+ </div>
+ <div class="panel-body">
+ <% crs = ContainerRequest.order("created_at desc").filter([["container_uuid", "=", @object.uuid]]) %>
+ <% crs.each do |cr| %>
+ <div>
+ <%= link_to_if_arvados_object cr, friendly_name: true %>
+ created at
+ <%= render_localized_date(cr.created_at) %>.
+ </div>
+ <% end %>
+ </div>
+</div>
-<div class="arv-log-refresh-control"
- data-load-throttle="15000"
- ></div>
-<%=
- pj = {}
- pj[:job] = @object
- pj[:name] = @object[:name] || "this job"
- pj[:progress_bar] = render(partial: "job_progress",
- locals: {:j => @object })
- tasks = JobTask.filter([['job_uuid', '=', @object.uuid]]).results
- render(partial: 'work_unit/show_component', locals: {wu: @object.work_unit(@object[:name] || "this job")})
-%>
+<%= render(partial: 'work_unit/show_status', locals: {current_obj: @object, name: @object[:name] || 'this job'}) %>
<div class="panel panel-default">
<div class="panel-heading">
<div class="row">
<div class="col-md-6">
<div class="panel panel-default" style="min-height: 10.5em">
- <div class="panel-heading"><span class="panel-title">Recent pipelines and processes</span>
+ <div class="panel-heading">
+ <span class="panel-title">Recent pipelines and processes</span>
<% if current_user.andand.is_active %>
- <span class="pull-right">
+ <span class="pull-right recent-processes-actions">
<span>
<%= link_to(
choose_pipeline_templates_path(
<% _recent_processes.each do |p| %>
<% wu = p.work_unit %>
<% if wu.is_finished? %>
- <div class="dashboard-panel-info-row">
+ <div class="dashboard-panel-info-row row-<%=wu.uuid%>">
<div class="row">
<div class="col-md-6 text-overflow-ellipsis">
<%= link_to_if_arvados_object p, {friendly_name: true} %>
</div>
<% else %>
- <div class="dashboard-panel-info-row">
+ <div class="dashboard-panel-info-row row-<%=wu.uuid%>">
<div class="clearfix">
<%= link_to_if_arvados_object p, {friendly_name: true} %>
<div class="pull-right" style="width: 40%">
<% nodes = Node.all %>
<div class="panel panel-default" style="min-height: 10.5em">
<div class="panel-heading"><span class="panel-title">Compute node status</span>
- <span class="pull-right">
+ <span class="pull-right compute-node-actions">
<% if current_user.andand.is_admin %>
<span>
<%= link_to nodes_path, class: 'btn btn-default btn-xs' do %>
No <%= current_obj.title %> has been submitted yet.
<% else %>
<table>
- <% keys = [:uuid, :modified_by_user_uuid, :created_at, :started_at, :finished_at, :priority] %>
+ <% keys = [:uuid, :modified_by_user_uuid, :created_at, :started_at, :finished_at, :container_uuid, :priority] %>
<% keys << :outputs if @object.uuid == current_obj.uuid %>
<% keys.each do |k| %>
<% val = current_obj.send(k) if current_obj.respond_to?(k) %>
# ...and the api server provides an http:// or https:// url
repo = nil unless repo.andand.http_fetch_url
%>
- <% [:script, :repository, :script_version, :supplied_script_version, :nondeterministic].each do |k| %>
+ <% [:script, :repository, :script_version, :supplied_script_version, :nondeterministic,
+ :command, :cwd, :environment, :mounts, :output_path].each do |k| %>
<% val = current_obj.send(k) if current_obj.respond_to?(k) %>
<% if val %>
<tr>
current_obj.docker_image, friendly_name: true) %>
</td>
</tr>
+ <% elsif current_obj.docker_image %>
+ <tr>
+ <td style="padding-right: 1em">
+ docker_image_locator:
+ </td>
+ <td>
+ <%= link_to_arvados_object_if_readable(current_obj.docker_image,
+ current_obj.docker_image, friendly_name: true) %>
+ </td>
+ </tr>
<% end %>
</table>
</div>
--- /dev/null
+<div class="arv-log-refresh-control"
+ data-load-throttle="15000"
+ ></div>
+<%=
+ render(partial: 'work_unit/show_component', locals: {wu: current_obj.work_unit(name)})
+%>
resources :api_client_authorizations
resources :virtual_machines
resources :containers
- resources :container_requests
+ resources :container_requests do
+ post 'cancel', :on => :member
+ end
get '/virtual_machines/:id/webshell/:login' => 'virtual_machines#webshell', :as => :webshell_virtual_machine
resources :authorized_keys
resources :job_tasks
visit page_with_token(token)
assert_text 'Recent pipelines and processes' # seeing dashboard now
+ within('.recent-processes-actions') do
+ assert page.has_link?('Run a pipeline')
+ assert page.has_link?('All pipelines')
+ end
+
within('.recent-processes') do
- page.has_button? 'Run a pipeline'
- page.has_link? 'All pipelines'
- assert_text 'zzzzz-d1hrv-partdonepipelin'
+ assert_text 'running_with_job'
+ within('.row-zzzzz-d1hrv-runningpipeline') do
+ assert_text 'foo'
+ end
+
assert_text 'zzzzz-d1hrv-twodonepipeline'
- assert_text 'zzzzz-dz642-runningcontainr'
- assert_text 'zzzzz-dz642-runningcontain2'
+ within('.row-zzzzz-d1hrv-twodonepipeline')do
+ assert_text 'No output'
+ end
+
+ assert_text 'completed container request'
+ within('.row-zzzzz-xvhdp-cr4completedctr')do
+ assert page.has_link? 'foo_file'
+ end
+ end
+
+ within('.compute-node-actions') do
+ if is_admin
+ assert page.has_link?('All nodes')
+ else
+ assert page.has_no_link?('All nodes')
+ end
+ assert page.has_link? 'All jobs'
end
within('.compute-node-summary-pane') do
- page.has_link?('All nodes') if is_admin
- page.has_link? 'All jobs'
click_link 'Details'
assert_text 'compute0'
end
end
end
+
+ [
+ ['jobs', 'running_job_with_components', true],
+ ['pipeline_instances', 'components_is_jobspec', false],
+ ['containers', 'running', false],
+ ['container_requests', 'running', true],
+ ].each do |type, fixture, cancelable|
+ test "cancel button for #{type}/#{fixture}" do
+ if cancelable
+ need_selenium 'to cancel'
+ end
+
+ obj = api_fixture(type)[fixture]
+ visit page_with_token "active", "/#{type}/#{obj['uuid']}"
+
+ assert_text 'created_at'
+ if cancelable
+ assert page.has_button?('Cancel'), 'No Cancel button'
+ click_button 'Cancel'
+ wait_for_ajax
+ assert page.has_no_button?('Cancel'), 'Cancel button not expected after clicking'
+ else
+ assert page.has_no_button?('Cancel'), 'Cancel button not expected'
+ end
+ end
+ end
+
+ [
+ ['jobs', 'running_job_with_components'],
+ ['pipeline_instances', 'has_component_with_completed_jobs'],
+ ['container_requests', 'running'],
+ ['container_requests', 'completed'],
+ ].each do |type, fixture|
+ test "edit description for #{type}/#{fixture}" do
+ obj = api_fixture(type)[fixture]
+ visit page_with_token "active", "/#{type}/#{obj['uuid']}"
+
+ within('.arv-description-as-subtitle') do
+ find('.fa-pencil').click
+ find('.editable-input textarea').set('*Textile description for object*')
+ find('.editable-submit').click
+ end
+ wait_for_ajax
+
+ # verify description
+ assert page.has_no_text? '*Textile description for object*'
+ assert page.has_text? 'Textile description for object'
+ end
+ end
end
[PipelineInstance, 'has_component_with_completed_jobs', nil, 3, "Complete", true, 1.0],
[PipelineInstance, 'pipeline_with_tagged_collection_input', "pwu", 1, "Ready", nil, 0.0],
[Container, 'requester', 'cwu', 1, "Complete", true, 1.0],
+ [ContainerRequest, 'cr_for_requester', 'cwu', 1, "Complete", true, 1.0],
].each do |type, fixture, label, num_children, state, success, progress|
test "children of #{fixture}" do
use_token 'active'
--- /dev/null
+#!/bin/sh
+
+set -e
+
+if [ 0 = "$#" ]; then
+ PACKAGE_NAME="$(basename "$0" | grep -Eo '\barvados.*$')"
+ PACKAGE_NAME=${PACKAGE_NAME%.sh}
+else
+ PACKAGE_NAME=$1; shift
+fi
+
+cd "/var/www/${PACKAGE_NAME%-server}/current"
+
+case "$TARGET" in
+ debian*|ubuntu*)
+ apt-get install -y nginx
+ dpkg-reconfigure "$PACKAGE_NAME"
+ ;;
+ centos*)
+ yum install --assumeyes httpd
+ yum reinstall --assumeyes "$PACKAGE_NAME"
+ ;;
+ *)
+ echo -e "$0: Unknown target '$TARGET'.\n" >&2
+ exit 1
+ ;;
+esac
+
+/usr/local/rvm/bin/rvm-exec default bundle list >"$ARV_PACKAGES_DIR/$PACKAGE_NAME.gems"
+++ /dev/null
-#!/bin/sh
-set -e
-cd /var/www/arvados-api/current/
-
-case "$TARGET" in
- debian*|ubuntu*)
- apt-get install -y nginx
- dpkg-reconfigure arvados-api-server
- ;;
- centos*)
- yum install --assumeyes httpd
- yum reinstall --assumeyes arvados-api-server
- ;;
- *)
- echo -e "$0: Unknown target '$TARGET'.\n" >&2
- exit 1
- ;;
-esac
-
-/usr/local/rvm/bin/rvm-exec default bundle list >"$ARV_PACKAGES_DIR/arvados-api-server.gems"
--- /dev/null
+common-test-rails-server-package.sh
\ No newline at end of file
+++ /dev/null
-#!/bin/bash
-
-set -e
-
-EXITCODE=0
-DEBUG=${ARVADOS_DEBUG:-0}
-
-STDOUT_IF_DEBUG=/dev/null
-STDERR_IF_DEBUG=/dev/null
-DASHQ_UNLESS_DEBUG=-q
-if [[ "$DEBUG" != 0 ]]; then
- STDOUT_IF_DEBUG=/dev/stdout
- STDERR_IF_DEBUG=/dev/stderr
- DASHQ_UNLESS_DEBUG=
-fi
-
-case "$TARGET" in
- debian*|ubuntu*)
- FORMAT=deb
- ;;
- centos*)
- FORMAT=rpm
- ;;
- *)
- echo -e "$0: Unknown target '$TARGET'.\n" >&2
- exit 1
- ;;
-esac
-
-if ! [[ -n "$WORKSPACE" ]]; then
- echo >&2 "$helpmessage"
- echo >&2
- echo >&2 "Error: WORKSPACE environment variable not set"
- echo >&2
- exit 1
-fi
-
-if ! [[ -d "$WORKSPACE" ]]; then
- echo >&2 "$helpmessage"
- echo >&2
- echo >&2 "Error: $WORKSPACE is not a directory"
- echo >&2
- exit 1
-fi
-
-title () {
- txt="********** $1 **********"
- printf "\n%*s%s\n\n" $((($COLUMNS-${#txt})/2)) "" "$txt"
-}
-
-checkexit() {
- if [[ "$1" != "0" ]]; then
- title "!!!!!! $2 FAILED !!!!!!"
- fi
-}
-
-
-# Find the SSO server package
-
-cd "$WORKSPACE"
-
-if [[ ! -d "/var/www/arvados-sso" ]]; then
- echo "/var/www/arvados-sso should exist"
- exit 1
-fi
-
-if [[ ! -e "/etc/arvados/sso/application.yml" ]]; then
- mkdir -p /etc/arvados/sso/
- RANDOM_PASSWORD=`date | md5sum |cut -f1 -d' '`
- cp config/application.yml.example /etc/arvados/sso/application.yml
- sed -i -e 's/uuid_prefix: ~/uuid_prefix: zzzzz/' /etc/arvados/sso/application.yml
- sed -i -e "s/secret_token: ~/secret_token: $RANDOM_PASSWORD/" /etc/arvados/sso/application.yml
-fi
-
-if [[ ! -e "/etc/arvados/sso/database.yml" ]]; then
- # We need to set up our database configuration now.
- if [[ "$FORMAT" == "rpm" ]]; then
- service postgresql initdb
- sed -i -e "s/127.0.0.1\/32 ident/127.0.0.1\/32 md5/" /var/lib/pgsql/data/pg_hba.conf
- sed -i -e "s/::1\/128 ident/::1\/128 md5/" /var/lib/pgsql/data/pg_hba.conf
- fi
- service postgresql start
-
- RANDOM_PASSWORD=`date | md5sum |cut -f1 -d' '`
- cat >/etc/arvados/sso/database.yml <<EOF
-production:
- adapter: postgresql
- encoding: utf8
- database: sso_provider_production
- username: sso_provider_user
- password: $RANDOM_PASSWORD
- host: localhost
-EOF
-
- su postgres -c "psql -c \"CREATE USER sso_provider_user WITH PASSWORD '$RANDOM_PASSWORD'\""
- su postgres -c "createdb sso_provider_production -O sso_provider_user"
-fi
-
-if [[ "$FORMAT" == "deb" ]]; then
- # Test 2: the package should reconfigure cleanly
- dpkg-reconfigure arvados-sso-server || EXITCODE=3
-
- cd /var/www/arvados-sso/current/
- /usr/local/rvm/bin/rvm-exec default bundle list >"$ARV_PACKAGES_DIR/arvados-sso-server.gems"
-
- # Test 3: the package should remove cleanly
- apt-get remove arvados-sso-server --yes || EXITCODE=3
-
- checkexit $EXITCODE "apt-get remove arvados-sso-server --yes"
-
- # Test 4: the package configuration should remove cleanly
- dpkg --purge arvados-sso-server || EXITCODE=4
-
- checkexit $EXITCODE "dpkg --purge arvados-sso-server"
-
- if [[ -e "/var/www/arvados-sso" ]]; then
- EXITCODE=4
- fi
-
- checkexit $EXITCODE "leftover items under /var/www/arvados-sso"
-
- # Test 5: the package should remove cleanly with --purge
-
- apt-get remove arvados-sso-server --purge --yes || EXITCODE=5
-
- checkexit $EXITCODE "apt-get remove arvados-sso-server --purge --yes"
-
- if [[ -e "/var/www/arvados-sso" ]]; then
- EXITCODE=5
- fi
-
- checkexit $EXITCODE "leftover items under /var/www/arvados-sso"
-
-elif [[ "$FORMAT" == "rpm" ]]; then
-
- # Set up Nginx first
- # (courtesy of https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/el6/install_passenger.html)
- yum install -q -y epel-release pygpgme curl
- curl --fail -sSLo /etc/yum.repos.d/passenger.repo https://oss-binaries.phusionpassenger.com/yum/definitions/el-passenger.repo
- yum install -q -y nginx passenger
- sed -i -e 's/^# passenger/passenger/' /etc/nginx/conf.d/passenger.conf
- # Done setting up Nginx
-
- # Test 2: the package should reinstall cleanly
- yum --assumeyes reinstall arvados-sso-server || EXITCODE=3
-
- cd /var/www/arvados-sso/current/
- /usr/local/rvm/bin/rvm-exec default bundle list >$ARV_PACKAGES_DIR/arvados-sso-server.gems
-
- # Test 3: the package should remove cleanly
- yum -q -y remove arvados-sso-server || EXITCODE=3
-
- checkexit $EXITCODE "yum -q -y remove arvados-sso-server"
-
- if [[ -e "/var/www/arvados-sso" ]]; then
- EXITCODE=3
- fi
-
- checkexit $EXITCODE "leftover items under /var/www/arvados-sso"
-
-fi
-
-if [[ "$EXITCODE" == "0" ]]; then
- echo "Testing complete, no errors!"
-else
- echo "Errors while testing!"
-fi
-
-exit $EXITCODE
--- /dev/null
+common-test-rails-server-package.sh
\ No newline at end of file
+++ /dev/null
-#!/bin/sh
-set -e
-cd /var/www/arvados-workbench/current/
-
-case "$TARGET" in
- debian*|ubuntu*)
- apt-get install -y nginx
- dpkg-reconfigure arvados-workbench
- ;;
- centos*)
- yum install --assumeyes httpd
- yum reinstall --assumeyes arvados-workbench
- ;;
- *)
- echo -e "$0: Unknown target '$TARGET'.\n" >&2
- exit 1
- ;;
-esac
-
-/usr/local/rvm/bin/rvm-exec default bundle list >"$ARV_PACKAGES_DIR/arvados-workbench.gems"
--- /dev/null
+common-test-rails-server-package.sh
\ No newline at end of file
}
configure_version() {
- WEB_SERVICE=${WEB_SERVICE:-$(service --status-all 2>/dev/null \
- | grep -Eo '\bnginx|httpd[^[:space:]]*' || true)}
+ if [ -n "$WEB_SERVICE" ]; then
+ SERVICE_MANAGER=$(guess_service_manager)
+ elif WEB_SERVICE=$(list_services_systemd | grep -E '^(nginx|httpd)'); then
+ SERVICE_MANAGER=systemd
+ elif WEB_SERVICE=$(list_services_service \
+ | grep -Eo '\b(nginx|httpd)[^[:space:]]*'); then
+ SERVICE_MANAGER=service
+ fi
+
if [ -z "$WEB_SERVICE" ]; then
report_web_service_warning "Web service (Nginx or Apache) not found"
elif [ "$WEB_SERVICE" != "$(echo "$WEB_SERVICE" | head -n 1)" ]; then
setup_before_nginx_restart
- if [ ! -z "$WEB_SERVICE" ]; then
- service "$WEB_SERVICE" restart
+ if [ -n "$SERVICE_MANAGER" ]; then
+ service_command "$SERVICE_MANAGER" restart "$WEB_SERVICE"
fi
}
if ! type setup_before_nginx_restart >/dev/null 2>&1; then
setup_before_nginx_restart() { return; }
fi
+
+if [ -e /run/systemd/system ]; then
+ USING_SYSTEMD=1
+else
+ USING_SYSTEMD=0
+fi
+
+if which service >/dev/null 2>&1; then
+ USING_SERVICE=1
+else
+ USING_SERVICE=0
+fi
+
+guess_service_manager() {
+ if [ 1 = "$USING_SYSTEMD" ]; then
+ echo systemd
+ elif [ 1 = "$USING_SERVICE" ]; then
+ echo service
+ else
+ return 1
+ fi
+}
+
+list_services_systemd() {
+ test 1 = "$USING_SYSTEMD" || return
+ # Print only service names, without the `.service` suffix.
+ systemctl list-unit-files '*.service' \
+ | awk '($1 ~ /\.service/){print substr($1, 1, length($1) - 8)}'
+}
+
+list_services_service() {
+ test 1 = "$USING_SERVICE" || return
+ # Output is completely different across Debian and Red Hat.
+ # We can't really parse it.
+ service --status-all 2>/dev/null
+}
+
+service_command() {
+ local service_manager="$1"; shift
+ local command="$1"; shift
+ local service="$1"; shift
+ case "$service_manager" in
+ systemd) systemctl "$command" "$service" ;;
+ service) service "$service" "$command" ;;
+ esac
+}
+
+if ! guess_service_manager >/dev/null; then
+ echo "WARNING: Unsupported init system. Can't manage web service." >&2
+fi
if [[ ! -d "$WORKSPACE/packages/$TARGET" ]]; then
mkdir -p $WORKSPACE/packages/$TARGET
+ chown --reference="$WORKSPACE" "$WORKSPACE/packages/$TARGET"
fi
# Perl packages
local -a pos_args=("$srcdir/=$railsdir" "$pkgname" "Curoverse, Inc." dir
"$(cat "$version_file")")
local license_arg="$license_path=$railsdir/$(basename "$license_path")"
- # --iteration=5 accommodates the package script bugfixes #8371 and #8413.
- local -a switches=(--iteration=5
+ local -a switches=(--iteration=6
--after-install "$scripts_dir/postinst"
--before-remove "$scripts_dir/prerm"
--after-remove "$scripts_dir/postrm")
libssl-dev libxslt1.1 zlib1g-dev
</span></code></pre></notextile>
-Install prerequisites for CentOS 6:
+Install prerequisites for CentOS 6 or 7:
<notextile>
<pre><code><span class="userinput">sudo yum install \
{% include 'notebox_begin' %}
-On older Red Hat-based systems, these packages require the python27 Software Collection. The Software Collection will be installed automatically as long as Software Collections are enabled on your system.
+{% if rh_version %} On CentOS {{rh_version}} and RHEL {{rh_version}},
+{% else %} On CentOS and RHEL,
+{% endif %} these packages require a more recent version of Python from Software Collections. The Software Collection will be installed automatically as long as Software Collections are enabled on your system.
To "enable Software Collections on CentOS":https://wiki.centos.org/AdditionalResources/Repositories/SCL, run:
First, "add the appropriate package repository for your distribution":{{ site.baseurl }}/install/install-manual-prerequisites.html#repos.
-{% include 'note_python27_sc' %}
+{% include 'note_python_sc' %}
-On Debian-based systems:
+On CentOS 6 and RHEL 6:
<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install perl python-virtualenv fuse python-arvados-python-client python-arvados-fuse crunchrunner crunchstat arvados-docker-cleaner iptables ca-certificates</span>
+<pre><code>~$ <span class="userinput">sudo yum install perl python27-python-virtualenv fuse python27-python-arvados-python-client python27-python-arvados-fuse crunchrunner crunchstat arvados-docker-cleaner iptables ca-certificates</span>
</code></pre>
</notextile>
-On Red Hat-based systems:
+On other Red Hat-based systems:
<notextile>
-<pre><code>~$ <span class="userinput">sudo yum install perl python27-python-virtualenv fuse python27-python-arvados-python-client python27-python-arvados-fuse crunchrunner crunchstat arvados-docker-cleaner iptables ca-certificates</span>
+<pre><code>~$ <span class="userinput">echo 'exclude=python2-llfuse' | sudo tee -a /etc/yum.conf</span>
+~$ <span class="userinput">sudo yum install perl python-virtualenv fuse python-arvados-python-client python-arvados-fuse crunchrunner crunchstat arvados-docker-cleaner iptables ca-certificates</span>
+</code></pre>
+</notextile>
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install perl python-virtualenv fuse python-arvados-python-client python-arvados-fuse crunchrunner crunchstat arvados-docker-cleaner iptables ca-certificates</span>
</code></pre>
</notextile>
EOF</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo sh -c 'cat >run' <<'EOF'
#!/bin/sh
+if [ -d /opt/rh/python33 ]; then
+ source scl_source enable python33
+fi
exec python3 -m arvados_docker.cleaner --quota <b>50G</b>
EOF</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo chmod +x run log/run</span>
h3. CentOS
-Packages are available for CentOS 6. First, register the Curoverse signing key in RPM's database:
+Packages are available for CentOS 6 and 7. First, register the Curoverse signing key in RPM's database:
{% include 'install_redhat_key' %}
<notextile><pre>~$ <span class="userinput">sudo service rh-postgresql94-postgresql start</span></pre></notextile>
# "Set up Arvados credentials and databases":#rails_setup for the services that will use this PostgreSQL install.
+h2(#centos7). Install PostgreSQL on CentOS 7
+
+# Install PostgreSQL:
+ <notextile><pre>~$ <span class="userinput">sudo yum install postgresql-server</span></pre></notextile>
+# Initialize the database:
+ <notextile><pre>~$ <span class="userinput">sudo postgresql-setup initdb</span></pre></notextile>
+# Configure the database to accept password connections:
+ <notextile><pre><code>~$ <span class="userinput">sudo sed -ri -e 's/^(host +all +all +(127\.0\.0\.1\/32|::1\/128) +)ident$/\1md5/' /var/lib/pgsql/data/pg_hba.conf</span></code></pre></notextile>
+# Configure the database to launch at boot:
+ <notextile><pre>~$ <span class="userinput">sudo systemctl enable postgresql</span></pre></notextile>
+# Start the database:
+ <notextile><pre>~$ <span class="userinput">sudo systemctl start postgresql</span></pre></notextile>
+# "Set up Arvados credentials and databases":#rails_setup for the services that will use this PostgreSQL install.
+
h2(#debian). Install PostgreSQL on Debian or Ubuntu
# Install PostgreSQL:
h2. Install the Python SDK and utilities
-{% include 'note_python27_sc' %}
+{% assign rh_version = "6" %}
+{% include 'note_python_sc' %}
-On Debian-based systems:
+On CentOS 6 and RHEL 6:
<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install python-arvados-python-client python-arvados-fuse crunchrunner</span>
+<pre><code>~$ <span class="userinput">sudo yum install python27-python-arvados-python-client python27-python-arvados-fuse crunchrunner</span>
</code></pre>
</notextile>
-On Red Hat-based systems:
+On other Red Hat-based systems:
<notextile>
-<pre><code>~$ <span class="userinput">sudo yum install python27-python-arvados-python-client python27-python-arvados-fuse crunchrunner</span>
+<pre><code>~$ <span class="userinput">echo 'exclude=python2-llfuse' | sudo tee -a /etc/yum.conf</span>
+~$ <span class="userinput">sudo yum install python-arvados-python-client python-arvados-fuse crunchrunner</span>
+</code></pre>
+</notextile>
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install python-arvados-python-client python-arvados-fuse crunchrunner</span>
</code></pre>
</notextile>
Install cron.
-On Debian-based systems:
+On CentOS 6 and RHEL 6:
<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install cron</span>
+<pre><code>~$ <span class="userinput">sudo yum install cronie</span>
+~$ <span class="userinput">sudo chkconfig crond on</span>
+~$ <span class="userinput">sudo service crond start</span>
</code></pre>
</notextile>
-On Red Hat-based systems:
+On other Red Hat-based distributions:
<notextile>
-<pre><code>~$ <span class="userinput">sudo yum install cron</span>
+<pre><code>~$ <span class="userinput">sudo yum install cronie</span>
+~$ <span class="userinput">sudo systemctl enable crond</span>
+~$ <span class="userinput">sudo systemctl start crond</span>
+</code></pre>
+</notextile>
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install cron</span>
</code></pre>
</notextile>
Workbench doesn't need its own database, so it does not need to have PostgreSQL installed.
-{% include 'note_python27_sc' %}
+{% assign rh_version = "6" %}
+{% include 'note_python_sc' %}
On a Debian-based system, install the following packages:
First, "add the appropriate package repository for your distribution":{{ site.baseurl }}/install/install-manual-prerequisites.html#repos.
-{% include 'note_python27_sc' %}
+{% assign rh_version = "6" %}
+{% include 'note_python_sc' %}
-On Debian-based systems:
+On CentOS 6 and RHEL 6:
<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install python-arvados-python-client</code>
+<pre><code>~$ <span class="userinput">sudo yum install python27-python-arvados-python-client</code>
</code></pre>
</notextile>
-On Red Hat-based systems:
+On other Red Hat-based systems:
<notextile>
-<pre><code>~$ <span class="userinput">sudo yum install python27-python-arvados-python-client</code>
+<pre><code>~$ <span class="userinput">sudo yum install python-arvados-python-client</code>
+</code></pre>
+</notextile>
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install python-arvados-python-client</code>
</code></pre>
</notextile>
--- /dev/null
+package arvados
+
+// APIClientAuthorization is an arvados#apiClientAuthorization resource.
+type APIClientAuthorization struct {
+ UUID string `json:"uuid"`
+ APIToken string `json:"api_token"`
+}
+
+// APIClientAuthorizationList is an arvados#apiClientAuthorizationList resource.
+type APIClientAuthorizationList struct {
+ Items []APIClientAuthorization `json:"items"`
+}
"fmt"
"io"
"io/ioutil"
+ "math"
"net/http"
"net/url"
"os"
return err
}
if resp.StatusCode != 200 {
- return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+ return newTransactionError(req, resp, buf)
}
if dst == nil {
return nil
return json.Unmarshal(buf, dst)
}
+// Convert an arbitrary struct to url.Values. For example,
+//
+// Foo{Bar: []int{1,2,3}, Baz: "waz"}
+//
+// becomes
+//
+// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+//
+// params itself is returned if it is already an url.Values.
+func anythingToValues(params interface{}) (url.Values, error) {
+ if v, ok := params.(url.Values); ok {
+ return v, nil
+ }
+ // TODO: Do this more efficiently, possibly using
+ // json.Decode/Encode, so the whole thing doesn't have to get
+ // encoded, decoded, and re-encoded.
+ j, err := json.Marshal(params)
+ if err != nil {
+ return nil, err
+ }
+ var generic map[string]interface{}
+ err = json.Unmarshal(j, &generic)
+ if err != nil {
+ return nil, err
+ }
+ urlValues := url.Values{}
+ for k, v := range generic {
+ if v, ok := v.(string); ok {
+ urlValues.Set(k, v)
+ continue
+ }
+ if v, ok := v.(float64); ok {
+ // Unmarshal decodes all numbers as float64,
+ // which can be written as 1.2345e4 in JSON,
+ // but this form is not accepted for ints in
+ // url params. If a number fits in an int64,
+ // encode it as int64 rather than float64.
+ if v, frac := math.Modf(v); frac == 0 && v <= math.MaxInt64 && v >= math.MinInt64 {
+ urlValues.Set(k, fmt.Sprintf("%d", int64(v)))
+ continue
+ }
+ }
+ j, err := json.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ urlValues.Set(k, string(j))
+ }
+ return urlValues, nil
+}
+
// RequestAndDecode performs an API request and unmarshals the
// response (which must be JSON) into dst. Method and body arguments
// are the same as for http.NewRequest(). The given path is added to
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
urlString := c.apiURL(path)
- var urlValues url.Values
- if v, ok := params.(url.Values); ok {
- urlValues = v
- } else if params != nil {
- // Convert an arbitrary struct to url.Values. For
- // example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
- // url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
- //
- // TODO: Do this more efficiently, possibly using
- // json.Decode/Encode, so the whole thing doesn't have
- // to get encoded, decoded, and re-encoded.
- j, err := json.Marshal(params)
- if err != nil {
- return err
- }
- var generic map[string]interface{}
- err = json.Unmarshal(j, &generic)
- if err != nil {
- return err
- }
- urlValues = url.Values{}
- for k, v := range generic {
- if v, ok := v.(string); ok {
- urlValues.Set(k, v)
- continue
- }
- j, err := json.Marshal(v)
- if err != nil {
- return err
- }
- urlValues.Set(k, string(j))
- }
+ urlValues, err := anythingToValues(params)
+ if err != nil {
+ return err
}
if (method == "GET" || body != nil) && urlValues != nil {
// FIXME: what if params don't fit in URL
"fmt"
"io/ioutil"
"net/http"
+ "net/url"
"sync"
"testing"
)
t.Errorf("got nil error, expected something awful")
}
}
+
+// TestAnythingToValues covers the string, integral-number,
+// non-integral-number, nested-object, passthrough, and error cases of
+// anythingToValues.
+func TestAnythingToValues(t *testing.T) {
+	type testCase struct {
+		in interface{}
+		// ok==nil means anythingToValues should return an
+		// error, otherwise it's a func that returns true if
+		// out is correct
+		ok func(out url.Values) bool
+	}
+	for _, tc := range []testCase{
+		{
+			in: map[string]interface{}{"foo": "bar"},
+			ok: func(out url.Values) bool {
+				return out.Get("foo") == "bar"
+			},
+		},
+		{
+			in: map[string]interface{}{"foo": 2147483647},
+			ok: func(out url.Values) bool {
+				return out.Get("foo") == "2147483647"
+			},
+		},
+		{
+			in: map[string]interface{}{"foo": 1.234},
+			ok: func(out url.Values) bool {
+				return out.Get("foo") == "1.234"
+			},
+		},
+		{
+			in: map[string]interface{}{"foo": "1.234"},
+			ok: func(out url.Values) bool {
+				return out.Get("foo") == "1.234"
+			},
+		},
+		{
+			// Note the expected value has no space after the
+			// colon: it is json.Marshal output, not a Go literal.
+			in: map[string]interface{}{"foo": map[string]interface{}{"bar": 1.234}},
+			ok: func(out url.Values) bool {
+				return out.Get("foo") == `{"bar":1.234}`
+			},
+		},
+		{
+			in: url.Values{"foo": {"bar"}},
+			ok: func(out url.Values) bool {
+				return out.Get("foo") == "bar"
+			},
+		},
+		{
+			in: 1234,
+			ok: nil,
+		},
+		{
+			in: []string{"foo"},
+			ok: nil,
+		},
+	} {
+		t.Logf("%#v", tc.in)
+		out, err := anythingToValues(tc.in)
+		switch {
+		case tc.ok == nil:
+			if err == nil {
+				t.Errorf("got %#v, expected error", out)
+			}
+		case err != nil:
+			t.Errorf("got err %#v, expected nil", err)
+		case !tc.ok(out):
+			t.Errorf("got %#v but tc.ok() says that is wrong", out)
+		}
+	}
+}
--- /dev/null
+package arvados
+
+// Container is an arvados#container resource.
+type Container struct {
+	UUID               string               `json:"uuid"`
+	Command            []string             `json:"command"`
+	ContainerImage     string               `json:"container_image"`
+	Cwd                string               `json:"cwd"`
+	Environment        map[string]string    `json:"environment"`
+	LockedByUUID       string               `json:"locked_by_uuid"`
+	// Mounts maps in-container bind paths to mount definitions
+	// (crunch-run iterates this and compares keys to OutputPath).
+	Mounts             map[string]Mount     `json:"mounts"`
+	Output             string               `json:"output"`
+	OutputPath         string               `json:"output_path"`
+	Priority           int                  `json:"priority"`
+	RuntimeConstraints RuntimeConstraints   `json:"runtime_constraints"`
+	// State is one of the ContainerState* constants below.
+	State              ContainerState       `json:"state"`
+}
+
+// Mount is special behavior to attach to a filesystem path or device.
+// NOTE(review): which fields are meaningful depends on Kind — the
+// visible consumers handle "file", "tmp", and collection-style mounts
+// (PortableDataHash/UUID); confirm the full set against the API docs.
+type Mount struct {
+	Kind             string `json:"kind"`
+	Writable         bool   `json:"writable"`
+	PortableDataHash string `json:"portable_data_hash"`
+	UUID             string `json:"uuid"`
+	DeviceType       string `json:"device_type"`
+	Path             string `json:"path"`
+}
+
+// RuntimeConstraints specify a container's compute resources (RAM,
+// CPU) and network connectivity.
+type RuntimeConstraints struct {
+	// API, when non-nil and true, asks for API access from inside
+	// the container (crunch-run enables networking and injects the
+	// container token when this is set).
+	// NOTE(review): API has no json tag, so it marshals as "API";
+	// decoding is case-insensitive, but confirm the server's key
+	// name before relying on round-tripping this field.
+	API *bool
+	// RAM appears to be in bytes (the slurm dispatcher divides by
+	// 1048576 to get MiB) — TODO confirm against API docs.
+	RAM   int `json:"ram"`
+	VCPUs int `json:"vcpus"`
+}
+
+// ContainerList is an arvados#containerList resource: one page of
+// Container records plus the usual list-paging bookkeeping fields.
+type ContainerList struct {
+	Items          []Container `json:"items"`
+	ItemsAvailable int         `json:"items_available"`
+	Offset         int         `json:"offset"`
+	Limit          int         `json:"limit"`
+}
+
+// ContainerState is a string corresponding to a valid Container state.
+type ContainerState string
+
+// The states a Container can be in. NOTE(review): presumably this is
+// the complete set accepted by the API server — confirm there.
+const (
+	ContainerStateQueued    = ContainerState("Queued")
+	ContainerStateLocked    = ContainerState("Locked")
+	ContainerStateRunning   = ContainerState("Running")
+	ContainerStateComplete  = ContainerState("Complete")
+	ContainerStateCancelled = ContainerState("Cancelled")
+)
--- /dev/null
+package arvados
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/url"
+ "strings"
+)
+
+// TransactionError is returned when an API request fails or comes
+// back with a non-200 status.
+type TransactionError struct {
+	Method     string
+	URL        url.URL
+	StatusCode int
+	Status     string
+	// Errors holds the messages from the API server's JSON error
+	// document ({"errors": [...]}), if any. It must be exported
+	// (with a json tag) — json.Unmarshal silently ignores keys
+	// that match unexported fields, so the previous unexported
+	// `errors` field could never be populated.
+	Errors []string `json:"errors"`
+}
+
+// Error summarizes the failed request: URL, HTTP status, and any
+// server-reported error messages.
+func (e TransactionError) Error() (s string) {
+	s = fmt.Sprintf("request failed: %s", e.URL)
+	if e.Status != "" {
+		s = s + ": " + e.Status
+	}
+	if len(e.Errors) > 0 {
+		s = s + ": " + strings.Join(e.Errors, "; ")
+	}
+	return
+}
+
+// newTransactionError builds a TransactionError for the given
+// request/response pair, parsing the response body buf as an
+// arvados-style JSON error document when possible.
+func newTransactionError(req *http.Request, resp *http.Response, buf []byte) *TransactionError {
+	var e TransactionError
+	if json.Unmarshal(buf, &e) != nil {
+		// No JSON-formatted error response
+		e.Errors = nil
+	}
+	e.Method = req.Method
+	e.URL = *req.URL
+	if resp != nil {
+		e.Status = resp.Status
+		e.StatusCode = resp.StatusCode
+	}
+	return &e
+}
package dispatch
import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
"os"
"time"
)
-// Constants for container states
+// Container state constants, aliased from the arvados SDK so that
+// existing references (dispatch.Queued, dispatch.Running, ...) keep
+// working after the local string constants were removed.
const (
-	Queued    = "Queued"
-	Locked    = "Locked"
-	Running   = "Running"
-	Complete  = "Complete"
-	Cancelled = "Cancelled"
+	Queued    = arvados.ContainerStateQueued
+	Locked    = arvados.ContainerStateLocked
+	Running   = arvados.ContainerStateRunning
+	Complete  = arvados.ContainerStateComplete
+	Cancelled = arvados.ContainerStateCancelled
)
-type apiClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
- Items []apiClientAuthorization `json:"items"`
-}
-
-// Represents an Arvados container record
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
- ItemsAvailable int `json:"items_available"`
-}
-
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
// The Arvados client
// handled by this dispatcher and the goroutine should terminate. The
// goroutine is responsible for draining the 'status' channel, failure
// to do so may deadlock the dispatcher.
- RunContainer func(*Dispatcher, Container, chan Container)
+ RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
// Amount of time to wait between polling for updates.
PollInterval time.Duration
DoneProcessing chan struct{}
mineMutex sync.Mutex
- mineMap map[string]chan Container
- Auth apiClientAuthorization
- containers chan Container
+ mineMap map[string]chan arvados.Container
+ Auth arvados.APIClientAuthorization
+ containers chan arvados.Container
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
// for which this process is actively starting/monitoring. Returns channel to
// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
	dispatcher.mineMutex.Lock()
	defer dispatcher.mineMutex.Unlock()
+	// If this container is already "mine", reuse its channel.
	if ch, ok := dispatcher.mineMap[uuid]; ok {
		return ch
	}
-	ch := make(chan Container)
+	// Unbuffered: senders block until the RunContainer goroutine
+	// drains the channel (see the deadlock warning above).
+	ch := make(chan arvados.Container)
	dispatcher.mineMap[uuid] = ch
	return ch
}
}
}
-// checkMine returns true/false if there is a channel for updates associated
+// checkMine returns true if there is a channel for updates associated
// with container c. If update is true, also send the container record on
// the channel.
-func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
ch, ok := dispatcher.mineMap[c.UUID]
}
func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
- var containers ContainerList
+ var containers arvados.ContainerList
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
}
}
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
if container.State == Queued && dispatcher.checkMine(container, false) {
// If we previously started the job, something failed, and it
// was re-queued, this dispatcher might still be monitoring it.
}
// UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
err := dispatcher.Arv.Update("containers", uuid,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": newState}},
return
}
- dispatcher.mineMap = make(map[string]chan Container)
- dispatcher.containers = make(chan Container)
+ dispatcher.mineMap = make(map[string]chan arvados.Container)
+ dispatcher.containers = make(chan arvados.Container)
// Graceful shutdown on signal
sigChan := make(chan os.Signal)
parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+ parser.add_argument('-i', '--id', type=int, default=None, help="Start from given log id.")
group = parser.add_mutually_exclusive_group()
group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
else:
last_log_id = None
+    # Honor an explicitly requested starting log id. The argparse
+    # default is None, so test against None rather than truthiness
+    # (a plain "if args.id:" would silently ignore --id 0).
+    if args.id is not None:
+        last_log_id = args.id-1
+
def on_message(ev):
global filters
global ws
'acbd18db4cc2f85cedef654fccc4a4d8+3',
Exception('mock err'), 200, 200) as req_mock:
self.keep_client.put('foo', num_retries=1, copies=2)
- self.assertTrue(3, req_mock.call_count)
+ self.assertEqual(3, req_mock.call_count)
def test_success_after_retryable_error(self):
with tutil.mock_keep_responses(
'acbd18db4cc2f85cedef654fccc4a4d8+3',
500, 200, 200) as req_mock:
self.keep_client.put('foo', num_retries=1, copies=2)
- self.assertTrue(3, req_mock.call_count)
+ self.assertEqual(3, req_mock.call_count)
def test_fail_after_final_error(self):
# First retry loop gets a 200 (can't achieve replication by
200, 400, 200) as req_mock:
with self.assertRaises(arvados.errors.KeepWriteError):
self.keep_client.put('foo', num_retries=1, copies=2)
- self.assertTrue(2, req_mock.call_count)
+ self.assertEqual(2, req_mock.call_count)
errors.add :priority, "cannot be nil"
end
- # Can update priority, container count.
- permitted.push :priority, :container_count_max, :container_uuid
+ # Can update priority, container count, name and description
+ permitted.push :priority, :container_count_max, :container_uuid, :name, :description
if self.state_changed?
# Allow create-and-commit in a single operation.
end
when Final
- if not current_user.andand.is_admin
+ if not current_user.andand.is_admin and not (self.name_changed? || self.description_changed?)
errors.add :state, "of container request can only be set to Final by system."
end
- if self.state_changed?
- permitted.push :state
+ if self.state_changed? || self.name_changed? || self.description_changed?
+ permitted.push :state, :name, :description
else
errors.add :state, "does not allow updates"
end
require 'faye/websocket'
require 'record_filters'
require 'load_param'
+require 'set'
# Patch in user, last_log_id and filters fields into the Faye::Websocket class.
module Faye
attr_accessor :user
attr_accessor :last_log_id
attr_accessor :filters
+ attr_accessor :sent_ids
+ attr_accessor :notify_queue
end
end
# push_events call if there are more log rows to send.
def push_events ws, notify_id
begin
- if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id
- # This notify is for a row we've handled already.
- return
- end
-
# Must have at least one filter set up to receive events
if ws.filters.length > 0
# Start with log rows readable by user, sorted in ascending order
cond_out = []
param_out = []
- if !ws.last_log_id.nil?
- # Client is only interested in log rows that are newer than the
- # last log row seen by the client.
+ if not notify_id.nil?
+ ws.notify_queue.unshift notify_id
+ end
+
+ if not ws.last_log_id.nil?
+ # We are catching up from some starting point.
cond_id = "logs.id > ?"
param_out << ws.last_log_id
- elsif !notify_id.nil?
- # No last log id, so look at rows starting with notify id
- cond_id = "logs.id >= ?"
- param_out << notify_id
+ elsif ws.notify_queue.length > 0
+ # Get next row being notified.
+ cond_id = "logs.id = ?"
+ param_out << ws.notify_queue.pop
else
# No log id to start from, nothing to do, return
return
count = 0
limit = 10
+ lastid = nil
logs.limit(limit).each do |l|
- ws.send(l.as_api_response.to_json)
- ws.last_log_id = l.id
+ if not ws.sent_ids.include?(l.id)
+ # only send if not a duplicate
+ ws.send(l.as_api_response.to_json)
+ end
+ if not ws.last_log_id.nil?
+ # record ids only when sending "catchup" messages, not notifies
+ ws.sent_ids << l.id
+ end
+ lastid = l.id
count += 1
end
# Number of rows returned was capped by limit(), we need to schedule
# another query to get more logs (will start from last_log_id
# reported by current query)
+ ws.last_log_id = lastid
+ EventMachine::next_tick do
+ push_events ws, nil
+ end
+ elsif !ws.last_log_id.nil?
+ # Done catching up
+ ws.last_log_id = nil
+ end
+
+ if ws.notify_queue.length > 0
EventMachine::next_tick do
push_events ws, nil
end
- elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
- # Number of rows returned was less than cap, but the notify id is
- # higher than the last id visible to the client, so update last_log_id
- ws.last_log_id = notify_id
end
- elsif !notify_id.nil?
- # No filters set up, so just record the sequence number
- ws.last_log_id = notify_id
end
rescue ArgumentError => e
# There was some kind of user error.
# Set or reset the last_log_id. The event bus only reports events
# for rows that come after last_log_id.
ws.last_log_id = p[:last_log_id].to_i
+ # Reset sent_ids for consistency
+ # (always re-deliver all matching messages following last_log_id)
+ ws.sent_ids = Set.new
end
if ws.filters.length < MAX_FILTERS
ws.user = current_user
ws.filters = []
ws.last_log_id = nil
+ ws.sent_ids = Set.new
+ ws.notify_queue = Array.new
# Subscribe to internal postgres notifications through @channel. This will
# call push_events when a notification comes through.
completed:
uuid: zzzzz-xvhdp-cr4completedctr
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
- name: completed
+ name: completed container request
state: Final
priority: 1
- created_at: 2016-01-11 11:11:11.111111111 Z
- updated_at: 2016-01-11 11:11:11.111111111 Z
- modified_at: 2016-01-11 11:11:11.111111111 Z
+ created_at: <%= 2.minute.ago.to_s(:db) %>
+ updated_at: <%= 1.minute.ago.to_s(:db) %>
+ modified_at: <%= 1.minute.ago.to_s(:db) %>
modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
container_image: test
cwd: test
command: ["echo", "hello"]
container_uuid: zzzzz-dz642-requestercntnr1
requesting_container_uuid: zzzzz-dz642-requestingcntnr
+
+cr_for_requester2:
+ uuid: zzzzz-xvhdp-cr4requestercn2
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ name: requester_cr2
+ state: Final
+ priority: 1
+ created_at: 2016-01-11 11:11:11.111111111 Z
+ updated_at: 2016-01-11 11:11:11.111111111 Z
+ modified_at: 2016-01-11 11:11:11.111111111 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ container_image: test
+ cwd: test
+ output_path: test
+ command: ["echo", "hello"]
+ requesting_container_uuid: zzzzz-dz642-requestercntnr1
runtime_constraints:
ram: 12000000000
vcpus: 4
+ auth_uuid: zzzzz-gj3su-077z32aux8dg2s1
running-older:
uuid: zzzzz-dz642-runningcontain2
finished_at: 2016-01-12 11:12:13.111111111 Z
container_image: test
cwd: test
- output: test
+ output: zzzzz-4zz18-znfnqtbbv4spc3w
output_path: test
command: ["echo", "hello"]
runtime_constraints:
state: Complete
uuid: zzzzz-d1hrv-i3e77t9z5y8j9cc
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: <%= 11.minute.ago.to_s(:db) %>
started_at: <%= 10.minute.ago.to_s(:db) %>
finished_at: <%= 9.minute.ago.to_s(:db) %>
components:
name: Pipeline in publicly accessible project
pipeline_template_uuid: zzzzz-p5p6p-tmpltpublicproj
state: Complete
- created_at: <%= 1.minute.ago.to_s(:db) %>
+ created_at: <%= 30.minute.ago.to_s(:db) %>
components:
foo:
script: foo
uuid: zzzzz-d1hrv-partdonepipelin
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
state: RunningOnServer
+ created_at: <%= 15.minute.ago.to_s(:db) %>
components:
previous:
job:
uuid: zzzzz-d1hrv-twodonepipeline
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
state: Complete
- started_at: <%= 10.minute.ago.to_s(:db) %>
- finished_at: <%= 9.minute.ago.to_s(:db) %>
+ created_at: <%= 3.minute.ago.to_s(:db) %>
+ started_at: <%= 2.minute.ago.to_s(:db) %>
+ finished_at: <%= 1.minute.ago.to_s(:db) %>
components:
ancient:
job:
failed_pipeline_with_two_jobs:
uuid: zzzzz-d1hrv-twofailpipeline
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: <%= 55.minute.ago.to_s(:db) %>
state: Failed
components:
ancient:
end
[
- ['active', 'zzzzz-dz642-requestercntnr1'],
+ ['active', 'zzzzz-dz642-runningcontainr'],
['active_no_prefs', nil],
].each do |token, expected|
test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
import (
"flag"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"log"
return nil
}
-func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+func startFunc(container arvados.Container, cmd *exec.Cmd) error {
return cmd.Start()
}
// If the container is in any other state, or is not Complete/Cancelled after
// crunch-run terminates, mark the container as Cancelled.
func run(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
uuid := container.UUID
import (
"bytes"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
- PollInterval: time.Duration(1) * time.Second,
+ PollInterval: time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
run(dispatcher, container, status)
doneProcessing <- struct{}{}
},
DoneProcessing: doneProcessing}
- startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
dispatcher.UpdateState(container.UUID, "Complete")
return cmd.Start()
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers dispatch.ContainerList
+ var containers arvados.ContainerList
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container dispatch.Container
+ var container arvados.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- c.Check(container.State, Equals, "Complete")
+ c.Check(string(container.State), Equals, "Complete")
}
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
run(dispatcher, container, status)
doneProcessing <- struct{}{}
},
DoneProcessing: doneProcessing}
- startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
dispatcher.UpdateState(container.UUID, "Complete")
return cmd.Start()
import (
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"io/ioutil"
}
// sbatchCmd
-func sbatchFunc(container dispatch.Container) *exec.Cmd {
- memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+func sbatchFunc(container arvados.Container) *exec.Cmd {
+ memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
return exec.Command("sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", container.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
- fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
fmt.Sprintf("--priority=%d", container.Priority))
}
// scancelCmd
-func scancelFunc(container dispatch.Container) *exec.Cmd {
+func scancelFunc(container arvados.Container) *exec.Cmd {
return exec.Command("scancel", "--name="+container.UUID)
}
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
- container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
+ container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
defer func() {
//
// If the container is marked as Running, check if it is in the slurm queue.
// If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
if squeueUpdater.CheckSqueue(container.UUID) {
// release it back to the Queue, if it is Running then
// clean up the record.
- var con dispatch.Container
+ var con arvados.Container
err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- var st string
+ var st arvados.ContainerState
switch con.State {
case dispatch.Locked:
st = dispatch.Queued
// Monitor status updates. If the priority changes to zero, cancel the
// container using scancel.
func run(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
log.Printf("Monitoring container %v started", container.UUID)
defer log.Printf("Monitoring container %v finished", container.UUID)
import (
"bytes"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
func (s *TestSuite) TestIntegrationNormal(c *C) {
container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
[]string(nil),
- func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
- c.Check(container.State, Equals, "Complete")
+ c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
func (s *TestSuite) TestIntegrationCancel(c *C) {
// Override sbatchCmd
var scancelCmdLine []string
- defer func(orig func(dispatch.Container) *exec.Cmd) {
+ defer func(orig func(arvados.Container) *exec.Cmd) {
scancelCmd = orig
}(scancelCmd)
- scancelCmd = func(container dispatch.Container) *exec.Cmd {
+ scancelCmd = func(container arvados.Container) *exec.Cmd {
scancelCmdLine = scancelFunc(container).Args
return exec.Command("echo")
}
container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
[]string(nil),
- func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(1 * time.Second)
dispatcher.Arv.Update("containers", container.UUID,
"container": arvadosclient.Dict{"priority": 0}},
nil)
})
- c.Check(container.State, Equals, "Cancelled")
+ c.Check(container.State, Equals, arvados.ContainerStateCancelled)
c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
}
fmt.Sprintf("--mem-per-cpu=%d", 2862),
fmt.Sprintf("--cpus-per-task=%d", 4),
fmt.Sprintf("--priority=%d", 1)},
- func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
- c.Check(container.State, Equals, "Cancelled")
+ c.Check(container.State, Equals, arvados.ContainerStateCancelled)
}
func (s *TestSuite) integrationTest(c *C,
newSqueueCmd func() *exec.Cmd,
sbatchCmdComps []string,
- runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
+ runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
arvadostest.ResetEnv()
arv, err := arvadosclient.MakeArvadosClient()
var sbatchCmdLine []string
// Override sbatchCmd
- defer func(orig func(dispatch.Container) *exec.Cmd) {
+ defer func(orig func(arvados.Container) *exec.Cmd) {
sbatchCmd = orig
}(sbatchCmd)
- sbatchCmd = func(container dispatch.Container) *exec.Cmd {
+ sbatchCmd = func(container arvados.Container) *exec.Cmd {
sbatchCmdLine = sbatchFunc(container).Args
return exec.Command("sh")
}
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers dispatch.ContainerList
+ var containers arvados.ContainerList
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
go runContainer(dispatcher, container)
run(dispatcher, container, status)
doneProcessing <- struct{}{}
c.Check(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container dispatch.Container
+ var container arvados.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
return container
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
go func() {
time.Sleep(1 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Running)
"errors"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
}
-// Mount describes the mount points to create inside the container.
-type Mount struct {
- Kind string `json:"kind"`
- Writable bool `json:"writable"`
- PortableDataHash string `json:"portable_data_hash"`
- UUID string `json:"uuid"`
- DeviceType string `json:"device_type"`
- Path string `json:"path"`
-}
-
-// Collection record returned by the API server.
-type CollectionRecord struct {
- ManifestText string `json:"manifest_text"`
- PortableDataHash string `json:"portable_data_hash"`
-}
-
-type RuntimeConstraints struct {
- API *bool
-}
-
-// ContainerRecord is the container record returned by the API server.
-type ContainerRecord struct {
- UUID string `json:"uuid"`
- Command []string `json:"command"`
- ContainerImage string `json:"container_image"`
- Cwd string `json:"cwd"`
- Environment map[string]string `json:"environment"`
- Mounts map[string]Mount `json:"mounts"`
- OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
- RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
- State string `json:"state"`
- Output string `json:"output"`
-}
-
-// APIClientAuthorization is an arvados#api_client_authorization resource.
-type APIClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) io.WriteCloser
Docker ThinDockerClient
ArvClient IArvadosClient
Kc IKeepClient
- ContainerRecord
+ arvados.Container
dockerclient.ContainerConfig
dockerclient.HostConfig
token string
// the image from Keep.
func (runner *ContainerRunner) LoadImage() (err error) {
- runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
+ runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
- var collection CollectionRecord
- err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
+ var collection arvados.Collection
+ err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
if err != nil {
return fmt.Errorf("While getting container image collection: %v", err)
}
collectionPaths := []string{}
runner.Binds = nil
- for bind, mnt := range runner.ContainerRecord.Mounts {
+ for bind, mnt := range runner.Container.Mounts {
if bind == "stdout" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
}
// Does path start with OutputPath?
- prefix := runner.ContainerRecord.OutputPath
+ prefix := runner.Container.OutputPath
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
tmpcount += 1
}
if mnt.Writable {
- if bind == runner.ContainerRecord.OutputPath {
+ if bind == runner.Container.OutputPath {
runner.HostOutputDir = src
}
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
}
collectionPaths = append(collectionPaths, src)
} else if mnt.Kind == "tmp" {
- if bind == runner.ContainerRecord.OutputPath {
+ if bind == runner.Container.OutputPath {
runner.HostOutputDir, err = runner.MkTempDir("", "")
if err != nil {
return fmt.Errorf("While creating mount temp dir: %v", err)
runner.loggingDone = make(chan bool)
- if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
- stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
+ if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
+ stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
index := strings.LastIndex(stdoutPath, "/")
if index > 0 {
subdirs := stdoutPath[:index]
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
- runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
- if runner.ContainerRecord.Cwd != "." {
- runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
+ runner.ContainerConfig.Cmd = runner.Container.Command
+ if runner.Container.Cwd != "." {
+ runner.ContainerConfig.WorkingDir = runner.Container.Cwd
}
- for k, v := range runner.ContainerRecord.Environment {
+ for k, v := range runner.Container.Environment {
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
- if wantAPI := runner.ContainerRecord.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+ if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
tok, err := runner.ContainerToken()
if err != nil {
return err
"ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
"ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
)
+ runner.ContainerConfig.NetworkDisabled = false
+ } else {
+ runner.ContainerConfig.NetworkDisabled = true
}
- runner.ContainerConfig.NetworkDisabled = true
-
var err error
runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
if err != nil {
}
defer file.Close()
- rec := CollectionRecord{}
+ var rec arvados.Collection
err = json.NewDecoder(file).Decode(&rec)
if err != nil {
return fmt.Errorf("While reading FUSE metafile: %v", err)
manifestText = rec.ManifestText
}
- var response CollectionRecord
+ var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
"collection": arvadosclient.Dict{
// point, but re-open crunch log with ArvClient in case there are any
// other further (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
"crunch-run", nil})
if runner.LogsPDH != nil {
return fmt.Errorf("While creating log manifest: %v", err)
}
- var response CollectionRecord
+ var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
"collection": arvadosclient.Dict{
- "name": "logs for " + runner.ContainerRecord.UUID,
+ "name": "logs for " + runner.Container.UUID,
"manifest_text": mt}},
&response)
if err != nil {
return nil
}
-// UpdateContainerRecordRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
+// UpdateContainerRunning updates the container state to "Running"
+func (runner *ContainerRunner) UpdateContainerRunning() error {
runner.CancelLock.Lock()
defer runner.CancelLock.Unlock()
if runner.Cancelled {
return ErrCancelled
}
- return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
+ return runner.ArvClient.Update("containers", runner.Container.UUID,
arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
}
return runner.token, nil
}
- var auth APIClientAuthorization
- err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth)
+ var auth arvados.APIClientAuthorization
+ err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
if err != nil {
return "", err
}
return runner.token, nil
}
-// UpdateContainerRecordComplete updates the container record state on API
+// UpdateContainerFinal updates the container record state on API
// server to "Complete" or "Cancelled"
-func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
+func (runner *ContainerRunner) UpdateContainerFinal() error {
update := arvadosclient.Dict{}
update["state"] = runner.finalState
if runner.finalState == "Complete" {
update["output"] = *runner.OutputPDH
}
}
- return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
+ return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
// IsCancelled returns the value of Cancelled, with goroutine safety.
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
- runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
+ runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
hostname, hosterr := os.Hostname()
if hosterr != nil {
checkErr(err)
if runner.finalState == "Queued" {
- runner.UpdateContainerRecordFinal()
+ runner.UpdateContainerFinal()
return
}
checkErr(runner.CaptureOutput())
checkErr(runner.CommitLogs())
- checkErr(runner.UpdateContainerRecordFinal())
+ checkErr(runner.UpdateContainerFinal())
// The real log is already closed, but then we opened
// a new one in case we needed to log anything while
runner.CrunchLog.Close()
}()
- err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
+ err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
if err != nil {
err = fmt.Errorf("While getting container record: %v", err)
return
return
}
- err = runner.UpdateContainerRecordRunning()
+ err = runner.UpdateContainerRunning()
if err != nil {
return
}
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
- cr.ContainerRecord.UUID = containerUUID
+ cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
return cr
"encoding/json"
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
Total int64
Calls int
Content []arvadosclient.Dict
- ContainerRecord
+ arvados.Container
Logs map[string]*bytes.Buffer
WasSetRunning bool
sync.Mutex
return nil, nil
}
-func (this *ArvTestClient) Create(resourceType string,
+func (client *ArvTestClient) Create(resourceType string,
parameters arvadosclient.Dict,
output interface{}) error {
- this.Mutex.Lock()
- defer this.Mutex.Unlock()
+ client.Mutex.Lock()
+ defer client.Mutex.Unlock()
- this.Calls += 1
- this.Content = append(this.Content, parameters)
+ client.Calls += 1
+ client.Content = append(client.Content, parameters)
if resourceType == "logs" {
et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
- if this.Logs == nil {
- this.Logs = make(map[string]*bytes.Buffer)
+ if client.Logs == nil {
+ client.Logs = make(map[string]*bytes.Buffer)
}
- if this.Logs[et] == nil {
- this.Logs[et] = &bytes.Buffer{}
+ if client.Logs[et] == nil {
+ client.Logs[et] = &bytes.Buffer{}
}
- this.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
+ client.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
}
if resourceType == "collections" && output != nil {
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
- outmap := output.(*CollectionRecord)
+ outmap := output.(*arvados.Collection)
outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
}
return nil
}
-func (this *ArvTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+func (client *ArvTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
switch {
case method == "GET" && resourceType == "containers" && action == "auth":
return json.Unmarshal([]byte(`{
}
}
-func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+func (client *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
if resourceType == "collections" {
if uuid == hwPDH {
- output.(*CollectionRecord).ManifestText = hwManifest
+ output.(*arvados.Collection).ManifestText = hwManifest
} else if uuid == otherPDH {
- output.(*CollectionRecord).ManifestText = otherManifest
+ output.(*arvados.Collection).ManifestText = otherManifest
}
}
if resourceType == "containers" {
- (*output.(*ContainerRecord)) = this.ContainerRecord
+ (*output.(*arvados.Container)) = client.Container
}
return nil
}
-func (this *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
- this.Mutex.Lock()
- defer this.Mutex.Unlock()
- this.Calls += 1
- this.Content = append(this.Content, parameters)
+func (client *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ client.Mutex.Lock()
+ defer client.Mutex.Unlock()
+ client.Calls += 1
+ client.Content = append(client.Content, parameters)
if resourceType == "containers" {
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
- this.WasSetRunning = true
+ client.WasSetRunning = true
}
}
return nil
// parameters match jpath/string. E.g., CalledWith(c, "foo.bar",
// "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
// no call matches, it returns nil.
-func (this *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
- call: for _, content := range this.Content {
+func (client *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
+call:
+ for _, content := range client.Content {
var v interface{} = content
for _, k := range strings.Split(jpath, ".") {
if dict, ok := v.(arvadosclient.Dict); !ok {
return nil
}
-func (this *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- this.Content = buf
+func (client *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ client.Content = buf
return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
}
len uint64
}
-func (this FileWrapper) Len() uint64 {
- return this.len
+func (fw FileWrapper) Len() uint64 {
+ return fw.len
}
-func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
if filename == hwImageId+".tar" {
rdr := ioutil.NopCloser(&bytes.Buffer{})
- this.Called = true
+ client.Called = true
return FileWrapper{rdr, 1321984}, nil
}
return nil, nil
_, err = cr.Docker.InspectImage(hwImageId)
c.Check(err, NotNil)
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
// (1) Test loading image from keep
c.Check(kc.Called, Equals, false)
}
type ArvErrorTestClient struct{}
-type KeepErrorTestClient struct{}
-type KeepReadErrorTestClient struct{}
-func (this ArvErrorTestClient) Create(resourceType string,
+func (ArvErrorTestClient) Create(resourceType string,
parameters arvadosclient.Dict,
output interface{}) error {
return nil
}
-func (this ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+func (ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
return errors.New("ArvError")
}
-func (this ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+func (ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
return errors.New("ArvError")
}
-func (this ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+func (ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
return nil
}
-func (this KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+type KeepErrorTestClient struct{}
+
+func (KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
return "", 0, errors.New("KeepError")
}
-func (this KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
return nil, errors.New("KeepError")
}
-func (this KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+type KeepReadErrorTestClient struct{}
+
+func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
return "", 0, nil
}
type ErrorReader struct{}
-func (this ErrorReader) Read(p []byte) (n int, err error) {
+func (ErrorReader) Read(p []byte) (n int, err error) {
return 0, errors.New("ErrorReader")
}
-func (this ErrorReader) Close() error {
+func (ErrorReader) Close() error {
return nil
}
-func (this ErrorReader) Len() uint64 {
+func (ErrorReader) Len() uint64 {
return 0
}
-func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
return ErrorReader{}, nil
}
func (s *TestSuite) TestLoadImageArvError(c *C) {
// (1) Arvados error
cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
// (2) Keep error
docker := NewTestDockerClient()
cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = otherPDH
+ cr.Container.ContainerImage = otherPDH
err := cr.LoadImage()
c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
// (4) Collection doesn't contain image
docker := NewTestDockerClient()
cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
c.Check(err, NotNil)
bytes.Buffer
}
+func (*ClosableBuffer) Close() error {
+ return nil
+}
+
type TestLogs struct {
Stdout ClosableBuffer
Stderr ClosableBuffer
}
-func (this *ClosableBuffer) Close() error {
- return nil
-}
-
-func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
+func (tl *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
if logstr == "stdout" {
- return &this.Stdout
+ return &tl.Stdout
}
if logstr == "stderr" {
- return &this.Stderr
+ return &tl.Stderr
}
return nil
}
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
- cr.ContainerRecord.ContainerImage = hwPDH
- cr.ContainerRecord.Command = []string{"./hw"}
+ cr.Container.ContainerImage = hwPDH
+ cr.Container.Command = []string{"./hw"}
err := cr.LoadImage()
c.Check(err, IsNil)
c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
}
-func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
+func (s *TestSuite) TestUpdateContainerRunning(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- err := cr.UpdateContainerRecordRunning()
+ err := cr.UpdateContainerRunning()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
}
-func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
+func (s *TestSuite) TestUpdateContainerComplete(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
*cr.ExitCode = 42
cr.finalState = "Complete"
- err := cr.UpdateContainerRecordFinal()
+ err := cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
}
-func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
+func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Cancelled = true
cr.finalState = "Cancelled"
- err := cr.UpdateContainerRecordFinal()
+ err := cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
// dress rehearsal of the Run() function, starting from a JSON container record.
func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
- rec := ContainerRecord{}
+ rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
docker.fn = fn
docker.RemoveImage(hwImageId, true)
- api = &ArvTestClient{ContainerRecord: rec}
+ api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
"runtime_constraints": {}
}`
- rec := ContainerRecord{}
+ rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
}
docker.RemoveImage(hwImageId, true)
- api := &ArvTestClient{ContainerRecord: rec}
+ api := &ArvTestClient{Container: rec}
cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
}
{
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
cr.OutputPath = "/tmp"
err := cr.SetupMounts()
{
i = 0
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts["/keeptmp"] = arvados.Mount{Kind: "collection", Writable: true}
cr.OutputPath = "/keeptmp"
os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
{
i = 0
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
- cr.ContainerRecord.Mounts["/keepout"] = Mount{Kind: "collection", Writable: true}
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts["/keepinp"] = arvados.Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
+ cr.Container.Mounts["/keepout"] = arvados.Mount{Kind: "collection", Writable: true}
cr.OutputPath = "/keepout"
os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
// Used by the TestStdoutWithWrongPath*()
func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
- rec := ContainerRecord{}
+ rec := arvados.Container{}
err = json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
docker.fn = fn
docker.RemoveImage(hwImageId, true)
- api = &ArvTestClient{ContainerRecord: rec}
+ api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
return
}
}
- if err = bal.GetCurrentState(&config.Client); err != nil {
+ if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
// collQ buffers incoming collections so we can start fetching
// the next page without waiting for the current page to
- // finish processing. (1000 happens to match the page size
- // used by (*arvados.Client)EachCollection(), but it's OK if
- // they don't match.)
- collQ := make(chan arvados.Collection, 1000)
+ // finish processing.
+ collQ := make(chan arvados.Collection, bufs)
// Start a goroutine to process collections. (We could use a
// worker pool here, but even with a single worker we already
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(c,
+ err = EachCollection(c, pageSize,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
// The progress function is called periodically with done (number of
// times f has been called) and total (number of times f is expected
// to be called).
-func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
+//
+// If pageSize > 0 it is used as the maximum page size in each API
+// call; otherwise the maximum allowed page size is requested.
+func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
return err
}
- limit := 1000
+ limit := pageSize
+ if limit <= 0 {
+ // Use the maximum page size the server allows
+ limit = 1<<31 - 1
+ }
params := arvados.ResourceListParams{
Limit: &limit,
Order: "modified_at, uuid",
// How often to check
RunPeriod arvados.Duration
+
+ // Number of collections to request in each API call
+ CollectionBatchSize int
+
+ // Max collections to buffer in memory (bigger values consume
+ // more memory, but can reduce store-and-forward latency when
+ // fetching pages)
+ CollectionBuffers int
}
// RunOptions controls runtime behavior. The flags/options that belong
"KeepServiceTypes": [
"disk"
],
- "RunPeriod": "600s"
+ "RunPeriod": "600s",
+ "CollectionBatchSize": 100000,
+ "CollectionBuffers": 1000
}`)
func usage() {
Use the -commit-pull and -commit-trash flags to implement the
computed changes.
+Tuning resource usage:
+
+ CollectionBatchSize limits the number of collections retrieved per
+ API transaction. If this is zero or omitted, page size is
+ determined by the API server's own page size limits (see
+ max_items_per_response and max_index_database_read configs).
+
+ CollectionBuffers sets the size of an internal queue of
+ collections. Higher values use more memory, and improve throughput
+ by allowing keep-balance to fetch the next page of collections
+ while the current page is still being processed. If this is zero
+ or omitted, pages are processed serially.
+
Limitations:
keep-balance does not attempt to discover whether committed pull
from operator import attrgetter
import libcloud.common.types as cloud_types
+from libcloud.common.exceptions import BaseHTTPError
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
from ...config import NETWORK_ERRORS
# libcloud compute drivers typically raise bare Exceptions to
# represent API errors. Return True for any exception that is
# exactly an Exception, or a better-known higher-level exception.
+        if (isinstance(exception, BaseHTTPError) and
+                exception.message and exception.message.startswith("InvalidInstanceID.NotFound")):
+ return True
return (isinstance(exception, cls.CLOUD_ERRORS) or
type(exception) is Exception)
log)
if test -n "$1" ; then
- exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM less --follow-name +GF "/etc/service/$1/log/main/current"
+ exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM less --follow-name -R +GF "/etc/service/$1/log/main/current"
else
exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM tail $(docker exec -ti $ARVBOX_CONTAINER find -L /etc -path '/etc/service/*/log/main/current' -printf " %p")
fi
FROM debian:8
-RUN apt-get update && \
- DEBIAN_FRONTEND=noninteractive apt-get -yq install \
- postgresql-9.4 git gcc runit \
- ruby rake bundler curl libpq-dev \
+RUN apt-get update
+
+RUN DEBIAN_FRONTEND=noninteractive apt-get -yq --no-install-recommends install \
+ postgresql-9.4 git build-essential runit \
+ ruby rake bundler curl libpq-dev ruby-dev \
libcurl4-openssl-dev libssl-dev zlib1g-dev libpcre3-dev \
openssh-server python-setuptools netcat-traditional \
+ python-epydoc graphviz bzip2 less sudo virtualenv
+
+RUN DEBIAN_FRONTEND=noninteractive apt-get -yq --no-install-recommends install \
libpython-dev fuse libfuse-dev python-pip python-yaml \
pkg-config libattr1-dev python-llfuse python-pycurl \
libwww-perl libio-socket-ssl-perl libcrypt-ssleay-perl \
- libjson-perl nginx gitolite3 lsof python-epydoc graphviz \
+ libjson-perl nginx gitolite3 lsof \
apt-transport-https ca-certificates slurm-wlm
RUN cd /usr/local && \
FROM arvados/arvbox-base
RUN apt-get update && \
- DEBIAN_FRONTEND=noninteractive apt-get -yq install \
- python-virtualenv python3-virtualenv linkchecker xvfb iceweasel
+ DEBIAN_FRONTEND=noninteractive apt-get -yq --no-install-recommends install \
+ linkchecker python3-virtualenv python-virtualenv xvfb iceweasel
RUN set -e && \
PJS=phantomjs-1.9.7-linux-x86_64 && \
export ARVADOS_API_HOST_INSECURE=1
export ARVADOS_API_TOKEN=$(cat /var/lib/arvados/superuser_token)
-exec /usr/local/bin/crunch-dispatch-local -crunch-run-command=/usr/local/bin/crunch-run
+exec /usr/local/bin/crunch-dispatch-local -crunch-run-command=/usr/local/bin/crunch-run -poll-interval=3