Merge branch 'master' into 2955-fail-orphan-jobs
author: Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 6 Jun 2014 14:31:18 +0000 (10:31 -0400)
committer: Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 6 Jun 2014 14:31:18 +0000 (10:31 -0400)
39 files changed:
.gitignore
apps/workbench/app/assets/stylesheets/application.css.scss
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/controllers/jobs_controller.rb
apps/workbench/app/models/arvados_base.rb
apps/workbench/app/models/job.rb
apps/workbench/app/views/application/_svg_div.html.erb
apps/workbench/app/views/jobs/_show_status.html.erb [new file with mode: 0644]
apps/workbench/config/routes.rb
apps/workbench/test/integration/pipeline_instances_test.rb
doc/api/methods/users.html.textile.liquid
doc/user/topics/tutorial-gatk-variantfiltration.html.textile.liquid
doc/user/topics/tutorial-job1.html.textile.liquid
docker/api/secret_token.rb.in
docker/build_tools/Makefile
docker/config.yml.example
docker/jobs/Dockerfile
sdk/cli/bin/arv
sdk/cli/bin/crunch-job
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/controllers/arvados/v1/jobs_controller.rb
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/app/controllers/arvados/v1/users_controller.rb
services/api/app/models/job.rb
services/api/app/models/node.rb
services/api/config/routes.rb
services/api/db/migrate/20140530200539_add_supplied_script_version.rb [new file with mode: 0644]
services/api/db/schema.rb
services/api/script/crunch-dispatch.rb
services/api/test/fixtures/nodes.yml
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/api/test/unit/node_test.rb
services/crunch/crunchstat/go.sh [new file with mode: 0755]
services/crunch/crunchstat/src/arvados.org/crunchstat/crunchstat.go [new file with mode: 0644]
services/fuse/bin/arv-mount
services/keep/src/keep/keep.go
services/keep/src/keep/volume_unix.go
services/keep/tools/traffic_test.py [new file with mode: 0755]

index 602e1b9e40dbb8eda8a20ca1a607c40fa7e0b525..0cddee596c4cf665b9359f1a6276c7ecdcfd3d49 100644 (file)
@@ -16,3 +16,6 @@ services/keep/pkg
 services/keep/src/github.com
 sdk/java/target
 *.class
+apps/workbench/vendor/bundle
+services/api/vendor/bundle
+sdk/java/log
index 51c96d7fc87d1cf5be336b4bd4815460f471d48d..64c50985c85e885a2059d9c7bd3c41806be4e105 100644 (file)
@@ -191,3 +191,6 @@ table.table-fixed-header-row tbody {
     border-right: 1px solid #ffffff;
     background: #ffffff;
 }
+svg text {
+    font-size: 6pt;
+}
\ No newline at end of file
index e1b6a7f929450e8320941ee9b9fe030f791b47ae..7d49f55e33625c4eaa477762ba7652192b58e924 100644 (file)
@@ -145,9 +145,18 @@ class ApplicationController < ActionController::Base
     @new_resource_attrs ||= params[model_class.to_s.underscore.singularize]
     @new_resource_attrs ||= {}
     @new_resource_attrs.reject! { |k,v| k.to_s == 'uuid' }
-    @object ||= model_class.new @new_resource_attrs
-    @object.save!
-    show
+    @object ||= model_class.new @new_resource_attrs, params["options"]
+    if @object.save
+      respond_to do |f|
+        f.json { render json: @object.attributes.merge(href: url_for(@object)) }
+        f.html {
+          redirect_to @object
+        }
+        f.js { render }
+      end
+    else
+      self.render_error status: 422
+    end
   end
 
   def destroy
index 4746635c72a3ea141b64648a1efc675620be2657..841d3a9fdc6827486a7f54948e6c63ee4b3b5b1a 100644 (file)
@@ -16,7 +16,7 @@ class JobsController < ApplicationController
 
     @svg = ProvenanceHelper::create_provenance_graph nodes, "provenance_svg", {
       :request => request,
-      :all_script_parameters => true, 
+      :all_script_parameters => true,
       :script_version_nodes => true}
   end
 
@@ -31,6 +31,11 @@ class JobsController < ApplicationController
     end
   end
 
+  def cancel
+    @object.cancel
+    redirect_to @object
+  end
+
   def show
     generate_provenance([@object])
   end
@@ -44,6 +49,6 @@ class JobsController < ApplicationController
   end
 
   def show_pane_list
-    %w(Attributes Provenance Metadata JSON API)
+    %w(Status Attributes Provenance Metadata JSON API)
   end
 end
index 2eb0b625775a5b249c547eb221849ba7fe9459bc..33e107e3693c94b4954f4e155312b060ab50205e 100644 (file)
@@ -1,6 +1,7 @@
 class ArvadosBase < ActiveRecord::Base
   self.abstract_class = true
   attr_accessor :attribute_sortkey
+  attr_accessor :create_params
 
   def self.arvados_api_client
     ArvadosApiClient.new_or_current
@@ -29,8 +30,9 @@ class ArvadosBase < ActiveRecord::Base
       end
   end
 
-  def initialize raw_params={}
+  def initialize raw_params={}, create_params={}
     super self.class.permit_attribute_params(raw_params)
