Merge branch 'master' into 3076-topnav-help
author    radhika <radhika@curoverse.com>
Fri, 27 Jun 2014 14:20:56 +0000 (10:20 -0400)
committer radhika <radhika@curoverse.com>
Fri, 27 Jun 2014 14:20:56 +0000 (10:20 -0400)
19 files changed:
apps/workbench/app/assets/javascripts/log_viewer.js
apps/workbench/app/controllers/actions_controller.rb
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/helpers/application_helper.rb
sdk/cli/bin/arv
sdk/cli/bin/arv-run-pipeline-instance
sdk/cli/bin/crunch-job
sdk/perl/lib/Arvados.pm
sdk/python/arvados/collection.py
sdk/python/arvados/stream.py
sdk/python/arvados/util.py
sdk/python/bin/arv-normalize
services/api/Gemfile
services/api/Gemfile.lock
services/api/script/cancel_stale_jobs.rb
services/api/script/crunch-dispatch.rb
services/crunch/crunchstat/src/arvados.org/crunchstat/crunchstat.go
services/fuse/arvados_fuse/__init__.py
services/fuse/bin/arv-mount

index 74bba133fbc87fd99f34eaff0c2b4155cfa9e90e..93681cc6d73fc7559120562f664e77c0a45d216e 100644 (file)
@@ -89,7 +89,7 @@ function addToLogViewer(logViewer, lines, taskState) {
                 taskid: v11,
                 node: node,
                 slot: slot,
-                message: message,
+                message: message.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;'),
                 type: type
             });
             count += 1;
index c1e2617fc7c4d1de73219b646086701bc3c7e3fb..9a76e9aed412522ffa14166e0b9aadbe1f8e8acf 100644 (file)
@@ -70,11 +70,33 @@ class ActionsController < ApplicationController
     redirect_to @object
   end
 
+  def arv_normalize mt, *opts
+    r = ""
+    IO.popen(['arv-normalize'] + opts, 'w+b') do |io|
+      io.write mt
+      io.close_write
+      while buf = io.read(2**16)
+        r += buf
+      end
+    end
+    r
+  end
+
   expose_action :combine_selected_files_into_collection do
     lst = []
     files = []
     params["selection"].each do |s|
-      m = CollectionsHelper.match(s)
+      a = ArvadosBase::resource_class_for_uuid s
+      m = nil
+      if a == Link
+        begin
+          m = CollectionsHelper.match(Link.find(s).head_uuid)
+        rescue
+        end
+      else
+        m = CollectionsHelper.match(s)
+      end
+
       if m and m[1] and m[2]
         lst.append(m[1] + m[2])
         files.append(m)
@@ -93,32 +115,20 @@ class ActionsController < ApplicationController
     files.each do |m|
       mt = chash[m[1]+m[2]].manifest_text
       if m[4]
-        IO.popen(['arv-normalize', '--extract', m[4][1..-1]], 'w+b') do |io|
-          io.write mt
-          io.close_write
-          while buf = io.read(2**20)
-            combined += buf
-          end
-        end
+        combined += arv_normalize mt, '--extract', m[4][1..-1]
       else
         combined += chash[m[1]+m[2]].manifest_text
       end
     end
 
-    normalized = ''
-    IO.popen(['arv-normalize'], 'w+b') do |io|
-      io.write combined
-      io.close_write
-      while buf = io.read(2**20)
-        normalized += buf
-      end
-    end
+    normalized = arv_normalize combined
+    normalized_stripped = arv_normalize combined, '--strip'
 
     require 'digest/md5'
 
     d = Digest::MD5.new()
-    d << normalized
-    newuuid = "#{d.hexdigest}+#{normalized.length}"
+    d << normalized_stripped
+    newuuid = "#{d.hexdigest}+#{normalized_stripped.length}"
 
     env = Hash[ENV].
       merge({
@@ -132,10 +142,9 @@ class ActionsController < ApplicationController
             })
 
     IO.popen([env, 'arv-put', '--raw'], 'w+b') do |io|
