services/keep/src/github.com
sdk/java/target
*.class
+apps/workbench/vendor/bundle
+services/api/vendor/bundle
+sdk/java/log
border-right: 1px solid #ffffff;
background: #ffffff;
}
+svg text {
+ font-size: 6pt;
+}
\ No newline at end of file
@new_resource_attrs ||= params[model_class.to_s.underscore.singularize]
@new_resource_attrs ||= {}
@new_resource_attrs.reject! { |k,v| k.to_s == 'uuid' }
- @object ||= model_class.new @new_resource_attrs
- @object.save!
- show
+ @object ||= model_class.new @new_resource_attrs, params["options"]
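+    # Render the saved object in whatever format the client requested; the
+    # JSON response also includes an href pointing back at the new object.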
+ if @object.save
+ respond_to do |f|
+ f.json { render json: @object.attributes.merge(href: url_for(@object)) }
+ f.html {
+ redirect_to @object
+ }
+ f.js { render }
+ end
+ else
+ self.render_error status: 422
+ end
end
def destroy
@svg = ProvenanceHelper::create_provenance_graph nodes, "provenance_svg", {
:request => request,
- :all_script_parameters => true,
+ :all_script_parameters => true,
:script_version_nodes => true}
end
end
end
+ def cancel
+ @object.cancel
+ redirect_to @object
+ end
+
def show
generate_provenance([@object])
end
end
def show_pane_list
- %w(Attributes Provenance Metadata JSON API)
+ %w(Status Attributes Provenance Metadata JSON API)
end
end
class ArvadosBase < ActiveRecord::Base
self.abstract_class = true
attr_accessor :attribute_sortkey
+ attr_accessor :create_params
def self.arvados_api_client
ArvadosApiClient.new_or_current
end
end
- def initialize raw_params={}
+ def initialize raw_params={}, create_params={}
super self.class.permit_attribute_params(raw_params)
+ @create_params = create_params
@attribute_sortkey ||= {
'id' => nil,
'name' => '000',
ActionController::Parameters.new(raw_params).permit!
end
- def self.create raw_params={}
- super(permit_attribute_params(raw_params))
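+  # Hypothetical usage sketch: attribute values and API call options are
+  # passed separately, e.g. Job.create({script: 'foo'}, {find_or_create: true});
+  # the options hash is merged into the POST body when the record is saved.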
+ def self.create raw_params={}, create_params={}
+ x = super(permit_attribute_params(raw_params))
+ x.create_params = create_params
+ x
end
def update_attributes raw_params={}
obdata.delete :uuid
resp = arvados_api_client.api(self.class, '/' + uuid, postdata)
else
+ postdata.merge!(@create_params) if @create_params
resp = arvados_api_client.api(self.class, '', postdata)
end
return false if !resp[:etag] || !resp[:uuid]
def self.creatable?
false
end
+
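+  # Ask the API server to cancel this job by POSTing to its /cancel
+  # endpoint (a member route on the jobs resource).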
+ def cancel
+ arvados_api_client.api "jobs/#{self.uuid}/", "cancel", {}
+ end
end
border-width: 1px;
border-color: gray;
position: absolute;
- left: 1px;
+ left: 225px;
right: 1px;
}
path:hover {
--- /dev/null
+
+<div class="pull-right">
+ <% if @object.running %>
+ <%= form_tag "/jobs/#{@object.uuid}/cancel", style: "display:inline; padding-left: 1em" do |f| %>
+ <%= button_tag "Cancel running job", {class: 'btn btn-danger', id: "cancel-job-button"} %>
+ <% end %>
+ <% else %>
+ Re-run job using script version:
+ <%= form_tag '/jobs', style: "display:inline; padding-left: 1em" do |f| %>
+ <% [:script, :script_version, :repository, :output_is_persistent, :supplied_script_version, :nondeterministic].each do |d| %>
+ <%= hidden_field :job, d, :value => @object[d] %>
+ <% end %>
+ <% [:script_parameters, :runtime_constraints].each do |d| %>
+ <%= hidden_field :job, d, :value => JSON.dump(@object[d]) %>
+ <% end %>
+ <%= button_tag "Same as this run", {class: 'btn btn-primary', id: "re-run-same-job-button"} %>
+ <% end %>
+  <% if !@object.supplied_script_version.nil? and !@object.supplied_script_version.empty? and @object.script_version != @object.supplied_script_version %>
+ <%= form_tag '/jobs', style: "display:inline" do |f| %>
+ <% [:script, :repository, :output_is_persistent, :supplied_script_version, :nondeterministic].each do |d| %>
+ <%= hidden_field :job, d, :value => @object[d] %>
+ <% end %>
+ <%= hidden_field :job, :script_version, :value => @object[:supplied_script_version] %>
+ <% [:script_parameters, :runtime_constraints].each do |d| %>
+ <%= hidden_field :job, d, :value => JSON.dump(@object[d]) %>
+ <% end %>
+ <%= button_tag "Latest (#{@object.repository}/#{@object.supplied_script_version})", {class: 'btn btn-primary', id: "re-run-latest-job-button"} %>
+ <% end %>
+ <% end %>
+  <% end %>
+</div>
+
+<table class="table pipeline-components-table">
+ <colgroup>
+ <col style="width: 20%" />
+ <col style="width: 24%" />
+ <col style="width: 12%" />
+    <col style="width: 44%" />
+ </colgroup>
+ <thead>
+ <tr><th>
+ script, version
+ </th><th>
+ progress
+ <%# format:'js' here helps browsers avoid using the cached js
+ content in html context (e.g., duplicate tab -> see
+ javascript) %>
+ <%= link_to '(refresh)', {format: :js}, {class: 'refresh hide', remote: true, method: 'get'} %>
+ </th>
+ <th></th>
+ <th>
+ output
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ <%= @object[:script] %><br>
+ <span class="deemphasize"><%= @object[:script_version] %></span>
+ </td>
+ <td>
+ <%= render partial: 'job_progress', locals: {:j => @object} %>
+        <% if @object.running == false and @object[:log] %>
+          <% fixup = /([a-f0-9]{32}\+\d+)(\+?.*)/.match(@object[:log]) %>
+          <% if fixup %>
+            <% Collection.limit(1).where(uuid: fixup[1]).each do |c| %>
+              <% c.files.each do |file| %>
+                <br/><span class="deemphasize">
+                  <a href="<%= collection_path(@object[:log]) %>/<%= file[1] %>?disposition=inline&size=<%= file[2] %>">log</a>
+                </span>
+              <% end %>
+            <% end %>
+          <% end %>
+        <% end %>
+ </td><td>
+ <%= render(partial: 'job_status_label',
+ locals: { :j => @object }) %>
+ </td><td>
+ <%= link_to_if_arvados_object @object[:output], {:thumbnail => true} %>
+ </td>
+ </tr>
+  </tbody>
+  <tfoot>
+    <tr><td colspan="4"></td></tr>
+ </tfoot>
+</table>
resources :virtual_machines
resources :authorized_keys
resources :job_tasks
- resources :jobs
+ resources :jobs do
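+    # adds POST /jobs/:id/cancel, handled by JobsController#cancel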
+ post 'cancel', :on => :member
+ end
match '/logout' => 'sessions#destroy', via: [:get, :post]
get '/logged_out' => 'sessions#index'
resources :users do
# Pipeline is stopped. We have the option to resume it.
page.assert_selector 'a,button', text: 'Run'
+
+ # Go over to the graph tab
+ click_link 'Graph'
+ assert page.has_css? 'div#provenance_graph'
end
end
|_. Argument |_. Type |_. Description |_. Location |_. Example |
{background:#ccffcc}.|uuid|string|The UUID of the User in question.|path||
-h2. event_stream
-
-event_stream users
-
Arguments:
table(table table-bordered table-condensed).
"5ee633fe2569d2a42dd81b07490d5d13+82",
"c905c8d8443a9c44274d98b7c6cfaa32+94",
"d237a90bae3870b3b033aea1e99de4a9+10820"
- ],
- "log_stream_href":"https://qr1hi.arvadosapi.com/arvados/v1/jobs/qr1hi-8i9sb-n9k7qyp7bs5b9d4/log_tail_follow"
+ ]
}
-~$ <span class="userinput">arv job log_tail_follow --uuid qr1hi-8i9sb-n9k7qyp7bs5b9d4</span>
-Tue Dec 17 19:02:16 2013 salloc: Granted job allocation 1251
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 check slurm allocation
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 node compute13 - 8 slots
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 start
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 Install revision 76588bfc57f33ea1b36b82ca7187f465b73b4ca4
-Tue Dec 17 19:02:18 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 Clean-work-dir exited 0
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 Install exited 0
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 script GATK2-VariantFiltration
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 script_version 76588bfc57f33ea1b36b82ca7187f465b73b4ca4
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 script_parameters {"input":"5ee633fe2569d2a42dd81b07490d5d13+82","gatk_bundle":"d237a90bae3870b3b033aea1e99de4a9+10820","gatk_binary_tarball":"c905c8d8443a9c44274d98b7c6cfaa32+94"}
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 runtime_constraints {"max_tasks_per_node":0}
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 start level 0
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 status: 0 done, 0 running, 1 todo
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 job_task qr1hi-ot0gb-d3sjxerucfbvyev
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 child 4946 started on compute13.1
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 status: 0 done, 1 running, 0 todo
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 child 4946 on compute13.1 exit 0 signal 0 success=true
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 success in 1 seconds
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 output
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 wait for last 0 children to finish
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 status: 1 done, 0 running, 1 todo
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 start level 1
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 status: 1 done, 0 running, 1 todo
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 job_task qr1hi-ot0gb-w8ujbnulxjaamxf
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 child 4984 started on compute13.1
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 status: 1 done, 1 running, 0 todo
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 child 4984 on compute13.1 exit 0 signal 0 success=true
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 success in 110 seconds
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 output bedd6ff56b3ae9f90d873b1fcb72f9a3+91
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 wait for last 0 children to finish
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 status: 2 done, 0 running, 0 todo
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 release job allocation
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 Freeze not implemented
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 collate
-Tue Dec 17 19:04:10 2013 salloc: Job allocation 1251 has been revoked.
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 output bedd6ff56b3ae9f90d873b1fcb72f9a3+91
-Tue Dec 17 19:04:11 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 finish
-Tue Dec 17 19:04:12 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 log manifest is 1e77aaceee2df499e14dc5dde5c3d328+91
</code></pre>
</notextile>
"tasks_summary":{},
"dependencies":[
"c1bad4b39ca5a924e481008009d94e32+210"
- ],
- "log_stream_href":"https://qr1hi.arvadosapi.com/arvados/v1/jobs/qr1hi-8i9sb-1pm1t02dezhupss/log_tail_follow"
+ ]
}
</code></pre>
</notextile>
Go to the "Workbench dashboard":https://{{site.arvados_workbench_host}} and visit *Activity* %(rarr)→% *Recent jobs*. Your job should be near the top of the table. This table refreshes automatically. When the job has completed successfully, it will show <span class="label label-success">finished</span> in the *Status* column.
-On the command line, you can access log messages while the job runs using @arv job log_tail_follow@:
-
-notextile. <pre><code>~$ <span class="userinput">arv job log_tail_follow --uuid qr1hi-8i9sb-xxxxxxxxxxxxxxx</span></code></pre>
-
-This will print out the last several lines of the log for that job.
-
h2. Inspect the job output
On the "Workbench dashboard":https://{{site.arvados_workbench_host}}, look for the *Output* column of the *Recent jobs* table. Click on the link under *Output* for your job to go to the files page with the job output. The files page lists all the files that were output by the job. Click on the link under the *file* column to view a file, or click on the download icon <span class="glyphicon glyphicon-download-alt"></span> to download the output file.
},
"dependencies":[
"c1bad4b39ca5a924e481008009d94e32+210"
- ],
- "log_stream_href":null
+ ]
}
</code></pre>
</notextile>
# Make sure the secret is at least 30 characters and all random,
# no regular words or you'll be exposed to dictionary attacks.
Server::Application.config.secret_token = '@@API_SECRET@@'
+
+# The blob_signing_key is a string of alphanumeric characters used
+# to sign permission hints for Keep locators. It must be identical
+# to the permission key given to Keep.
+Server::Application.config.blob_signing_key = '@@KEEP_SIGNING_SECRET@@'
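+# (A hypothetical way to generate a suitable value from a Rails checkout:
+# running 'rake secret'. Any sufficiently long random alphanumeric string
+# will do.)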
rsync -rlp --exclude=docker/ --exclude='**/log/*' --exclude='**/tmp/*' \
--chmod=Da+rx,Fa+rX ../ build/
find build/ -name \*.gem -delete
- cd build/sdk/python/ && ./build.sh
+ cd build/services/fuse/ && python setup.py build
+ cd build/sdk/python/ && python setup.py build
cd build/sdk/cli && gem build arvados-cli.gemspec
cd build/sdk/ruby && gem build arvados.gemspec
touch build/.buildstamp
# will be chosen randomly at build time. This is the
# recommended setting.
+# The signing key shared by Keep and the API server, used to verify
+# blob permission signatures.
+KEEP_SIGNING_SECRET:
+
# The value for the Rails config.secret_token setting.
API_SECRET:
# Install Arvados packages.
RUN find /usr/src/arvados/sdk -name '*.gem' -print0 | \
xargs -0rn 1 gem install && \
+ cd /usr/src/arvados/services/fuse && \
+ python setup.py install && \
cd /usr/src/arvados/sdk/python && \
python setup.py install
case api_method
when
- 'arvados.users.event_stream',
- 'arvados.jobs.log_stream',
'arvados.jobs.log_tail_follow'
# Special case for methods that respond with data streams rather
$command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_image)
{
- $command .= "$docker_bin run -i -a stdin -a stdout -a stderr ";
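+      # Wrap the docker invocation in crunchstat, which locates the
+      # container's cgroup via the cid file and reports cpu, memory, and
+      # blkio statistics on stderr every 1000 ms.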
+ $command .= "crunchstat -cgroup-parent=/sys/fs/cgroup/lxc -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=1000 ";
+ $command .= "$docker_bin run -i -a stdin -a stdout -a stderr -cidfile=$ENV{TASK_WORK}/docker.cid ";
# Dynamically configure the container to use the host system as its
# DNS server. Get the host's global addresses from the ip command,
# and turn them into docker --dns options using gawk.
}
while (my ($env_key, $env_val) = each %ENV)
{
- $command .= "-e \Q$env_key=$env_val\E ";
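+      # Pass through only job/task environment variables; the rest of the
+      # dispatcher's environment stays on the host side.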
+ if ($env_key =~ /^(JOB|TASK)_/) {
+ $command .= "-e \Q$env_key=$env_val\E ";
+ }
}
$command .= "\Q$docker_image\E ";
+ } else {
+    $command .= "crunchstat -cgroup-path=/sys/fs/cgroup ";
}
$command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
my @execargs = ('bash', '-c', $command);
gem 'omniauth-oauth2', '1.1.1'
gem 'andand'
-gem 'redis'
gem 'test_after_commit', :group => :test
rake (10.2.2)
rdoc (3.12.2)
json (~> 1.4)
- redis (3.0.7)
ref (1.0.5)
rvm-capistrano (1.5.1)
capistrano (~> 2.15.4)
pg
pg_power
rails (~> 3.2.0)
- redis
rvm-capistrano
sass-rails (>= 3.2.0)
simplecov (~> 0.7.1)
@job.reload
end
end
- @redis = Redis.new(:timeout => 0)
- if @redis.exists @job.uuid
- # A log buffer exists. Start by showing the last few KB.
- @redis.
- getrange(@job.uuid, 0 - [@opts[:buffer_size], 1].max, -1).
- sub(/^[^\n]*\n?/, '').
- split("\n").
- each do |line|
- yield "#{line}\n"
- end
- end
- # TODO: avoid missing log entries between getrange() above and
- # subscribe() below.
- @redis.subscribe(@job.uuid) do |event|
- event.message do |channel, msg|
- if msg == "end"
- @redis.unsubscribe @job.uuid
- else
- yield "#{msg}\n"
- end
- end
- end
- end
- end
-
- def self._log_tail_follow_requires_parameters
- {
- buffer_size: {type: 'integer', required: false, default: 2**13}
- }
- end
- def log_tail_follow
- if !@object.andand.uuid
- return render_not_found
- end
- if client_accepts_plain_text_stream
- self.response.headers['Last-Modified'] = Time.now.ctime.to_s
- self.response_body = LogStreamer.new @object, {
- buffer_size: (params[:buffer_size].to_i rescue 2**13)
- }
- else
- render json: {
- href: url_for(uuid: @object.uuid),
- comment: ('To retrieve the log stream as plain text, ' +
- 'use a request header like "Accept: text/plain"')
- }
end
end
if !@object
return render_not_found
end
- @object.ping({ ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
- ping_secret: params[:ping_secret],
- ec2_instance_id: params[:instance_id] })
+ ping_data = {
+ ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
+ ec2_instance_id: params[:instance_id]
+ }
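+    # Copy optional node stats into the ping payload only when the client
+    # actually supplied them.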
+ [:ping_secret, :total_cpu_cores, :total_ram_mb, :total_scratch_mb]
+ .each do |key|
+ ping_data[key] = params[key] if params[key]
+ end
+ @object.ping(ping_data)
if @object.info['ping_secret'] == params[:ping_secret]
render json: @object.as_api_response(:superuser)
else
accept_attribute_as_json :prefs, Hash
skip_before_filter :find_object_by_uuid, only:
- [:activate, :event_stream, :current, :system, :setup]
+ [:activate, :current, :system, :setup]
skip_before_filter :render_404_if_no_object, only:
- [:activate, :event_stream, :current, :system, :setup]
+ [:activate, :current, :system, :setup]
before_filter :admin_required, only: [:setup, :unsetup]
def current
show
end
- class ChannelStreamer
- Q_UPDATE_INTERVAL = 12
- def initialize(opts={})
- @opts = opts
- end
- def each
- return unless @opts[:channel]
- @redis = Redis.new(:timeout => 0)
- @redis.subscribe(@opts[:channel]) do |event|
- event.message do |channel, msg|
- yield msg + "\n"
- end
- end
- end
- end
-
- def event_stream
- channel = current_user.andand.uuid
- if current_user.andand.is_admin
- channel = params[:uuid] || channel
- end
- if client_accepts_plain_text_stream
- self.response.headers['Last-Modified'] = Time.now.ctime.to_s
- self.response_body = ChannelStreamer.new(channel: channel)
- else
- render json: {
- href: url_for(uuid: channel),
- comment: ('To retrieve the event stream as plain text, ' +
- 'use a request header like "Accept: text/plain"')
- }
- end
- end
-
def activate
if current_user.andand.is_admin && params[:uuid]
@object = User.find params[:uuid]
t.add :runtime_constraints
t.add :tasks_summary
t.add :dependencies
- t.add :log_stream_href
- t.add :log_buffer
t.add :nondeterministic
t.add :repository
+ t.add :supplied_script_version
end
def assert_finished
running: false)
end
- def log_stream_href
- unless self.finished_at
- "#{current_api_base}/#{self.class.to_s.pluralize.underscore}/#{self.uuid}/log_tail_follow"
- end
- end
-
def self.queue
self.where('started_at is ? and is_locked_by_uuid is ? and cancelled_at is ? and success is ?',
nil, nil, nil, nil).
if new_record? or script_version_changed?
sha1 = Commit.find_commit_range(current_user, self.repository, nil, self.script_version, nil)[0] rescue nil
if sha1
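+        # Remember the version the user originally asked for before
+        # overwriting script_version with the resolved commit sha1.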
+ self.supplied_script_version = self.script_version if self.supplied_script_version.nil? or self.supplied_script_version.empty?
self.script_version = sha1
else
raise ArgumentError.new("Specified script_version does not resolve to a commit")
end
end
end
-
- def log_buffer
- begin
- @@redis ||= Redis.new(:timeout => 0)
- if @@redis.exists uuid
- @@redis.getrange(uuid, 0 - 2**10, -1)
- end
- rescue Redis::CannotConnectError
- return '(not available)'
- end
- end
end
end
end
+ # Record other basic stats
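+    # Stats omitted from this ping are deleted from info, so a node that
+    # loses resources does not keep reporting stale values.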
+ ['total_cpu_cores', 'total_ram_mb', 'total_scratch_mb'].each do |key|
+ if value = (o[key] or o[key.to_sym])
+ self.info[key] = value
+ else
+ self.info.delete(key)
+ end
+ end
+
save!
end
resources :job_tasks
resources :jobs do
get 'queue', on: :collection
- get 'log_tail_follow', on: :member
post 'cancel', on: :member
end
resources :keep_disks do
resources :users do
get 'current', on: :collection
get 'system', on: :collection
- get 'event_stream', on: :member
post 'activate', on: :member
post 'setup', on: :collection
post 'unsetup', on: :member
--- /dev/null
+class AddSuppliedScriptVersion < ActiveRecord::Migration
+ def up
+ add_column :jobs, :supplied_script_version, :string
+ end
+
+ def down
+    remove_column :jobs, :supplied_script_version
+ end
+end
#
# It's strongly recommended to check this file into your version control system.
-ActiveRecord::Schema.define(:version => 20140602143352) do
-
+ActiveRecord::Schema.define(:version => 20140602143352) do
create_table "api_client_authorizations", :force => true do |t|
t.string "api_token", :null => false
t.boolean "nondeterministic"
t.string "repository"
t.boolean "output_is_persistent", :default => false, :null => false
+ t.string "supplied_script_version"
end
add_index "jobs", ["created_at"], :name => "index_jobs_on_created_at"
end
if Server::Application.config.crunch_job_user
- cmd_args.unshift("sudo", "-E", "-u", Server::Application.config.crunch_job_user)
- end
-
- cmd_args << "HOME=/dev/null"
- cmd_args << "ARVADOS_API_HOST=#{ENV['ARVADOS_API_HOST']}"
- cmd_args << "ARVADOS_API_HOST_INSECURE=#{ENV['ARVADOS_API_HOST_INSECURE']}" if ENV['ARVADOS_API_HOST_INSECURE']
-
- ENV.each do |k, v|
- cmd_args << "#{k}=#{v}" if k.starts_with? "CRUNCH_"
- end
-
- if $trollopts.use_env
- cmd_args << "PATH=#{ENV['PATH']}"
- cmd_args << "PYTHONPATH=#{ENV['PYTHONPATH']}"
- cmd_args << "PERLLIB=#{ENV['PERLLIB']}"
- cmd_args << "RUBYLIB=#{ENV['RUBYLIB']}"
- cmd_args << "GEM_PATH=#{ENV['GEM_PATH']}"
+ cmd_args.unshift("sudo", "-E", "-u",
+ Server::Application.config.crunch_job_user,
+ "PATH=#{ENV['PATH']}",
+ "PERLLIB=#{ENV['PERLLIB']}",
+ "PYTHONPATH=#{ENV['PYTHONPATH']}",
+ "RUBYLIB=#{ENV['RUBYLIB']}",
+ "GEM_PATH=#{ENV['GEM_PATH']}")
end
job_auth = ApiClientAuthorization.
info:
:ping_secret: "69udawxvn3zzj45hs8bumvndricrha4lcpi23pd69e44soanc0"
:slurm_state: "idle"
+ total_cpu_cores: 16
assert_not_nil json_response['info']['ping_secret']
end
+ test "ping adds node stats to info" do
+ node = nodes(:idle)
+ post :ping, {
+ id: node.uuid,
+ ping_secret: node.info['ping_secret'],
+ total_cpu_cores: 32,
+ total_ram_mb: 1024,
+ total_scratch_mb: 2048
+ }
+ assert_response :success
+ info = JSON.parse(@response.body)['info']
+ assert_equal(node.info['ping_secret'], info['ping_secret'])
+ assert_equal(32, info['total_cpu_cores'].to_i)
+ assert_equal(1024, info['total_ram_mb'].to_i)
+ assert_equal(2048, info['total_scratch_mb'].to_i)
+ end
end
require 'test_helper'
class NodeTest < ActiveSupport::TestCase
- # test "the truth" do
- # assert true
- # end
+ def ping_node(node_name, ping_data)
+ set_user_from_auth :admin
+ node = nodes(node_name)
+ node.ping({ping_secret: node.info['ping_secret'],
+ ip: node.ip_address}.merge(ping_data))
+ node
+ end
+
+ test "pinging a node can add and update stats" do
+ node = ping_node(:idle, {total_cpu_cores: '12', total_ram_mb: '512'})
+ assert_equal(12, node.info['total_cpu_cores'].to_i)
+ assert_equal(512, node.info['total_ram_mb'].to_i)
+ end
+
+ test "stats disappear if not in a ping" do
+ node = ping_node(:idle, {total_ram_mb: '256'})
+ refute_includes(node.info, 'total_cpu_cores')
+ assert_equal(256, node.info['total_ram_mb'].to_i)
+ end
end
--- /dev/null
+#! /bin/sh
+
+# Wraps the 'go' executable with some environment setup: sets GOPATH to
+# include this tree and the Go SDK, creates the 'pkg' and 'bin'
+# directories, then runs the underlying 'go' executable with any command
+# line parameters provided to the script.
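+#
+# Example (hypothetical): './go.sh test ./...' runs the package tests with
+# GOPATH already pointing at this tree and the Go SDK.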
+
+rootdir=$(readlink -f $(dirname $0))
+GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
+export GOPATH
+
+mkdir -p $rootdir/pkg
+mkdir -p $rootdir/bin
+
+go "$@"
--- /dev/null
+package main
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "os/exec"
+ "os/signal"
+ "strings"
+ "syscall"
+ "time"
+)
+
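+// ReadLineByLine sends each line read from inp to the out channel, then
+// reports completion on the finish channel.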
+func ReadLineByLine(inp io.ReadCloser, out chan string, finish chan bool) {
+ s := bufio.NewScanner(inp)
+ for s.Scan() {
+ out <- s.Text()
+ }
+ finish <- true
+}
+
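+// OutputChannel copies lines from the stdout and stderr channels to the
+// process's own stdout and stderr, returning when either channel closes.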
+func OutputChannel(stdout chan string, stderr chan string) {
+ for {
+ select {
+ case s, ok := <-stdout:
+ if ok {
+ fmt.Fprintln(os.Stdout, s)
+ } else {
+ return
+ }
+ case s, ok := <-stderr:
+ if ok {
+ fmt.Fprintln(os.Stderr, s)
+ } else {
+ return
+ }
+ }
+ }
+}
+
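+// PollCgroupStats reads cpu, blkio, and memory statistics from the cgroup
+// at cgroup_path every poll milliseconds and reports deltas on the stderr
+// channel.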
+func PollCgroupStats(cgroup_path string, stderr chan string, poll int64) {
+ //var last_usage int64 = 0
+ var last_user int64 = 0
+ var last_sys int64 = 0
+ var last_cpucount int64 = 0
+
+ type Disk struct {
+ last_read int64
+ next_read int64
+ last_write int64
+ next_write int64
+ }
+
+ disk := make(map[string]*Disk)
+
+ //cpuacct_usage := fmt.Sprintf("%s/cpuacct.usage", cgroup_path)
+ cpuacct_stat := fmt.Sprintf("%s/cpuacct.stat", cgroup_path)
+ blkio_io_service_bytes := fmt.Sprintf("%s/blkio.io_service_bytes", cgroup_path)
+ cpuset_cpus := fmt.Sprintf("%s/cpuset.cpus", cgroup_path)
+ memory_stat := fmt.Sprintf("%s/memory.stat", cgroup_path)
+
+ var elapsed int64 = poll
+
+ for {
+ /*{
+ c, _ := os.Open(cpuacct_usage)
+ b, _ := ioutil.ReadAll(c)
+ var next int64
+ fmt.Sscanf(string(b), "%d", &next)
+ if last_usage != 0 {
+ stderr <- fmt.Sprintf("crunchstat: cpuacct.usage %v", (next-last_usage)/10000000)
+ }
+ //fmt.Printf("usage %d %d %d %d%%\n", last_usage, next, next-last_usage, (next-last_usage)/10000000)
+ last_usage = next
+ c.Close()
+ }*/
+ var cpus int64 = 0
+ {
+ c, _ := os.Open(cpuset_cpus)
+ b, _ := ioutil.ReadAll(c)
+ sp := strings.Split(string(b), ",")
+ for _, v := range sp {
+ var min, max int64
+ n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
+ if n == 2 {
+ cpus += (max - min) + 1
+ } else {
+ cpus += 1
+ }
+ }
+
+ if cpus != last_cpucount {
+ stderr <- fmt.Sprintf("crunchstat: cpuset.cpus %v", cpus)
+ }
+ last_cpucount = cpus
+
+ c.Close()
+ }
+ if cpus == 0 {
+ cpus = 1
+ }
+ {
+ c, _ := os.Open(cpuacct_stat)
+ b, _ := ioutil.ReadAll(c)
+ var next_user int64
+ var next_sys int64
+ fmt.Sscanf(string(b), "user %d\nsystem %d", &next_user, &next_sys)
+ c.Close()
+
+ if last_user != 0 {
+ user_diff := next_user - last_user
+ sys_diff := next_sys - last_sys
+ // Assume we're reading stats based on 100
+				// jiffies per second. Because the elapsed
+ // time is in milliseconds, we need to boost
+ // that to 1000 jiffies per second, then boost
+ // it by another 100x to get a percentage, then
+ // finally divide by the actual elapsed time
+ // and the number of cpus to get average load
+ // over the polling period.
+ user_pct := (user_diff * 10 * 100) / (elapsed * cpus)
+ sys_pct := (sys_diff * 10 * 100) / (elapsed * cpus)
+
+ stderr <- fmt.Sprintf("crunchstat: cpuacct.stat user %v", user_pct)
+ stderr <- fmt.Sprintf("crunchstat: cpuacct.stat sys %v", sys_pct)
+ }
+
+ /*fmt.Printf("user %d %d %d%%\n", last_user, next_user, next_user-last_user)
+ fmt.Printf("sys %d %d %d%%\n", last_sys, next_sys, next_sys-last_sys)
+ fmt.Printf("sum %d%%\n", (next_user-last_user)+(next_sys-last_sys))*/
+ last_user = next_user
+ last_sys = next_sys
+ }
+ {
+ c, _ := os.Open(blkio_io_service_bytes)
+ b := bufio.NewScanner(c)
+ var device, op string
+ var next int64
+ for b.Scan() {
+ if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &next); err == nil {
+ if disk[device] == nil {
+ disk[device] = new(Disk)
+ }
+ if op == "Read" {
+ disk[device].last_read = disk[device].next_read
+ disk[device].next_read = next
+ if disk[device].last_read > 0 {
+ stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s read %v", device, disk[device].next_read-disk[device].last_read)
+ }
+ }
+ if op == "Write" {
+ disk[device].last_write = disk[device].next_write
+ disk[device].next_write = next
+ if disk[device].last_write > 0 {
+ stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s write %v", device, disk[device].next_write-disk[device].last_write)
+ }
+ }
+ }
+ }
+ c.Close()
+ }
+
+ {
+ c, _ := os.Open(memory_stat)
+ b := bufio.NewScanner(c)
+ var stat string
+ var val int64
+ for b.Scan() {
+ if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err == nil {
+ if stat == "rss" {
+ stderr <- fmt.Sprintf("crunchstat: memory.stat rss %v", val)
+ }
+ }
+ }
+ c.Close()
+ }
+
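+		// Sleep for the polling interval, then measure how long we
+		// actually slept; the cpu usage calculation above divides by
+		// this elapsed time.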
+ bedtime := time.Now()
+ time.Sleep(time.Duration(poll) * time.Millisecond)
+ morning := time.Now()
+ elapsed = morning.Sub(bedtime).Nanoseconds() / int64(time.Millisecond)
+ }
+}
+
+func main() {
+
+ var (
+ cgroup_path string
+ cgroup_parent string
+ cgroup_cidfile string
+ wait int64
+ poll int64
+ )
+
+ flag.StringVar(&cgroup_path, "cgroup-path", "", "Direct path to cgroup")
+ flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Path to parent cgroup")
+ flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
+ flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
+ flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
+
+ flag.Parse()
+
+ logger := log.New(os.Stderr, "crunchstat: ", 0)
+
+ if cgroup_path == "" && cgroup_cidfile == "" {
+ logger.Fatal("Must provide either -cgroup-path or -cgroup-cid")
+ }
+
+ // Make output channel
+ stdout_chan := make(chan string)
+ stderr_chan := make(chan string)
+ finish_chan := make(chan bool)
+ defer close(stdout_chan)
+ defer close(stderr_chan)
+ defer close(finish_chan)
+
+ go OutputChannel(stdout_chan, stderr_chan)
+
+ var cmd *exec.Cmd
+
+ if len(flag.Args()) > 0 {
+ // Set up subprocess
+ cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
+
+ logger.Print("Running ", flag.Args())
+
+ // Forward SIGINT and SIGTERM to inner process
+ term := make(chan os.Signal, 1)
+ go func(sig <-chan os.Signal) {
+ catch := <-sig
+ if cmd.Process != nil {
+ cmd.Process.Signal(catch)
+ }
+ logger.Print("caught signal:", catch)
+ }(term)
+ signal.Notify(term, syscall.SIGTERM)
+ signal.Notify(term, syscall.SIGINT)
+
+ // Funnel stdout and stderr from subprocess to output channels
+ stdout_pipe, err := cmd.StdoutPipe()
+ if err != nil {
+ logger.Fatal(err)
+ }
+ go ReadLineByLine(stdout_pipe, stdout_chan, finish_chan)
+
+ stderr_pipe, err := cmd.StderrPipe()
+ if err != nil {
+ logger.Fatal(err)
+ }
+ go ReadLineByLine(stderr_pipe, stderr_chan, finish_chan)
+
+ // Run subprocess
+ if err := cmd.Start(); err != nil {
+ logger.Fatal(err)
+ }
+ }
+
+ // Read the cid file
+ if cgroup_cidfile != "" {
+ // wait up to 'wait' seconds for the cid file to appear
+ var i time.Duration
+ for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
+ f, err := os.Open(cgroup_cidfile)
+ if err == nil {
+ cid, err2 := ioutil.ReadAll(f)
+ if err2 == nil && len(cid) > 0 {
+ cgroup_path = string(cid)
+ f.Close()
+ break
+ }
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ if cgroup_path == "" {
+ logger.Printf("Could not read cid file %s", cgroup_cidfile)
+ }
+ }
+
+ // add the parent prefix
+ if cgroup_parent != "" {
+ cgroup_path = fmt.Sprintf("%s/%s", cgroup_parent, cgroup_path)
+ }
+
+ logger.Print("Using cgroup ", cgroup_path)
+
+ go PollCgroupStats(cgroup_path, stderr_chan, poll)
+
+	// Wait for each of stdout and stderr to drain. If no child command
+	// was started, these channels never receive, so crunchstat keeps
+	// polling the cgroup until it is killed.
+ <-finish_chan
+ <-finish_chan
+
+ if err := cmd.Wait(); err != nil {
+ if exiterr, ok := err.(*exec.ExitError); ok {
+ // The program has exited with an exit code != 0
+
+ // This works on both Unix and Windows. Although package
+ // syscall is generally platform dependent, WaitStatus is
+ // defined for both Unix and Windows and in both cases has
+ // an ExitStatus() method with the same signature.
+ if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
+ os.Exit(status.ExitStatus())
+ }
+ } else {
+ logger.Fatalf("cmd.Wait: %v", err)
+ }
+ }
+}
import subprocess
import argparse
import daemon
+import signal
if __name__ == '__main__':
# Handle command line parameters
rc = 255
try:
- rc = subprocess.call(args.exec_args, shell=False)
+ sp = subprocess.Popen(args.exec_args, shell=False)
+
+ # forward signals to the process.
+ signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+ signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+ signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+ # wait for process to complete.
+ rc = sp.wait()
+
+ # restore default signal handlers.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGQUIT, signal.SIG_DFL)
except OSError as e:
sys.stderr.write('arv-mount: %s -- exec %s\n' % (str(e), args.exec_args))
rc = e.errno
"bytes"
"crypto/md5"
"encoding/json"
- "errors"
"flag"
"fmt"
"github.com/gorilla/mux"
return e.ErrMsg
}
-// This error is returned by ReadAtMost if the available
-// data exceeds BLOCKSIZE bytes.
-var ReadErrorTooLong = errors.New("Too long")
-
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
// of command line flags (identifying Keep volumes and initializing
// Read the block data to be stored.
	// If the request exceeds BLOCKSIZE bytes, reject it with an error.
//
- // Note: because req.Body is a buffered Reader, each Read() call will
- // collect only the data in the network buffer (typically 16384 bytes),
- // even if it is passed a much larger slice.
- //
- // Instead, call ReadAtMost to read data from the socket
- // repeatedly until either EOF or BLOCKSIZE bytes have been read.
- //
- if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
+	if req.ContentLength < 0 {
+		// No usable Content-Length (e.g. a chunked request): reject it
+		// rather than trying to allocate a buffer of unknown size.
+		http.Error(resp, "Content-Length required", http.StatusLengthRequired)
+		return
+	}
+	if req.ContentLength > BLOCKSIZE {
+		http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
+		return
+	}
+
+ buf := make([]byte, req.ContentLength)
+ nread, err := io.ReadFull(req.Body, buf)
+ if err != nil {
+ http.Error(resp, err.Error(), 500)
+ } else if int64(nread) < req.ContentLength {
+ http.Error(resp, "request truncated", 500)
+ } else {
if err := PutBlock(buf, hash); err == nil {
// Success; add a size hint, sign the locator if
// possible, and return it to the client.
ke := err.(*KeepError)
http.Error(resp, ke.Error(), ke.HTTPCode)
}
- } else {
- log.Println("error reading request: ", err)
- errmsg := err.Error()
- if err == ReadErrorTooLong {
- // Use a more descriptive error message that includes
- // the maximum request size.
- errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
- }
- http.Error(resp, errmsg, 500)
}
+ return
}
// IndexHandler
}
}
-// ReadAtMost
-// Reads bytes repeatedly from an io.Reader until either
-// encountering EOF, or the maxbytes byte limit has been reached.
-// Returns a byte slice of the bytes that were read.
-//
-// If the reader contains more than maxbytes, returns a nil slice
-// and an error.
-//
-func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
- // Attempt to read one more byte than maxbytes.
- lr := io.LimitReader(r, int64(maxbytes+1))
- buf, err := ioutil.ReadAll(lr)
- if len(buf) > maxbytes {
- return nil, ReadErrorTooLong
- }
- return buf, err
-}
-
// IsValidLocator
// Return true if the specified string is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// corrupted data block.
//
func (v *UnixVolume) Read(loc string) ([]byte, error) {
- var f *os.File
- var err error
- var buf []byte
-
blockFilename := filepath.Join(v.root, loc[0:3], loc)
-
- f, err = os.Open(blockFilename)
+ buf, err := ioutil.ReadFile(blockFilename)
if err != nil {
- return nil, err
- }
-
- if buf, err = ioutil.ReadAll(f); err != nil {
log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
- return buf, err
+ return nil, err
}
- // Success!
return buf, nil
}
--- /dev/null
+#! /usr/bin/env python
+
+# traffic_test.py
+#
+# Launch a test Keep and API server and PUT and GET a bunch of blocks.
+# Can be used to simulate client traffic in Keep to evaluate memory usage,
+# error logging, performance, etc.
+#
+# This script is warty and relatively environment-specific, but the
+# example run described below should execute cleanly.
+#
+# Usage:
+# traffic_test.py start
+# Starts the test servers.
+# traffic_test.py put file1 file2 file3 ....
+# Runs arv-put on each file.
+# traffic_test.py get hash1 hash2 hash3 ....
+# Loops forever issuing GET requests for specified blocks.
+# traffic_test.py stop
+# Stops the test servers.
+#
+# Example:
+#
+# $ ./traffic_test.py start
+# $ ./traffic_test.py put GS00253-DNA_A02_200_37.tsv.bz2 \
+# GS00253-DNA_B01_200_37.tsv.bz2 \
+# GS00253-DNA_B02_200_37.tsv.bz2
+# $ ./traffic_test.py get $(find /tmp/tmp* -type f -printf "%f ")
+# [loops forever]
+# ^C
+# $ ./traffic_test.py stop
+#
+# Multiple "get" runs may be run concurrently to evaluate Keep's handling
+# of additional concurrent clients.
+
+PYSDK_DIR = "../../../sdk/python"
+PYTEST_DIR = PYSDK_DIR + "/tests"
+ARV_PUT_PATH = PYSDK_DIR + "/bin/arv-put"
+ARV_GET_PATH = PYSDK_DIR + "/bin/arv-get"
+SECONDS_BETWEEN_GETS = 1
+
+import argparse
+import httplib2
+import os
+import random
+import subprocess
+import sys
+import time
+
+# for run_test_server.py
+sys.path.insert(0, PYSDK_DIR)
+sys.path.insert(0, PYTEST_DIR)
+import arvados
+import run_test_server
+
+def arv_cmd(*args):
+    p = subprocess.Popen([sys.executable] + list(args),
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE)
+ (arvout, arverr) = p.communicate()
+ if p.returncode != 0:
+ print "error {} from {} {}: {}".format(
+ p.returncode, sys.executable, args, arverr)
+ sys.exit(p.returncode)
+ return arvout
+
+def start():
+ run_test_server.run()
+ run_test_server.run_keep()
+
+def put(files):
+ os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
+ run_test_server.authorize_with('active')
+ for v in ["ARVADOS_API_HOST",
+ "ARVADOS_API_HOST_INSECURE",
+ "ARVADOS_API_TOKEN"]:
+ os.environ[v] = arvados.config.settings()[v]
+
+ if not os.environ.has_key('PYTHONPATH'):
+ os.environ['PYTHONPATH'] = ''
+ os.environ['PYTHONPATH'] = "{}:{}:{}".format(
+ PYSDK_DIR, PYTEST_DIR, os.environ['PYTHONPATH'])
+
+ for c in files:
+ manifest_uuid = arv_cmd(ARV_PUT_PATH, c)
+
+def get(blocks):
+ os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
+
+ run_test_server.authorize_with('active')
+ for v in ["ARVADOS_API_HOST",
+ "ARVADOS_API_HOST_INSECURE",
+ "ARVADOS_API_TOKEN"]:
+ os.environ[v] = arvados.config.settings()[v]
+
+ nqueries = 0
+ while True:
+ b = random.choice(blocks)
+ print "GET /" + b
+ body = arv_cmd(ARV_GET_PATH, b)
+ print "got {} bytes".format(len(body))
+ time.sleep(SECONDS_BETWEEN_GETS)
+ nqueries = nqueries + 1
+
+def stop():
+ run_test_server.stop_keep()
+ run_test_server.stop()
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument('action',
+ type=str,
+ nargs='+',
+ help='''"start", "put", "get", "stop"''')
+ args = parser.parse_args()
+
+ if args.action[0] == 'start':
+ start()
+ elif args.action[0] == 'put':
+ put(args.action[1:])
+ elif args.action[0] == 'get':
+ get(args.action[1:])
+ elif args.action[0] == 'stop':
+ stop()
+ else:
+ print('Unrecognized action "{}"'.format(args.action))
+ print('actions are "start", "put", "get", "stop"')