+    @create_params = create_params
     @attribute_sortkey ||= {
       'id' => nil,
       'name' => '000',
@@ -144,8 +146,10 @@ class ArvadosBase < ActiveRecord::Base
     ActionController::Parameters.new(raw_params).permit!
   end
 
-  def self.create raw_params={}
-    super(permit_attribute_params(raw_params))
+  def self.create raw_params={}, create_params={}
+    x = super(permit_attribute_params(raw_params))
+    x.create_params = create_params
+    x
   end
 
   def update_attributes raw_params={}
@@ -164,6 +168,7 @@ class ArvadosBase < ActiveRecord::Base
       obdata.delete :uuid
       resp = arvados_api_client.api(self.class, '/' + uuid, postdata)
     else
+      postdata.merge!(@create_params) if @create_params
       resp = arvados_api_client.api(self.class, '', postdata)
     end
     return false if !resp[:etag] || !resp[:uuid]
index 92f3910293695bb537959fbee02c2abd1c972e0e..173d3a06964fb5667b9546aee4bab518baf3c190 100644 (file)
@@ -10,4 +10,8 @@ class Job < ArvadosBase
   def self.creatable?
     false
   end
+
+  def cancel
+    arvados_api_client.api "jobs/#{self.uuid}/", "cancel", {}
+  end
 end
index ddbbf20d296e15b72309a27d7630d67cb4b9a8b8..20269621f3f6b9b166b7b11aa79001682302c570 100644 (file)
@@ -7,7 +7,7 @@
  border-width: 1px;
  border-color: gray;
  position: absolute;
- left: 1px;
+ left: 225px;
  right: 1px;
 }
 path:hover {
diff --git a/apps/workbench/app/views/jobs/_show_status.html.erb b/apps/workbench/app/views/jobs/_show_status.html.erb
new file mode 100644 (file)
index 0000000..a3f38d8
--- /dev/null
@@ -0,0 +1,93 @@
+
+<div class="pull-right">
+  <% if @object.running %>
+    <%= form_tag "/jobs/#{@object.uuid}/cancel", style: "display:inline; padding-left: 1em" do |f| %>
+      <%= button_tag "Cancel running job", {class: 'btn btn-danger', id: "cancel-job-button"} %>
+    <% end %>
+  <% else %>
+    Re-run job using script version:
+    <%= form_tag '/jobs', style: "display:inline; padding-left: 1em" do |f| %>
+      <% [:script, :script_version, :repository, :output_is_persistent, :supplied_script_version, :nondeterministic].each do |d| %>
+        <%= hidden_field :job, d, :value => @object[d] %>
+      <% end %>
+      <% [:script_parameters, :runtime_constraints].each do |d| %>
+        <%= hidden_field :job, d, :value => JSON.dump(@object[d]) %>
+      <% end %>
+      <%= button_tag "Same as this run", {class: 'btn btn-primary', id: "re-run-same-job-button"} %>
+    <% end %>
+  <% if !@object.supplied_script_version.nil? and !@object.supplied_script_version.empty? and @object.script_version != @object.supplied_script_version%>
+      <%= form_tag '/jobs', style: "display:inline" do |f| %>
+      <% [:script, :repository, :output_is_persistent, :supplied_script_version, :nondeterministic].each do |d| %>
+        <%= hidden_field :job, d, :value => @object[d] %>
+      <% end %>
+      <%= hidden_field :job, :script_version, :value => @object[:supplied_script_version] %>
+      <% [:script_parameters, :runtime_constraints].each do |d| %>
+        <%= hidden_field :job, d, :value => JSON.dump(@object[d]) %>
+      <% end %>
+      <%= button_tag "Latest (#{@object.repository}/#{@object.supplied_script_version})", {class: 'btn btn-primary', id: "re-run-latest-job-button"} %>
+    <% end %>
+  <% end %>
+<% end %>
+</div>
+
+<table class="table pipeline-components-table">
+  <colgroup>
+    <col style="width: 20%" />
+    <col style="width: 24%" />
+    <col style="width: 12%" />
+    <col style="width: 45%" />
+  </colgroup>
+  <thead>
+    <tr><th>
+        script, version
+      </th><th>
+        progress
+        <%# format:'js' here helps browsers avoid using the cached js
+            content in html context (e.g., duplicate tab -> see
+                                     javascript) %>
+        <%= link_to '(refresh)', {format: :js}, {class: 'refresh hide', remote: true, method: 'get'} %>
+      </th>
+      <th></th>
+      <th>
+        output
+      </th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <%= @object[:script] %><br>
+        <span class="deemphasize"><%= @object[:script_version] %></span>
+      </td>
+      <td>
+        <%= render partial: 'job_progress', locals: {:j => @object} %>
+        <% if @object.running == false %>
+          <% if @object[:job].andand[:uuid] %>
+            <span class="deemphasize">
+              <%= link_to("..."+@object[:job][:uuid].last(15), job_url(id: @object[:job][:uuid])) %>
+            </span>
+
+            <% current_job = @object %>
+            <% if current_job.andand[:log] %>
+              <% fixup = /([a-f0-9]{32}\+\d+)(\+?.*)/.match(current_job[:log])%>
+              <% Collection.limit(1).where(uuid: fixup[1]).each do |c| %>
+                <% c.files.each do |file| %>
+                  <br/><span class="deemphasize">
+                    <a href="<%= collection_path(current_job[:log]) %>/<%= file[1] %>?disposition=inline&size=<%= file[2] %>">log</a>
+                  </span>
+                <% end %>
+              <% end %>
+            <% end %>
+          <% end %>
+        <% end %>
+      </td><td>
+        <%= render(partial: 'job_status_label',
+                   locals: { :j => @object }) %>
+      </td><td>
+        <%= link_to_if_arvados_object @object[:output], {:thumbnail => true} %>
+      </td>
+    </tr>
+    <tfoot>
+      <tr><td colspan="5"></td></tr>
+    </tfoot>
+</table>
index b4da656bc3a42208764c9a7dfc01f1fd14f82f40..383d4421e2eb0e84e7cd1d14b94ea1e51e739e2a 100644 (file)
@@ -18,7 +18,9 @@ ArvadosWorkbench::Application.routes.draw do
   resources :virtual_machines
   resources :authorized_keys
   resources :job_tasks
-  resources :jobs
+  resources :jobs do
+    post 'cancel', :on => :member
+  end
   match '/logout' => 'sessions#destroy', via: [:get, :post]
   get '/logged_out' => 'sessions#index'
   resources :users do
index e3449f6e98a58d01ae857ed93caee810e84e37e5..b676dc74fc316da7f3c6cd2d15d64f09b56261e2 100644 (file)
@@ -52,5 +52,9 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
 
     # Pipeline is stopped. We have the option to resume it.
     page.assert_selector 'a,button', text: 'Run'
+
+    # Go over to the graph tab
+    click_link 'Graph'
+    assert page.has_css? 'div#provenance_graph'
   end
 end
index 59fa856b493d98d0c6c31fbc08f89b15d80a141b..33f884b6cc7ffd9599343503a34db89ee90ff6ad 100644 (file)
@@ -42,10 +42,6 @@ table(table table-bordered table-condensed).
 |_. Argument |_. Type |_. Description |_. Location |_. Example |
 {background:#ccffcc}.|uuid|string|The UUID of the User in question.|path||
 
-h2. event_stream
-
-event_stream users
-
 Arguments:
 
 table(table table-bordered table-condensed).
index 0248325f61034511acefe6733a9e3d5a8b5f3d8c..ea608b4bc1c15a278f919abe8f8677e943f39b8c 100644 (file)
@@ -154,48 +154,8 @@ Now start a job:
   "5ee633fe2569d2a42dd81b07490d5d13+82",
   "c905c8d8443a9c44274d98b7c6cfaa32+94",
   "d237a90bae3870b3b033aea1e99de4a9+10820"
- ],
- "log_stream_href":"https://qr1hi.arvadosapi.com/arvados/v1/jobs/qr1hi-8i9sb-n9k7qyp7bs5b9d4/log_tail_follow"
+ ]
 }