-      io.write normalized
+      io.write normalized_stripped
       io.close_write
-      while buf = io.read(2**20)
-
+      while buf = io.read(2**16)
       end
     end
 
index d0496cb42a116e05adc99eadcb4adb883260ce17..e3f92efe237dd9668add6813bb5d228b79b65bb8 100644 (file)
@@ -728,7 +728,7 @@ class ApplicationController < ActionController::Base
   def get_n_objects_of_class dataclass, size
     @objects_map_for ||= {}
 
-    raise ArgumentError, 'Argument is not a data class' unless dataclass.is_a? Class
+    raise ArgumentError, 'Argument is not a data class' unless dataclass.is_a? Class and dataclass < ArvadosBase
     raise ArgumentError, 'Argument is not a valid limit size' unless (size && size>0)
 
     # if the objects_map_for has a value for this dataclass, and the
index 66267e028d4df4eb5dd1436fcdeb507904e466aa..80eb16a1cedeb71d211741a020ef69c8b1e20e89 100644 (file)
@@ -306,7 +306,7 @@ module ApplicationHelper
     selectables = []
 
     attrtext = attrvalue
-    if dataclass and dataclass.is_a? Class
+    if dataclass.is_a? Class and dataclass < ArvadosBase
       objects = get_n_objects_of_class dataclass, 10
       objects.each do |item|
         items << item
@@ -392,7 +392,7 @@ module ApplicationHelper
       render opts.merge(partial: "application/#{partial}")
     end
   end
-    
+
   def fa_icon_class_for_object object
     case object.class.to_s.to_sym
     when :User
index 684270304b84940e545286862eefbe8a33919d31..0d9051c8315cc6ca8300cd1f56dac31ab043d899 100755 (executable)
@@ -143,9 +143,14 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
   end
 end
 
+def arv_edit_save_tmp tmp
+  FileUtils::cp tmp.path, tmp.path + ".saved"
+  puts "Saved contents to " + tmp.path + ".saved"
+end
+
 def arv_edit client, arvados, global_opts, remaining_opts
-  n = remaining_opts.shift
-  if n.nil? or n == "-h" or n == "--help"
+  uuid = remaining_opts.shift
+  if uuid.nil? or uuid == "-h" or uuid == "--help"
     puts head_banner
     puts "Usage: arv edit [uuid] [fields...]\n\n"
     puts "Fetch the specified Arvados object, select the specified fields, \n"
@@ -162,9 +167,9 @@ def arv_edit client, arvados, global_opts, remaining_opts
 
   # determine controller
 
-  m = /([a-z0-9]{5})-([a-z0-9]{5})-([a-z0-9]{15})/.match n
+  m = /([a-z0-9]{5})-([a-z0-9]{5})-([a-z0-9]{15})/.match uuid
   if !m
-    if /^[a-f0-9]{32}/.match n
+    if /^[a-f0-9]{32}/.match uuid
       abort "Arvados collections are not editable."
     else
       abort "#{n} does not appear to be an Arvados uuid"