-~$ <span class="userinput">arv job log_tail_follow --uuid qr1hi-8i9sb-n9k7qyp7bs5b9d4</span>
-Tue Dec 17 19:02:16 2013 salloc: Granted job allocation 1251
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  check slurm allocation
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  node compute13 - 8 slots
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  start
-Tue Dec 17 19:02:17 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  Install revision 76588bfc57f33ea1b36b82ca7187f465b73b4ca4
-Tue Dec 17 19:02:18 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  Clean-work-dir exited 0
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  Install exited 0
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  script GATK2-VariantFiltration
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  script_version 76588bfc57f33ea1b36b82ca7187f465b73b4ca4
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  script_parameters {"input":"5ee633fe2569d2a42dd81b07490d5d13+82","gatk_bundle":"d237a90bae3870b3b033aea1e99de4a9+10820","gatk_binary_tarball":"c905c8d8443a9c44274d98b7c6cfaa32+94"}
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  runtime_constraints {"max_tasks_per_node":0}
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  start level 0
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  status: 0 done, 0 running, 1 todo
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 job_task qr1hi-ot0gb-d3sjxerucfbvyev
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 child 4946 started on compute13.1
-Tue Dec 17 19:02:19 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  status: 0 done, 1 running, 0 todo
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 child 4946 on compute13.1 exit 0 signal 0 success=true
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 success in 1 seconds
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 0 output
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  wait for last 0 children to finish
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  status: 1 done, 0 running, 1 todo
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  start level 1
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  status: 1 done, 0 running, 1 todo
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 job_task qr1hi-ot0gb-w8ujbnulxjaamxf
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 child 4984 started on compute13.1
-Tue Dec 17 19:02:20 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  status: 1 done, 1 running, 0 todo
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 child 4984 on compute13.1 exit 0 signal 0 success=true
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 success in 110 seconds
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867 1 output bedd6ff56b3ae9f90d873b1fcb72f9a3+91
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  wait for last 0 children to finish
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  status: 2 done, 0 running, 0 todo
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  release job allocation
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  Freeze not implemented
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  collate
-Tue Dec 17 19:04:10 2013 salloc: Job allocation 1251 has been revoked.
-Tue Dec 17 19:04:10 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  output bedd6ff56b3ae9f90d873b1fcb72f9a3+91
-Tue Dec 17 19:04:11 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  finish
-Tue Dec 17 19:04:12 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  log manifest is 1e77aaceee2df499e14dc5dde5c3d328+91
 </code></pre>
 </notextile>
 
index f69381e4127510696c124019856942c8ac9b0c1f..cf09804871a624e383d0bccaeee4d39f43b66825 100644 (file)
@@ -76,8 +76,7 @@ Use @arv job create@ to actually submit the job.  It should print out a JSON obj
  "tasks_summary":{},
  "dependencies":[
   "c1bad4b39ca5a924e481008009d94e32+210"
- ],
- "log_stream_href":"https://qr1hi.arvadosapi.com/arvados/v1/jobs/qr1hi-8i9sb-1pm1t02dezhupss/log_tail_follow"
+ ]
 }
 </code></pre>
 </notextile>
@@ -91,12 +90,6 @@ h2. Monitor job progress
 
 Go to the "Workbench dashboard":https://{{site.arvados_workbench_host}} and visit *Activity* %(rarr)&rarr;% *Recent jobs*.  Your job should be near the top of the table.  This table refreshes automatically.  When the job has completed successfully, it will show <span class="label label-success">finished</span> in the *Status* column.
 
-On the command line, you can access log messages while the job runs using @arv job log_tail_follow@:
-
-notextile. <pre><code>~$ <span class="userinput">arv job log_tail_follow --uuid qr1hi-8i9sb-xxxxxxxxxxxxxxx</span></code></pre>
-
-This will print out the last several lines of the log for that job.
-
 h2. Inspect the job output
 
 On the "Workbench dashboard":https://{{site.arvados_workbench_host}}, look for the *Output* column of the *Recent jobs* table.  Click on the link under *Output* for your job to go to the files page with the job output.  The files page lists all the files that were output by the job.  Click on the link under the *file* column to view a file, or click on the download icon <span class="glyphicon glyphicon-download-alt"></span> to download the output file.
@@ -142,8 +135,7 @@ On the command line, you can use @arv job get@ to access a JSON object describin
  },
  "dependencies":[
   "c1bad4b39ca5a924e481008009d94e32+210"
- ],
- "log_stream_href":null
+ ]
 }
 </code></pre>
 </notextile>
index 201870bdd86932bf8e6720556085552cf9a658bd..30084803f08f7733ab8c127e25b1dfa77b68f946 100644 (file)
@@ -5,3 +5,8 @@
 # Make sure the secret is at least 30 characters and all random,
 # no regular words or you'll be exposed to dictionary attacks.
 Server::Application.config.secret_token = '@@API_SECRET@@'
+
+# The blob_signing_key is a string of alphanumeric characters used
+# to sign permission hints for Keep locators. It must be identical
+# to the permission key given to Keep.
+Server::Application.config.blob_signing_key = '@@KEEP_SIGNING_SECRET@@'
index 69db746326767729c5eacaa728db905400a7929a..298551fa1566b9b62ce3b588a8230e5c42cbc22b 100644 (file)
@@ -89,7 +89,8 @@ $(BUILD):
        rsync -rlp --exclude=docker/ --exclude='**/log/*' --exclude='**/tmp/*' \
                --chmod=Da+rx,Fa+rX ../ build/
        find build/ -name \*.gem -delete
-       cd build/sdk/python/ && ./build.sh
+       cd build/services/fuse/ && python setup.py build
+       cd build/sdk/python/ && python setup.py build
        cd build/sdk/cli && gem build arvados-cli.gemspec
        cd build/sdk/ruby && gem build arvados.gemspec
        touch build/.buildstamp
index da06fee255f046f701cc1b92585d106f45e16f8e..515fcbefd5121bbddb567c54aaa768b1008cc538 100644 (file)
@@ -45,6 +45,10 @@ ARVADOS_PROD_PW:
 # will be chosen randomly at build time. This is the
 # recommended setting.
 
+# The signing key shared by Keep at the API server to verify
+# blob permission signatures.
+KEEP_SIGNING_SECRET:
+
 # The value for the Rails config.secret_token setting.
 API_SECRET:
 
index 28ef858b8f3cd4bf9cfecf2d8fdc7d637f19927e..29c9d540b5f402828178586471dadd51a707a2cc 100644 (file)
@@ -14,6 +14,8 @@ RUN /usr/bin/apt-get install -q -y python-dev python-llfuse python-pip \
 # Install Arvados packages.
 RUN find /usr/src/arvados/sdk -name '*.gem' -print0 | \
       xargs -0rn 1 gem install && \
+    cd /usr/src/arvados/services/fuse && \
+    python setup.py install && \
     cd /usr/src/arvados/sdk/python && \
     python setup.py install
 
index d047204508fee386ee821bccbbd56fdde0a8dcfe..31cbeec70cb8b9d2b30116617a0e5586bc082591 100755 (executable)
@@ -313,8 +313,6 @@ end
 
 case api_method
 when
-  'arvados.users.event_stream',
-  'arvados.jobs.log_stream',
   'arvados.jobs.log_tail_follow'
 
   # Special case for methods that respond with data streams rather
index 0befdd5dbf9e6c8af770c187824ad90978a05c71..167d3ddbe9074e3edba2716d6247f03bd642aba7 100755 (executable)
@@ -641,7 +641,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_image)
     {
-      $command .= "$docker_bin run -i -a stdin -a stdout -a stderr ";
+      $command .= "crunchstat -cgroup-parent=/sys/fs/cgroup/lxc -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=1000 ";
+      $command .= "$docker_bin run -i -a stdin -a stdout -a stderr -cidfile=$ENV{TASK_WORK}/docker.cid ";
       # Dynamically configure the container to use the host system as its
       # DNS server.  Get the host's global addresses from the ip command,
       # and turn them into docker --dns options using gawk.
@@ -654,9 +655,13 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       }
       while (my ($env_key, $env_val) = each %ENV)
       {
-        $command .= "-e \Q$env_key=$env_val\E ";
+        if ($env_key =~ /^(JOB|TASK)_/) {
+          $command .= "-e \Q$env_key=$env_val\E ";
+        }
       }
       $command .= "\Q$docker_image\E ";