@@ -187,7 +192,7 @@ def arv_edit client, arvados, global_opts, remaining_opts
   api_method = 'arvados.' + rsc + '.get'
 
   result = client.execute(:api_method => eval(api_method),
-                          :parameters => {"uuid" => n},
+                          :parameters => {"uuid" => uuid},
                           :authenticated => false,
                           :headers => {
                             authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
@@ -213,7 +218,7 @@ def arv_edit client, arvados, global_opts, remaining_opts
 
   require 'tempfile'
 
-  tmp = Tempfile.new([n, "." + global_opts[:format]])
+  tmp = Tempfile.new([uuid, "." + global_opts[:format]])
   tmp.write(content)
   tmp.close
 
@@ -251,9 +256,13 @@ def arv_edit client, arvados, global_opts, remaining_opts
           n += 1
         end
         puts "\nTry again (y/n)? "
-        yn = $stdin.read 1
+        yn = "X"
+        while not ["y", "Y", "n", "N"].include?(yn)
+          yn = $stdin.read 1
+        end
         if yn == 'n' or yn == 'N'
-          exit 1
+          arv_edit_save_tmp tmp
+          abort
         end
       end
     else
@@ -262,31 +271,45 @@ def arv_edit client, arvados, global_opts, remaining_opts
     end
   end
 
-  tmp.close(true)
-
-  if newobj != results
-    api_method = 'arvados.' + rsc + '.update'
-    dumped = Oj.dump(newobj)
-    result = client.execute(:api_method => eval(api_method),
-                            :parameters => {"uuid" => n, rsc.singularize => dumped},
-                            :authenticated => false,
-                            :headers => {
-                              authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
-                            })
-
-    begin
-      results = JSON.parse result.body
-    rescue JSON::ParserError => e
-      abort "Failed to parse server response:\n" + e.to_s
-    end
+  begin
+    if newobj != results
+      api_method = 'arvados.' + rsc + '.update'
+      dumped = Oj.dump(newobj)
 
-    if result.response.status != 200
-      puts "Update failed.  Server responded #{result.response.status}: #{results['errors']} "
-      puts "Update body was:"
-      puts dumped
+      begin
+        result = client.execute(:api_method => eval(api_method),
+                                :parameters => {"uuid" => uuid},
+                                :body => { rsc.singularize => dumped },
+                                :authenticated => false,
+                                :headers => {
+                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                                })
+      rescue Exception => e
+        puts "Error communicating with server, error was #{e}"
+        puts "Update body was:"
+        puts dumped
+        arv_edit_save_tmp tmp
+        abort
+      end
+
+      begin
+        results = JSON.parse result.body
+      rescue JSON::ParserError => e
+        abort "Failed to parse server response:\n" + e.to_s
+      end
+
+      if result.response.status != 200
+        puts "Update failed.  Server responded #{result.response.status}: #{results['errors']} "
+        puts "Update body was:"
+        puts dumped
+        arv_edit_save_tmp tmp
+        abort
+      end
+    else
+      puts "Object is unchanged, did not update."
     end
-  else
-    puts "Object is unchanged, did not update."
+  ensure
+    tmp.close(true)
   end
 
   exit 0
index 4810768ded0d5cd5a7051804dc38a8fa2f3d80c2..8e26600ff119dbaa55e0fdf78c06156540198138 100755 (executable)
@@ -469,6 +469,7 @@ class WhRunPipelineInstance
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
             :output_is_persistent => c[:output_is_persistent] || false,
+            :runtime_constraints => c[:runtime_constraints],
             :owner_uuid => owner_uuid,
             # TODO: Delete the following three attributes when
             # supporting pre-20140418 API servers is no longer
@@ -591,7 +592,7 @@ class WhRunPipelineInstance
           ended += 1
           if c[:job][:success] == true
             succeeded += 1
-          elsif c[:job][:success] == false
+          elsif c[:job][:success] == false or c[:job][:cancelled_at]
             failed += 1
           end
         end
index 6224a64afe16f23e14d7c09f2aeeae7e24368c8e..219c315c4cfbf24a5d1c988202e58abb276a961f 100755 (executable)
@@ -639,6 +639,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my $command =
        "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
+        ."&& chmod og+wrx $ENV{TASK_WORK}"
        ."&& cd $ENV{CRUNCH_TMP} ";
     if ($build_script)
     {
@@ -657,22 +658,36 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .=
           q{$(ip -o address show scope global |
               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
-      foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
-      {
-        $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
-      }
+      $command .= "-v \Q$ENV{TASK_WORK}:/tmp/crunch-job:rw\E ";
+      $command .= "-v \Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
+      $command .= "-v \Q$ENV{TASK_KEEPMOUNT}:/mnt:ro\E ";
       while (my ($env_key, $env_val) = each %ENV)
       {
         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
-          $command .= "-e \Q$env_key=$env_val\E ";
+          if ($env_key eq "TASK_WORK") {
+            $command .= "-e \QTASK_WORK=/tmp/crunch-job\E ";
+          }
+          elsif ($env_key eq "TASK_KEEPMOUNT") {
+            $command .= "-e \QTASK_KEEPMOUNT=/mnt\E ";
+          }
+          elsif ($env_key eq "CRUNCH_SRC") {
+            $command .= "-e \QCRUNCH_SRC=/tmp/crunch-src\E ";
+          }
+          else {
+            $command .= "-e \Q$env_key=$env_val\E ";
+          }
         }
       }
       $command .= "\Q$docker_hash\E ";
+      $command .= "stdbuf -o0 -e0 ";
+      $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
     } else {
-      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "
+      # Non-docker run
+      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
+      $command .= "stdbuf -o0 -e0 ";
+      $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
-    $command .= "stdbuf -o0 -e0 ";
-    $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+
     my @execargs = ('bash', '-c', $command);
     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
     exit (111);
index d5eca9035e7d5497d07b565495f92fe087824ec0..c47f1da2d2a9476ca04ac5a5c1539090f50d2344 100644 (file)
@@ -6,14 +6,14 @@ Arvados -- client library for Arvados services
 
   use Arvados;
   $arv = Arvados->new(apiHost => 'arvados.local');
-  
+
   my $instances = $arv->{'pipeline_instances'}->{'list'}->execute();
   print "UUID is ", $instances->{'items'}->[0]->{'uuid'}, "\n";
-  
+
   $uuid = 'eiv0u-arx5y-2c5ovx43zw90gvh';
   $instance = $arv->{'pipeline_instances'}->{'get'}->execute('uuid' => $uuid);
   print "ETag is ", $instance->{'etag'}, "\n";
-  
+
   $instance->{'active'} = 1;
   $instance->{'name'} = '';
   $instance->save();
@@ -58,15 +58,20 @@ Default C<v1>
 =cut
 
 package Arvados;
+
+use Net::SSL (); # From Crypt-SSLeay
+BEGIN {
+  $Net::HTTPS::SSL_SOCKET_CLASS = "Net::SSL"; # Force use of Net::SSL
+}
+
 use JSON;
-use Data::Dumper;
-use IO::Socket::SSL;
 use Carp;
 use Arvados::ResourceAccessor;
 use Arvados::ResourceMethod;
 use Arvados::ResourceProxy;
 use Arvados::ResourceProxyList;
 use Arvados::Request;
+use Data::Dumper;
 
 $Arvados::VERSION = 0.1;
 
@@ -85,12 +90,15 @@ sub build
 
     $config = load_config_file("$ENV{HOME}/.config/arvados/settings.conf");
 
-    $self->{'authToken'} ||= 
+    $self->{'authToken'} ||=
        $ENV{ARVADOS_API_TOKEN} || $config->{ARVADOS_API_TOKEN};
 
     $self->{'apiHost'} ||=
        $ENV{ARVADOS_API_HOST} || $config->{ARVADOS_API_HOST};
 
+    $self->{'noVerifyHostname'} ||=
+       $ENV{ARVADOS_API_HOST_INSECURE};
+
     $self->{'apiProtocolScheme'} ||=
        $ENV{ARVADOS_API_PROTOCOL_SCHEME} ||
        $config->{ARVADOS_API_PROTOCOL_SCHEME};
@@ -127,7 +135,7 @@ sub new_request
 {
     my $self = shift;
     local $ENV{'PERL_LWP_SSL_VERIFY_HOSTNAME'};
-    if ($opts{'noVerifyHostname'} || ($host =~ /\.local$/)) {
+    if ($self->{'noVerifyHostname'} || ($host =~ /\.local$/)) {
         $ENV{'PERL_LWP_SSL_VERIFY_HOSTNAME'} = 0;
     }
     Arvados::Request->new();
index e7b26017d613ea8d0c792da9d228bb722ba9af44..87923489f803570b2b3730d72ad037dbef3433c0 100644 (file)
@@ -143,9 +143,13 @@ class CollectionReader(object):
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self):
+    def manifest_text(self, strip=False):
         self._populate()
-        return self._manifest_text
+        if strip:
+            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+            return m
+        else:
+            return self._manifest_text
 
 class CollectionWriter(object):
     KEEP_BLOCK_SIZE = 2**26
@@ -359,7 +363,7 @@ class CollectionWriter(object):
                              for x in fields[1:-1] ]
                 clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
         return clean
-        
+
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
index dc90d8e57c19251010c9042cedb85ab46a4eb228..cbd1ba91dbf996bb6d99399606ca6d94ddbb98dc 100644 (file)
@@ -71,7 +71,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         block_size = data_locators[i][BLOCKSIZE]
         block_start = data_locators[i][OFFSET]
         block_end = block_start + block_size
-    
+
     while i < len(data_locators):
         locator, block_size, block_start = data_locators[i]
         block_end = block_start + block_size
@@ -169,7 +169,7 @@ class StreamFileReader(object):
             dc = bz2.BZ2Decompressor()
             return self.decompress(lambda segment: dc.decompress(segment), size)
         elif re.search('\.gz$', self._name):
-            dc = zlib.decompressobj(16+zlib.MAX_WBITS)            
+            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
         else:
             return self.readall(size)
@@ -209,7 +209,7 @@ class StreamReader(object):
             self._keep = keep
         else:
             self._keep = Keep.global_client_object()
-            
+
         streamoffset = 0L
 
         # parse stream
@@ -264,11 +264,16 @@ class StreamReader(object):
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
             data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
         return data
-    
-    def manifest_text(self):
+
+    def manifest_text(self, strip=False):
         manifest_text = [self.name().replace(' ', '\\040')]
-        manifest_text.extend([d[LOCATOR] for d in self._data_locators])
-        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040')) 
+        if strip:
+            for d in self._data_locators:
+                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
+                manifest_text.append(m.group(0))
+        else:
+            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
+        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
                                         for seg in f.segments])
                               for f in self._files.values()])
         return ' '.join(manifest_text) + '\n'