+    } else {
+      $command .= "crunchstat -cgroup-path=/sys/fs/cgroup "
     }
     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     my @execargs = ('bash', '-c', $command);
index b0f85124a07b1010a940f19c9cbcc744fb8de485..105295db46c5df600b8e1eb303c237a0e59ae6e4 100644 (file)
@@ -60,7 +60,6 @@ gem 'omniauth', '1.1.1'
 gem 'omniauth-oauth2', '1.1.1'
 
 gem 'andand'
-gem 'redis'
 
 gem 'test_after_commit', :group => :test
 
index 4a4419f806cd3755b532e354404f2e67d200e570..c3b518062c76c6b2b0be09a120dcc9224c1c8ee9 100644 (file)
@@ -164,7 +164,6 @@ GEM
     rake (10.2.2)
     rdoc (3.12.2)
       json (~> 1.4)
-    redis (3.0.7)
     ref (1.0.5)
     rvm-capistrano (1.5.1)
       capistrano (~> 2.15.4)
@@ -228,7 +227,6 @@ DEPENDENCIES
   pg
   pg_power
   rails (~> 3.2.0)
-  redis
   rvm-capistrano
   sass-rails (>= 3.2.0)
   simplecov (~> 0.7.1)
index ee563010035bd92d7c7dcca9e305cfeb376ab34c..9043ac4b91d64b78d882885e2b2284b5c1a8c9cd 100644 (file)
@@ -117,51 +117,6 @@ class Arvados::V1::JobsController < ApplicationController
           @job.reload
         end
       end
-      @redis = Redis.new(:timeout => 0)
-      if @redis.exists @job.uuid
-        # A log buffer exists. Start by showing the last few KB.
-        @redis.
-          getrange(@job.uuid, 0 - [@opts[:buffer_size], 1].max, -1).
-          sub(/^[^\n]*\n?/, '').
-          split("\n").
-          each do |line|
-          yield "#{line}\n"
-        end
-      end
-      # TODO: avoid missing log entries between getrange() above and
-      # subscribe() below.
-      @redis.subscribe(@job.uuid) do |event|
-        event.message do |channel, msg|
-          if msg == "end"
-            @redis.unsubscribe @job.uuid
-          else
-            yield "#{msg}\n"
-          end
-        end
-      end
-    end
-  end
-
-  def self._log_tail_follow_requires_parameters
-    {
-      buffer_size: {type: 'integer', required: false, default: 2**13}
-    }
-  end
-  def log_tail_follow
-    if !@object.andand.uuid
-      return render_not_found
-    end
-    if client_accepts_plain_text_stream
-      self.response.headers['Last-Modified'] = Time.now.ctime.to_s
-      self.response_body = LogStreamer.new @object, {
-        buffer_size: (params[:buffer_size].to_i rescue 2**13)
-      }
-    else
-      render json: {
-        href: url_for(uuid: @object.uuid),
-        comment: ('To retrieve the log stream as plain text, ' +
-                  'use a request header like "Accept: text/plain"')
-      }
     end
   end
 
index 3fbf5fcc6bda25a9a2aedf4b3a72cb149619e31c..5bfeff06f5c11200c08258eba53489b594aa1e3f 100644 (file)
@@ -20,9 +20,15 @@ class Arvados::V1::NodesController < ApplicationController
       if !@object
         return render_not_found
       end
-      @object.ping({ ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
-                     ping_secret: params[:ping_secret],
-                     ec2_instance_id: params[:instance_id] })
+      ping_data = {
+        ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
+        ec2_instance_id: params[:instance_id]
+      }
+      [:ping_secret, :total_cpu_cores, :total_ram_mb, :total_scratch_mb]
+        .each do |key|
+        ping_data[key] = params[key] if params[key]
+      end
+      @object.ping(ping_data)
       if @object.info['ping_secret'] == params[:ping_secret]
         render json: @object.as_api_response(:superuser)
       else
index 7311deb3860c9337914ea10fe3122dfad62bdad4..a67180912345331ce8872eb7b6fce16119a160b6 100644 (file)
@@ -2,9 +2,9 @@ class Arvados::V1::UsersController < ApplicationController
   accept_attribute_as_json :prefs, Hash
 
   skip_before_filter :find_object_by_uuid, only:
-    [:activate, :event_stream, :current, :system, :setup]
+    [:activate, :current, :system, :setup]
   skip_before_filter :render_404_if_no_object, only:
-    [:activate, :event_stream, :current, :system, :setup]
+    [:activate, :current, :system, :setup]
   before_filter :admin_required, only: [:setup, :unsetup]
 
   def current
@@ -16,39 +16,6 @@ class Arvados::V1::UsersController < ApplicationController
     show
   end
 
-  class ChannelStreamer
-    Q_UPDATE_INTERVAL = 12
-    def initialize(opts={})
-      @opts = opts
-    end
-    def each
-      return unless @opts[:channel]
-      @redis = Redis.new(:timeout => 0)
-      @redis.subscribe(@opts[:channel]) do |event|
-        event.message do |channel, msg|
-          yield msg + "\n"
-        end
-      end
-    end
-  end
-
-  def event_stream
-    channel = current_user.andand.uuid
-    if current_user.andand.is_admin
-      channel = params[:uuid] || channel
-    end
-    if client_accepts_plain_text_stream
-      self.response.headers['Last-Modified'] = Time.now.ctime.to_s
-      self.response_body = ChannelStreamer.new(channel: channel)
-    else
-      render json: {
-        href: url_for(uuid: channel),
-        comment: ('To retrieve the event stream as plain text, ' +
-                  'use a request header like "Accept: text/plain"')
-      }
-    end
-  end
-
   def activate
     if current_user.andand.is_admin && params[:uuid]
       @object = User.find params[:uuid]
index fbc5640b3762fef94934ed271d8501b78c31ac14..51fb7c27832a21eec4193886e720d1c2c3363e23 100644 (file)
@@ -35,10 +35,9 @@ class Job < ArvadosModel
     t.add :runtime_constraints
     t.add :tasks_summary
     t.add :dependencies
-    t.add :log_stream_href
-    t.add :log_buffer
     t.add :nondeterministic
     t.add :repository
+    t.add :supplied_script_version
   end
 
   def assert_finished
@@ -47,12 +46,6 @@ class Job < ArvadosModel
                       running: false)
   end
 
-  def log_stream_href
-    unless self.finished_at
-      "#{current_api_base}/#{self.class.to_s.pluralize.underscore}/#{self.uuid}/log_tail_follow"
-    end
-  end
-
   def self.queue
     self.where('started_at is ? and is_locked_by_uuid is ? and cancelled_at is ? and success is ?',
                nil, nil, nil, nil).
@@ -88,6 +81,7 @@ class Job < ArvadosModel
     if new_record? or script_version_changed?
       sha1 = Commit.find_commit_range(current_user, self.repository, nil, self.script_version, nil)[0] rescue nil
       if sha1
+        self.supplied_script_version = self.script_version if self.supplied_script_version.nil? or self.supplied_script_version.empty?
         self.script_version = sha1
       else
         raise ArgumentError.new("Specified script_version does not resolve to a commit")
@@ -187,15 +181,4 @@ class Job < ArvadosModel
       end
     end
   end
-
-  def log_buffer
-    begin
-      @@redis ||= Redis.new(:timeout => 0)
-      if @@redis.exists uuid
-        @@redis.getrange(uuid, 0 - 2**10, -1)
-      end
-    rescue Redis::CannotConnectError
-      return '(not available)'
-    end
-  end
 end
index 2ca05f62d59cc620d4e1b4fc8f48799c69c10266..71d4dea2c0cc815c7b29c30c8d0d7dac40c31cf1 100644 (file)
@@ -115,6 +115,15 @@ class Node < ArvadosModel
       end
     end
 
+    # Record other basic stats
+    ['total_cpu_cores', 'total_ram_mb', 'total_scratch_mb'].each do |key|
+      if value = (o[key] or o[key.to_sym])
+        self.info[key] = value
+      else
+        self.info.delete(key)
+      end
+    end
+
     save!
   end
 
index 0223c04da4d4fc9783dce3eb61b0e8820a84f49d..e4d2975a571699d85d39887cd30bd24725639db2 100644 (file)
@@ -21,7 +21,6 @@ Server::Application.routes.draw do
       resources :job_tasks
       resources :jobs do
         get 'queue', on: :collection
-        get 'log_tail_follow', on: :member
         post 'cancel', on: :member
       end
       resources :keep_disks do
@@ -49,7 +48,6 @@ Server::Application.routes.draw do
       resources :users do
         get 'current', on: :collection
         get 'system', on: :collection
-        get 'event_stream', on: :member
         post 'activate', on: :member
         post 'setup', on: :collection
         post 'unsetup', on: :member
diff --git a/services/api/db/migrate/20140530200539_add_supplied_script_version.rb b/services/api/db/migrate/20140530200539_add_supplied_script_version.rb
new file mode 100644 (file)
index 0000000..c054235
--- /dev/null
@@ -0,0 +1,9 @@
+class AddSuppliedScriptVersion < ActiveRecord::Migration
+  def up
+    add_column :jobs, :supplied_script_version, :string
+  end
+
+  def down
+    remove_column :jobs, :supplied_script_version, :string
+  end
+end
index b026dff554e0064820665af8e23b35240465fa44..1ef80ab670389f3a7f87bc9e9581e44cd618a656 100644 (file)
@@ -11,9 +11,8 @@
 #
 # It's strongly recommended to check this file into your version control system.
 
-ActiveRecord::Schema.define(:version => 20140602143352) do
-
 
+ActiveRecord::Schema.define(:version => 20140602143352) do
 
   create_table "api_client_authorizations", :force => true do |t|
     t.string   "api_token",                                           :null => false
@@ -195,6 +194,7 @@ ActiveRecord::Schema.define(:version => 20140602143352) do
     t.boolean  "nondeterministic"
     t.string   "repository"
     t.boolean  "output_is_persistent",     :default => false, :null => false
+    t.string   "supplied_script_version"
   end
 
   add_index "jobs", ["created_at"], :name => "index_jobs_on_created_at"
index 87acb651a4347c2900689c296eb20b108f35e213..ee8076b305fad984aee6bdde12ef5cfe803e14d4 100755 (executable)
@@ -140,23 +140,13 @@ class Dispatcher
       end
 
       if Server::Application.config.crunch_job_user
-        cmd_args.unshift("sudo", "-E", "-u", Server::Application.config.crunch_job_user)
-      end
-
-      cmd_args << "HOME=/dev/null"
-      cmd_args << "ARVADOS_API_HOST=#{ENV['ARVADOS_API_HOST']}"
-      cmd_args << "ARVADOS_API_HOST_INSECURE=#{ENV['ARVADOS_API_HOST_INSECURE']}" if ENV['ARVADOS_API_HOST_INSECURE']
-
-      ENV.each do |k, v|
-        cmd_args << "#{k}=#{v}" if k.starts_with? "CRUNCH_"
-      end
-
-      if $trollopts.use_env
-        cmd_args << "PATH=#{ENV['PATH']}"
-        cmd_args << "PYTHONPATH=#{ENV['PYTHONPATH']}"
-        cmd_args << "PERLLIB=#{ENV['PERLLIB']}"
-        cmd_args << "RUBYLIB=#{ENV['RUBYLIB']}"
-        cmd_args << "GEM_PATH=#{ENV['GEM_PATH']}"
+        cmd_args.unshift("sudo", "-E", "-u",
+                         Server::Application.config.crunch_job_user,
+                         "PATH=#{ENV['PATH']}",
+                         "PERLLIB=#{ENV['PERLLIB']}",
+                         "PYTHONPATH=#{ENV['PYTHONPATH']}",
+                         "RUBYLIB=#{ENV['RUBYLIB']}",
+                         "GEM_PATH=#{ENV['GEM_PATH']}")
       end
 
       job_auth = ApiClientAuthorization.
index 398bdf5cb06b9c6344acdab4bb107bfd3a7ecaf5..92e78da6c18e0f412a848b243b50b18b92e0555a 100644 (file)
@@ -32,3 +32,4 @@ idle:
   info:
     :ping_secret: "69udawxvn3zzj45hs8bumvndricrha4lcpi23pd69e44soanc0"
     :slurm_state: "idle"
+    total_cpu_cores: 16
index e096a045c60c81b9aef6bf1fcc08d714a48077e9..06695aa6a762a9871deef9f820dec14c33eefedb 100644 (file)
@@ -75,4 +75,20 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     assert_not_nil json_response['info']['ping_secret']
   end
 
+  test "ping adds node stats to info" do
+    node = nodes(:idle)
+    post :ping, {
+      id: node.uuid,
+      ping_secret: node.info['ping_secret'],
+      total_cpu_cores: 32,
+      total_ram_mb: 1024,
+      total_scratch_mb: 2048
+    }
+    assert_response :success
+    info = JSON.parse(@response.body)['info']
+    assert_equal(node.info['ping_secret'], info['ping_secret'])
+    assert_equal(32, info['total_cpu_cores'].to_i)
+    assert_equal(1024, info['total_ram_mb'].to_i)
+    assert_equal(2048, info['total_scratch_mb'].to_i)
+  end
 end
index ccc3765dd94060d98bed4b59f5f55b111292e8e4..5a9a057696041ab76a505977bdd3659b6e2daa94 100644 (file)
@@ -1,7 +1,23 @@
 require 'test_helper'
 
 class NodeTest < ActiveSupport::TestCase