index e063f12de91c6ceb1d76302db6ea1ea201d57174..d5ea18ba49ad749a60aa9faba7141ddb6e3bbf63 100644 (file)
@@ -30,7 +30,10 @@ def run_command(execargs, **kwargs):
     kwargs.setdefault('close_fds', True)
     kwargs.setdefault('shell', False)
     p = subprocess.Popen(execargs, **kwargs)
-    stdoutdata, stderrdata = p.communicate(None)
+    if kwargs['stdout'] == subprocess.PIPE:
+        stdoutdata, stderrdata = p.communicate(None)
+    else:
+        p.wait()
     if p.returncode != 0:
         raise errors.CommandFailedError(
             "run_command %s exit %d:\n%s" %
@@ -162,7 +165,7 @@ def zipball_extract(zipball, path):
                     break
                 zip_file.write(buf)
             zip_file.close()
-            
+
             p = subprocess.Popen(["unzip",
                                   "-q", "-o",
                                   "-d", path,
index 755b56507289bbf1d5601ed3e9f238523a0dae1e..0506381272c856bbc4e1b933aef6f474752d3667 100755 (executable)
@@ -14,13 +14,14 @@ parser = argparse.ArgumentParser(
     description='Read manifest on standard input and put normalized manifest on standard output.')
 
 parser.add_argument('--extract', type=str, help="The file to extract from the input manifest")
+parser.add_argument('--strip', action='store_true', help="Strip authorization tokens")
 
 args = parser.parse_args()
 
 import arvados
 
 r = sys.stdin.read()
-    
+
 cr = arvados.CollectionReader(r)
 
 if args.extract:
@@ -36,4 +37,4 @@ if args.extract:
             if fn in s.files():
                 sys.stdout.write(s.files()[fn].as_manifest())
 else:
-    sys.stdout.write(cr.manifest_text())
+    sys.stdout.write(cr.manifest_text(args.strip))
index ce4162d86d3ad11139d179231e0f05ceba4c6bfd..905cce14db375dc93cef8b282c672ebf135739e4 100644 (file)
@@ -70,7 +70,7 @@ gem 'database_cleaner'
 
 gem 'themes_for_rails'
 
-gem 'arvados-cli', '>= 0.1.20140328152103'
+gem 'arvados-cli', '>= 0.1.20140627084759'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index 5f66bb2220f98db9ea3316dacd1a02351db8b2fa..a38f3ac7c3c4be54478f12a5c7c820cc03f53eca 100644 (file)
@@ -35,12 +35,12 @@ GEM
     addressable (2.3.6)
     andand (1.3.3)
     arel (3.0.3)
-    arvados (0.1.20140513131358)
+    arvados (0.1.20140627084759)
       activesupport (>= 3.2.13)
       andand
       google-api-client (~> 0.6.3)
       json (>= 1.7.7)
-    arvados-cli (0.1.20140513131358)
+    arvados-cli (0.1.20140627084759)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
       arvados (~> 0.1.0)
@@ -215,7 +215,7 @@ PLATFORMS
 DEPENDENCIES
   acts_as_api
   andand
-  arvados-cli (>= 0.1.20140328152103)
+  arvados-cli (>= 0.1.20140627084759)
   coffee-rails (~> 3.2.0)
   database_cleaner
   faye-websocket
index dde4cbed0cf6b78eaf973a7f77f40d0b2333c071..4949ec08806605922a7ebc8fc5d3e6ad017e0648 100755 (executable)
@@ -1,5 +1,6 @@
 #!/usr/bin/env ruby
 
+
 if ENV["CRUNCH_DISPATCH_LOCKFILE"]
   lockfilename = ENV.delete "CRUNCH_DISPATCH_LOCKFILE"
   lockfile = File.open(lockfilename, File::RDWR|File::CREAT, 0644)
@@ -13,25 +14,31 @@ ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
 require File.dirname(__FILE__) + '/../config/boot'
 require File.dirname(__FILE__) + '/../config/environment'
 
-def cancel_stale_jobs
-  Job.running.each do |jobrecord|
-    f = Log.where("object_uuid=?", jobrecord.uuid).limit(1).order("created_at desc").first
-    if f
-      age = (Time.now - f.created_at)
-      if age > 300
-        $stderr.puts "dispatch: failing orphan job #{jobrecord.uuid}, last log is #{age} seconds old"
-        # job is marked running, but not known to crunch-dispatcher, and
-        # hasn't produced any log entries for 5 minutes, so mark it as failed.
-        jobrecord.running = false
-        jobrecord.cancelled_at ||= Time.now
-        jobrecord.finished_at ||= Time.now
-        if jobrecord.success.nil?
-          jobrecord.success = false
+class CancelJobs
+  include ApplicationHelper
+
+  def cancel_stale_jobs
+    act_as_system_user do
+      Job.running.each do |jobrecord|
+        f = Log.where("object_uuid=?", jobrecord.uuid).limit(1).order("created_at desc").first
+        if f
+          age = (Time.now - f.created_at)
+          if age > 300
+            $stderr.puts "dispatch: failing orphan job #{jobrecord.uuid}, last log is #{age} seconds old"
+            # job is marked running, but not known to crunch-dispatcher, and
+            # hasn't produced any log entries for 5 minutes, so mark it as failed.
+            jobrecord.running = false
+            jobrecord.cancelled_at ||= Time.now
+            jobrecord.finished_at ||= Time.now
+            if jobrecord.success.nil?
+              jobrecord.success = false
+            end
+            jobrecord.save!
+          end
         end
-        jobrecord.save!
       end
     end
   end
 end
 
-cancel_stale_jobs
+CancelJobs.new.cancel_stale_jobs
index 843fc0db66c2550917b10c42b81065ecaf226100..b1c0e7d316f2a245c82f727406b31f8490ed4e0b 100755 (executable)
@@ -166,6 +166,10 @@ class Dispatcher
       cmd_args = nil
       case Server::Application.config.crunch_job_wrapper
       when :none
+        if @running.size > 0
+            # Don't run more than one at a time.
+            return
+        end
         cmd_args = []
       when :slurm_immediate
         nodelist = nodes_available_for_job(job)
index f8d27ec01b79d7f25d2405fe2fb12085fce17ef2..d61871da6475dcdde342789069492e9e8dcfedf2 100644 (file)
@@ -182,14 +182,14 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
                                        if op == "Read" {
                                                disk[device].last_read = disk[device].next_read
                                                disk[device].next_read = next
-                                               if disk[device].last_read > 0 {
+                                               if disk[device].last_read > 0 && (disk[device].next_read != disk[device].last_read) {
                                                        stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s read %v", device, disk[device].next_read-disk[device].last_read)
                                                }
                                        }
                                        if op == "Write" {
                                                disk[device].last_write = disk[device].next_write
                                                disk[device].next_write = next
-                                               if disk[device].last_write > 0 {
+                                               if disk[device].last_write > 0 && (disk[device].next_write != disk[device].last_write) {
                                                        stderr <- fmt.Sprintf("crunchstat: blkio.io_service_bytes %s write %v", device, disk[device].next_write-disk[device].last_write)
                                                }
                                        }
index 62af6c0c7cc31beaedaba9a066808395f5249838..2bbf137044023e5963a31063121df57483b4e9eb 100644 (file)
@@ -15,6 +15,7 @@ import arvados.events
 import re
 import apiclient
 import json
+import logging
 
 from time import time
 from llfuse import FUSEError
@@ -123,7 +124,7 @@ class Directory(FreshBase):
             try:
                 self.update()
             except apiclient.errors.HttpError as e:
-                print e
+                logging.debug(e)
 
     def __getitem__(self, item):
         self.checkupdate()
@@ -193,10 +194,11 @@ class CollectionDirectory(Directory):
                         cwd = cwd._entries[part]
                 for k, v in s.files().items():
                     cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
-            print "found"
             self.fresh()
+            return True
         except Exception as detail:
-            print("%s: error: %s" % (self.collection_locator,detail) )
+            logging.debug("arv-mount %s: error: %s" % (self.collection_locator,detail))
+            return False
 
 class MagicDirectory(Directory):
     '''A special directory that logically contains the set of all extant keep
@@ -216,19 +218,21 @@ class MagicDirectory(Directory):
         if k in self._entries:
             return True
         try:
-            if arvados.Keep.get(k):
+            e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
+            if e.update():
+                self._entries[k] = e
                 return True
             else:
                 return False
         except Exception as e:
-            #print 'exception keep', e
+            logging.debug('arv-mount exception keep %s', e)
             return False
 
     def __getitem__(self, item):
-        if item not in self._entries:
-            self._entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
-        return self._entries[item]
-
+        if item in self:
+            return self._entries[item]
+        else:
+            raise KeyError("No collection with id " + item)
 
 class TagsDirectory(Directory):
     '''A special directory that contains as subdirectories all tags visible to the user.'''
@@ -251,10 +255,11 @@ class TagsDirectory(Directory):
 
     def update(self):
         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
-        self.merge(tags['items'],
-                   lambda i: i['name'],
-                   lambda a, i: a.tag == i,
-                   lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
+        if "items" in tags:
+            self.merge(tags['items'],
+                       lambda i: i['name'],
+                       lambda a, i: a.tag == i,
+                       lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
 
 class TagDirectory(Directory):
     '''A special directory that contains as subdirectories all collections visible
@@ -411,9 +416,13 @@ class Operations(llfuse.Operations):
     so request handlers do not run concurrently unless the lock is explicitly released
     with llfuse.lock_released.'''
 
-    def __init__(self, uid, gid):
+    def __init__(self, uid, gid, debug=False):
         super(Operations, self).__init__()
 
+        if debug:
+            logging.basicConfig(level=logging.DEBUG)
+            logging.info("arv-mount debug enabled")
+
         self.inodes = Inodes()
         self.uid = uid
         self.gid = gid
@@ -470,7 +479,7 @@ class Operations(llfuse.Operations):
         return entry
 
     def lookup(self, parent_inode, name):
-        #print "lookup: parent_inode", parent_inode, "name", name
+        logging.debug("arv-mount lookup: parent_inode %i name %s", parent_inode, name)
         inode = None
 
         if name == '.':
@@ -506,7 +515,7 @@ class Operations(llfuse.Operations):
         return fh
 
     def read(self, fh, off, size):
-        #print "read", fh, off, size
+        logging.debug("arv-mount read %i %i %i", fh, off, size)
         if fh in self._filehandles:
             handle = self._filehandles[fh]
         else:
@@ -523,7 +532,7 @@ class Operations(llfuse.Operations):
             del self._filehandles[fh]
 
     def opendir(self, inode):
-        #print "opendir: inode", inode
+        logging.debug("arv-mount opendir: inode %i", inode)
 
         if inode in self.inodes:
             p = self.inodes[inode]
@@ -544,14 +553,14 @@ class Operations(llfuse.Operations):
         return fh
 
     def readdir(self, fh, off):
-        #print "readdir: fh", fh, "off", off
+        logging.debug("arv-mount readdir: fh %i off %i", fh, off)
 
         if fh in self._filehandles:
             handle = self._filehandles[fh]
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        #print "handle.entry", handle.entry
+        logging.debug("arv-mount handle.entry %s", handle.entry)
 
         e = off
         while e < len(handle.entry):
index b4afffab061fc2ceaf56bc7dcb93a105fa3d93cb..726741e3b01619b79b62094662d76e819e9b38df 100755 (executable)
@@ -1,11 +1,13 @@
 #!/usr/bin/env python
 
-from arvados_fuse import *
-import arvados
-import subprocess
 import argparse
+import arvados
 import daemon
+import os
 import signal
+import subprocess
+
+from arvados_fuse import *
 
 if __name__ == '__main__':
     # Handle command line parameters
@@ -36,7 +38,10 @@ collections on the server.""")
     args = parser.parse_args()
 
     # Create the request handler
-    operations = Operations(os.getuid(), os.getgid())
+    operations = Operations(os.getuid(), os.getgid(), args.debug)
+
+    if args.debug:
+        arvados.config.settings()['ARVADOS_DEBUG'] = 'true'
 
     if args.groups:
         api = arvados.api('v1')
@@ -91,12 +96,9 @@ collections on the server.""")
 
         exit(rc)
     else:
-        if args.foreground:
-            # Initialize the fuse connection
-            llfuse.init(operations, args.mountpoint, opts)
-            llfuse.main()
-        else:
-            # Initialize the fuse connection
-            llfuse.init(operations, args.mountpoint, opts)
-            with daemon.DaemonContext():
-                llfuse.main()
+        os.chdir(args.mountpoint)
+        if not args.foreground:
+            daemon_ctx = daemon.DaemonContext(working_directory='.')
+            daemon_ctx.open()
+        llfuse.init(operations, '.', opts)
+        llfuse.main()