-  # test "the truth" do
-  #   assert true
-  # end
+  def ping_node(node_name, ping_data)
+    set_user_from_auth :admin
+    node = nodes(node_name)
+    node.ping({ping_secret: node.info['ping_secret'],
+                ip: node.ip_address}.merge(ping_data))
+    node
+  end
+
+  test "pinging a node can add and update stats" do
+    node = ping_node(:idle, {total_cpu_cores: '12', total_ram_mb: '512'})
+    assert_equal(12, node.info['total_cpu_cores'].to_i)
+    assert_equal(512, node.info['total_ram_mb'].to_i)
+  end
+
+  test "stats disappear if not in a ping" do
+    node = ping_node(:idle, {total_ram_mb: '256'})
+    refute_includes(node.info, 'total_cpu_cores')
+    assert_equal(256, node.info['total_ram_mb'].to_i)
+  end
 end
diff --git a/services/crunch/crunchstat/go.sh b/services/crunch/crunchstat/go.sh
new file mode 100755 (executable)
index 0000000..640a0d2
--- /dev/null
@@ -0,0 +1,15 @@
+#! /bin/sh
+
+# Wraps the 'go' executable with some environment setup.  Sets GOPATH, creates
+# 'pkg' and 'bin' directories, automatically installs dependencies, then runs
+# the underlying 'go' executable with any command line parameters provided to
+# the script.
+
+rootdir=$(readlink -f $(dirname $0))
+GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
+export GOPATH
+
+mkdir -p $rootdir/pkg
+mkdir -p $rootdir/bin
+
+go $*
diff --git a/services/crunch/crunchstat/src/arvados.org/crunchstat/crunchstat.go b/services/crunch/crunchstat/src/arvados.org/crunchstat/crunchstat.go
new file mode 100644 (file)
index 0000000..7528485
--- /dev/null
@@ -0,0 +1,311 @@
+package main
+
+import (
+       "bufio"
+       "flag"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "os"
+       "os/exec"
+       "os/signal"
+       "strings"
+       "syscall"
+       "time"
+)
+
+func ReadLineByLine(inp io.ReadCloser, out chan string, finish chan bool) {
+       s := bufio.NewScanner(inp)
+       for s.Scan() {
+               out <- s.Text()
+       }
+       finish <- true
+}
+
+func OutputChannel(stdout chan string, stderr chan string) {
+       for {
+               select {
+               case s, ok := <-stdout:
+                       if ok {
+                               fmt.Fprintln(os.Stdout, s)
+                       } else {
+                               return
+                       }
+               case s, ok := <-stderr:
+                       if ok {
+                               fmt.Fprintln(os.Stderr, s)
+                       } else {
+                               return
+                       }
+               }
+       }
+}
+
+func PollCgroupStats(cgroup_path string, stderr chan string, poll int64) {
+       //var last_usage int64 = 0
+       var last_user int64 = 0
+       var last_sys int64 = 0
+       var last_cpucount int64 = 0
+
+       type Disk struct {
+               last_read  int64
+               next_read  int64
+               last_write int64
+               next_write int64
+       }
+
+       disk := make(map[string]*Disk)
+
+       //cpuacct_usage := fmt.Sprintf("%s/cpuacct.usage", cgroup_path)
+       cpuacct_stat := fmt.Sprintf("%s/cpuacct.stat", cgroup_path)
+       blkio_io_service_bytes := fmt.Sprintf("%s/blkio.io_service_bytes", cgroup_path)
+       cpuset_cpus := fmt.Sprintf("%s/cpuset.cpus", cgroup_path)
+       memory_stat := fmt.Sprintf("%s/memory.stat", cgroup_path)
+
+       var elapsed int64 = poll
+
+       for {
+               /*{
+                       c, _ := os.Open(cpuacct_usage)
+                       b, _ := ioutil.ReadAll(c)
+                       var next int64
+                       fmt.Sscanf(string(b), "%d", &next)
+                       if last_usage != 0 {
+                               stderr <- fmt.Sprintf("crunchstat: cpuacct.usage %v", (next-last_usage)/10000000)
+                       }
+                       //fmt.Printf("usage %d %d %d %d%%\n", last_usage, next, next-last_usage, (next-last_usage)/10000000)
+                       last_usage = next
+                       c.Close()
+               }*/
+               var cpus int64 = 0
+               {
+                       c, _ := os.Open(cpuset_cpus)
+                       b, _ := ioutil.ReadAll(c)
+                       sp := strings.Split(string(b), ",")
+                       for _, v := range sp {
+                               var min, max int64
+                               n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
+                               if n == 2 {
+                                       cpus += (max - min) + 1
+                               } else {
+                                       cpus += 1
+                               }
+                       }
+
+                       if cpus != last_cpucount {
+                               stderr <- fmt.Sprintf("crunchstat: cpuset.cpus %v", cpus)
+                       }
+                       last_cpucount = cpus
+
+                       c.Close()
+               }
+               if cpus == 0 {
+                       cpus = 1
+               }
+               {
+                       c, _ := os.Open(cpuacct_stat)
+                       b, _ := ioutil.ReadAll(c)
+                       var next_user int64
+                       var next_sys int64
+                       fmt.Sscanf(string(b), "user %d\nsystem %d", &next_user, &next_sys)
+                       c.Close()
+
+                       if last_user != 0 {
+                               user_diff := next_user - last_user
+                               sys_diff := next_sys - last_sys
+                               // Assume we're reading stats based on 100
+                               // jiffies per second.  Because the elapsed
+                               // time is in milliseconds, we need to boost
+                               // that to 1000 jiffies per second, then boost
+                               // it by another 100x to get a percentage, then
+                               // finally divide by the actual elapsed time
+                               // and the number of cpus to get average load
+                               // over the polling period.
+                               user_pct := (user_diff * 10 * 100) / (elapsed * cpus)
+                               sys_pct := (sys_diff * 10 * 100) / (elapsed * cpus)
+
+                               stderr <- fmt.Sprintf("crunchstat: cpuacct.stat user %v", user_pct)
+                               stderr <- fmt.Sprintf("crunchstat: cpuacct.stat sys %v", sys_pct)
+                       }
+
+                       /*fmt.Printf("user %d %d %d%%\n", last_user, next_user, next_user-last_user)
+                       fmt.Printf("sys %d %d %d%%\n", last_sys, next_sys, next_sys-last_sys)
+                       fmt.Printf("sum %d%%\n", (next_user-last_user)+(next_sys-last_sys))*/
+                       last_user = next_user
+                       last_sys = next_sys
+               }
+               {
+                       c, _ := os.Open(blkio_io_service_bytes)
+                       b := bufio.NewScanner(c)
+                       var device, op string
+                       var next int64
+                       for b.Scan() {
+                               if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &next); err == nil {
+                                       if disk[device] == nil {
+                                               disk[device] = new(Disk)
+                                       }
+                                       if op == "Read" {
+                                               disk[device].last_read = disk[device].next_read
+                                               disk[device].next_read = next
+                                               if disk[device].last_read > 0 {
+                                                       stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s read %v", device, disk[device].next_read-disk[device].last_read)
+                                               }
+                                       }
+                                       if op == "Write" {
+                                               disk[device].last_write = disk[device].next_write
+                                               disk[device].next_write = next
+                                               if disk[device].last_write > 0 {
+                                                       stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s write %v", device, disk[device].next_write-disk[device].last_write)
+                                               }
+                                       }
+                               }
+                       }
+                       c.Close()
+               }
+
+               {
+                       c, _ := os.Open(memory_stat)
+                       b := bufio.NewScanner(c)
+                       var stat string
+                       var val int64
+                       for b.Scan() {
+                               if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err == nil {
+                                       if stat == "rss" {
+                                               stderr <- fmt.Sprintf("crunchstat: memory.stat rss %v", val)
+                                       }
+                               }
+                       }
+                       c.Close()
+               }
+
+               bedtime := time.Now()
+               time.Sleep(time.Duration(poll) * time.Millisecond)
+               morning := time.Now()
+               elapsed = morning.Sub(bedtime).Nanoseconds() / int64(time.Millisecond)
+       }
+}
+
+func main() {
+
+       var (
+               cgroup_path    string
+               cgroup_parent  string
+               cgroup_cidfile string
+               wait           int64
+               poll           int64
+       )
+
+       flag.StringVar(&cgroup_path, "cgroup-path", "", "Direct path to cgroup")
+       flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Path to parent cgroup")
+       flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
+       flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
+       flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
+
+       flag.Parse()
+
+       logger := log.New(os.Stderr, "crunchstat: ", 0)
+
+       if cgroup_path == "" && cgroup_cidfile == "" {
+               logger.Fatal("Must provide either -cgroup-path or -cgroup-cid")
+       }
+
+       // Make output channel
+       stdout_chan := make(chan string)
+       stderr_chan := make(chan string)
+       finish_chan := make(chan bool)
+       defer close(stdout_chan)
+       defer close(stderr_chan)
+       defer close(finish_chan)
+
+       go OutputChannel(stdout_chan, stderr_chan)
+
+       var cmd *exec.Cmd
+
+       if len(flag.Args()) > 0 {
+               // Set up subprocess
+               cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
+
+               logger.Print("Running ", flag.Args())
+
+               // Forward SIGINT and SIGTERM to inner process
+               term := make(chan os.Signal, 1)
+               go func(sig <-chan os.Signal) {
+                       catch := <-sig
+                       if cmd.Process != nil {
+                               cmd.Process.Signal(catch)
+                       }
+                       logger.Print("caught signal:", catch)
+               }(term)
+               signal.Notify(term, syscall.SIGTERM)
+               signal.Notify(term, syscall.SIGINT)
+
+               // Funnel stdout and stderr from subprocess to output channels
+               stdout_pipe, err := cmd.StdoutPipe()
+               if err != nil {
+                       logger.Fatal(err)
+               }
+               go ReadLineByLine(stdout_pipe, stdout_chan, finish_chan)
+
+               stderr_pipe, err := cmd.StderrPipe()
+               if err != nil {
+                       logger.Fatal(err)
+               }
+               go ReadLineByLine(stderr_pipe, stderr_chan, finish_chan)
+
+               // Run subprocess
+               if err := cmd.Start(); err != nil {
+                       logger.Fatal(err)
+               }
+       }
+
+       // Read the cid file
+       if cgroup_cidfile != "" {
+               // wait up to 'wait' seconds for the cid file to appear
+               var i time.Duration
+               for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
+                       f, err := os.Open(cgroup_cidfile)
+                       if err == nil {
+                               cid, err2 := ioutil.ReadAll(f)
+                               if err2 == nil && len(cid) > 0 {
+                                       cgroup_path = string(cid)
+                                       f.Close()
+                                       break
+                               }
+                       }
+                       time.Sleep(100 * time.Millisecond)
+               }
+               if cgroup_path == "" {
+                       logger.Printf("Could not read cid file %s", cgroup_cidfile)
+               }
+       }
+
+       // add the parent prefix
+       if cgroup_parent != "" {
+               cgroup_path = fmt.Sprintf("%s/%s", cgroup_parent, cgroup_path)
+       }
+
+       logger.Print("Using cgroup ", cgroup_path)
+
+       go PollCgroupStats(cgroup_path, stderr_chan, poll)
+
+       // Wait for each of stdout and stderr to drain
+       <-finish_chan
+       <-finish_chan
+
+       if err := cmd.Wait(); err != nil {
+               if exiterr, ok := err.(*exec.ExitError); ok {
+                       // The program has exited with an exit code != 0
+
+                       // This works on both Unix and Windows. Although package
+                       // syscall is generally platform dependent, WaitStatus is
+                       // defined for both Unix and Windows and in both cases has
+                       // an ExitStatus() method with the same signature.
+                       if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
+                               os.Exit(status.ExitStatus())
+                       }
+               } else {
+                       logger.Fatalf("cmd.Wait: %v", err)
+               }
+       }
+}
index 904fbf1b8d8958804575a0495b35616acc6f79e0..b4afffab061fc2ceaf56bc7dcb93a105fa3d93cb 100755 (executable)
@@ -5,6 +5,7 @@ import arvados
 import subprocess
 import argparse
 import daemon
+import signal
 
 if __name__ == '__main__':
     # Handle command line parameters
@@ -66,7 +67,20 @@ collections on the server.""")
 
         rc = 255
         try:
-            rc = subprocess.call(args.exec_args, shell=False)
+            sp = subprocess.Popen(args.exec_args, shell=False)
+
+            # forward signals to the process.
+            signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+            signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+            signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+            # wait for process to complete.
+            rc = sp.wait()
+
+            # restore default signal handlers.
+            signal.signal(signal.SIGINT, signal.SIG_DFL)
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+            signal.signal(signal.SIGQUIT, signal.SIG_DFL)
         except OSError as e:
             sys.stderr.write('arv-mount: %s -- exec %s\n' % (str(e), args.exec_args))
             rc = e.errno
index fa7340d26f5f14c2e6d6242e9ea747a58dac7421..429a7e01b29b517e8757427412654e0d18dada67 100644 (file)
@@ -5,7 +5,6 @@ import (
        "bytes"
        "crypto/md5"
        "encoding/json"
-       "errors"
        "flag"
        "fmt"
        "github.com/gorilla/mux"
@@ -87,10 +86,6 @@ func (e *KeepError) Error() string {
        return e.ErrMsg
 }
 
-// This error is returned by ReadAtMost if the available
-// data exceeds BLOCKSIZE bytes.
-var ReadErrorTooLong = errors.New("Too long")
-
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
 // of command line flags (identifying Keep volumes and initializing
@@ -444,14 +439,18 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        // Read the block data to be stored.
        // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
        //
-       // Note: because req.Body is a buffered Reader, each Read() call will
-       // collect only the data in the network buffer (typically 16384 bytes),
-       // even if it is passed a much larger slice.
-       //
-       // Instead, call ReadAtMost to read data from the socket
-       // repeatedly until either EOF or BLOCKSIZE bytes have been read.
-       //
-       if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
+       if req.ContentLength > BLOCKSIZE {
+               http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
+               return
+       }
+
+       buf := make([]byte, req.ContentLength)
+       nread, err := io.ReadFull(req.Body, buf)
+       if err != nil {
+               http.Error(resp, err.Error(), 500)
+       } else if int64(nread) < req.ContentLength {
+               http.Error(resp, "request truncated", 500)
+       } else {
                if err := PutBlock(buf, hash); err == nil {
                        // Success; add a size hint, sign the locator if
                        // possible, and return it to the client.
@@ -466,16 +465,8 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                        ke := err.(*KeepError)
                        http.Error(resp, ke.Error(), ke.HTTPCode)
                }
-       } else {
-               log.Println("error reading request: ", err)
-               errmsg := err.Error()
-               if err == ReadErrorTooLong {
-                       // Use a more descriptive error message that includes
-                       // the maximum request size.
-                       errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
-               }
-               http.Error(resp, errmsg, 500)
        }
+       return
 }
 
 // IndexHandler
@@ -691,24 +682,6 @@ func PutBlock(block []byte, hash string) error {
        }
 }
 
-// ReadAtMost
-//     Reads bytes repeatedly from an io.Reader until either
-//     encountering EOF, or the maxbytes byte limit has been reached.
-//     Returns a byte slice of the bytes that were read.
-//
-//     If the reader contains more than maxbytes, returns a nil slice
-//     and an error.
-//
-func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
-       // Attempt to read one more byte than maxbytes.
-       lr := io.LimitReader(r, int64(maxbytes+1))
-       buf, err := ioutil.ReadAll(lr)
-       if len(buf) > maxbytes {
-               return nil, ReadErrorTooLong
-       }
-       return buf, err
-}
-
 // IsValidLocator
 //     Return true if the specified string is a valid Keep locator.
 //     When Keep is extended to support hash types other than MD5,
index 88410def8e6d55b5af68226a134708362e8d973f..7b711d2eac1e7c6f5024cc49f723dc31c6c86952 100644 (file)
@@ -109,23 +109,13 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 // corrupted data block.
 //
 func (v *UnixVolume) Read(loc string) ([]byte, error) {
-       var f *os.File
-       var err error
-       var buf []byte
-
        blockFilename := filepath.Join(v.root, loc[0:3], loc)
-
-       f, err = os.Open(blockFilename)
+       buf, err := ioutil.ReadFile(blockFilename)
        if err != nil {
-               return nil, err
-       }
-
-       if buf, err = ioutil.ReadAll(f); err != nil {
                log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
-               return buf, err
+               return nil, err
        }
 
-       // Success!
        return buf, nil
 }
 
diff --git a/services/keep/tools/traffic_test.py b/services/keep/tools/traffic_test.py
new file mode 100755 (executable)
index 0000000..de97edd
--- /dev/null
@@ -0,0 +1,126 @@
+#! /usr/bin/env python
+
+# traffic_test.py
+#
+# Launch a test Keep and API server and PUT and GET a bunch of blocks.
+# Can be used to simulate client traffic in Keep to evaluate memory usage,
+# error logging, performance, etc.
+#
+# This script is warty and is relatively environment-specific, but the
+# example run described below should execute cleanly.
+#
+# Usage:
+#   traffic_test.py start
+#       Starts the test servers.
+#   traffic_test.py put file1 file2 file3 ....
+#       Runs arv-put on each file.
+#   traffic_test.py get hash1 hash2 hash3 ....
+#       Loops forever issuing GET requests for specified blocks.
+#   traffic_test.py stop
+#       Stops the test servers.
+#
+# Example:
+#
+#   $ ./traffic_test.py start
+#   $ ./traffic_test.py put GS00253-DNA_A02_200_37.tsv.bz2 \
+#         GS00253-DNA_B01_200_37.tsv.bz2 \
+#         GS00253-DNA_B02_200_37.tsv.bz2
+#   $ ./traffic_test.py get $(find /tmp/tmp* -type f -printf "%f ")
+#     [loops forever]
+#     ^C
+#   $ ./traffic_test.py stop
+#
+# Multiple "get" runs may be run concurrently to evaluate Keep's handling
+# of additional concurrent clients.
+
+PYSDK_DIR    = "../../../sdk/python"
+PYTEST_DIR   = PYSDK_DIR + "/tests"
+ARV_PUT_PATH = PYSDK_DIR + "/bin/arv-put"
+ARV_GET_PATH = PYSDK_DIR + "/bin/arv-get"
+SECONDS_BETWEEN_GETS = 1
+
+import argparse
+import httplib2
+import os
+import random
+import subprocess
+import sys
+import time
+
+# for run_test_server.py
+sys.path.insert(0, PYSDK_DIR)
+sys.path.insert(0, PYTEST_DIR)
+import arvados
+import run_test_server
+
+def arv_cmd(*args):
+    p = subprocess.Popen([sys.executable] + list(args),
+                         stdout=subprocess.PIPE)
+    (arvout, arverr) = p.communicate()
+    if p.returncode != 0:
+        print "error {} from {} {}: {}".format(
+            p.returncode, sys.executable, args, arverr)
+        sys.exit(p.returncode)
+    return arvout
+
+def start():
+    run_test_server.run()
+    run_test_server.run_keep()
+
+def put(files):
+    os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
+    run_test_server.authorize_with('active')
+    for v in ["ARVADOS_API_HOST",
+              "ARVADOS_API_HOST_INSECURE",
+              "ARVADOS_API_TOKEN"]:
+        os.environ[v] = arvados.config.settings()[v]
+
+    if not os.environ.has_key('PYTHONPATH'):
+        os.environ['PYTHONPATH'] = ''
+    os.environ['PYTHONPATH'] = "{}:{}:{}".format(
+        PYSDK_DIR, PYTEST_DIR, os.environ['PYTHONPATH'])
+
+    for c in files:
+        manifest_uuid = arv_cmd(ARV_PUT_PATH, c)
+
+def get(blocks):
+    os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
+
+    run_test_server.authorize_with('active')
+    for v in ["ARVADOS_API_HOST",
+              "ARVADOS_API_HOST_INSECURE",
+              "ARVADOS_API_TOKEN"]:
+        os.environ[v] = arvados.config.settings()[v]
+
+    nqueries = 0
+    while True:
+        b = random.choice(blocks)
+        print "GET /" + b
+        body = arv_cmd(ARV_GET_PATH, b)
+        print "got {} bytes".format(len(body))
+        time.sleep(SECONDS_BETWEEN_GETS)
+        nqueries = nqueries + 1
+
+def stop():
+    run_test_server.stop_keep()
+    run_test_server.stop()
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument('action',
+                        type=str,
+                        nargs='+',
+                        help='''"start", "put", "get", "stop"''')
+    args = parser.parse_args()
+
+    if args.action[0] == 'start':
+        start()
+    elif args.action[0] == 'put':
+        put(args.action[1:])
+    elif args.action[0] == 'get':
+        get(args.action[1:])
+    elif args.action[0] == 'stop':
+        stop()
+    else:
+        print('Unrecognized action "{}"'.format(args.action))
+        print('actions are "start", "put", "get", "stop"')