3775: Update comment
[arvados.git] / sdk / cli / bin / crunch-job
index eff48a21eb0f50b5844c9657521a430fbb1c225b..d4c89495101bffdacda2605f6c77f3c59e687bb8 100755 (executable)
@@ -10,12 +10,14 @@ crunch-job: Execute job steps, save snapshots as requested, collate output.
 Obtain job details from Arvados, run tasks on compute nodes (typically
 invoked by scheduler on controller):
 
- crunch-job --job x-y-z
+ crunch-job --job x-y-z --git-dir /path/to/repo/.git
 
 Obtain job details from command line, run tasks on local machine
 (typically invoked by application or developer on VM):
 
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+ crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
+
+ crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
 
 =head1 OPTIONS
 
@@ -27,7 +29,9 @@ If the job is already locked, steal the lock and run it anyway.
 
 =item --git-dir
 
-Path to .git directory where the specified commit is found.
+Path to a .git directory (or a git URL) where the commit given in the
+job's C<script_version> attribute is to be found. If this is I<not>
+given, the job's C<repository> attribute will be used.
 
 =item --job-api-token
 
@@ -39,6 +43,11 @@ Do not clear per-job/task temporary directories during initial job
 setup. This can speed up development and debugging when running jobs
 locally.
 
+=item --job
+
+UUID of the job to run, or a JSON-encoded job resource without a
+UUID. If the latter is given, a new job object will be created.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -140,7 +149,6 @@ $SIG{'USR2'} = sub
 
 
 my $arv = Arvados->new('apiVersion' => 'v1');
-my $local_logfile;
 
 my $User = $arv->{'users'}->{'current'}->execute;
 
@@ -153,30 +161,15 @@ if ($jobspec =~ /^[-a-z\d]+$/)
   # $jobspec is an Arvados UUID, not a JSON job specification
   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
   if (!$force_unlock) {
-    # If some other crunch-job process has grabbed this job (or we see
-    # other evidence that the job is already underway) we exit
-    # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
-    # mark the job as failed.
-    if ($Job->{'is_locked_by_uuid'}) {
-      Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'state'} ne 'Queued') {
-      Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'success'} ne undef) {
-      Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'running'}) {
-      Log(undef, "Job 'running' flag is already set");
+    # Claim this job, and make sure nobody else does
+    eval {
+      # lock() sets is_locked_by_uuid and changes state to Running.
+      $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
+    };
+    if ($@) {
+      Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
       exit EX_TEMPFAIL;
-    }
-    if ($Job->{'started_at'}) {
-      Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
-      exit EX_TEMPFAIL;
-    }
+    };
   }
 }
 else
@@ -191,13 +184,14 @@ else
 
   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
   $Job->{'started_at'} = gmtime;
+  $Job->{'state'} = 'Running';
 
   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
 }
 $job_id = $Job->{'uuid'};
 
 my $keep_logfile = $job_id . '.log.txt';
-$local_logfile = File::Temp->new();
+log_writer_start($keep_logfile);
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -279,20 +273,11 @@ foreach (@sinfo)
 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
 
 
-
-my $jobmanager_id;
-# Claim this job, and make sure nobody else does
-unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
-        $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
-  Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
-  exit EX_TEMPFAIL;
-}
-$Job->update_attributes('state' => 'Running',
-                        'tasks_summary' => { 'failed' => 0,
-                                             'todo' => 1,
-                                             'running' => 0,
-                                             'done' => 0 });
-
+$Job->update_attributes(
+  'tasks_summary' => { 'failed' => 0,
+                       'todo' => 1,
+                       'running' => 0,
+                       'done' => 0 });
 
 Log (undef, "start");
 $SIG{'INT'} = sub { $main::please_freeze = 1; };
@@ -382,10 +367,11 @@ if (!defined $no_clear_tmp) {
 
 my $git_archive;
 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
-  # If we're in user-land (i.e., not called from crunch-dispatch)
-  # script_version can be an absolute directory path, signifying we
-  # should work straight out of that directory instead of using a git
-  # commit.
+  # If script_version looks like an absolute path, *and* the --git-dir
+  # argument was not given -- which implies we were not invoked by
+  # crunch-dispatch -- we will use the given path as a working
+  # directory instead of resolving script_version to a git commit (or
+  # doing anything else with git).
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
 }
@@ -413,13 +399,18 @@ else {
   # to a local dir before doing `git log` et al.
   my $repo_location;
 
-  if ($repo =~ m{://|\@.*:}) {
+  if ($repo =~ m{://|^[^/]*:}) {
     # $repo is a git url we can clone, like git:// or https:// or
-    # file:/// or git@host:repo.git
+    # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
+    # not recognized here because distinguishing that from a local
+    # path is too fragile. If you really need something strange here,
+    # use the ssh:// form.
     $repo_location = 'remote';
   } elsif ($repo =~ m{^\.*/}) {
     # $repo is a local path to a git index. We'll also resolve ../foo
-    # to ../foo/.git if the latter is a directory.
+    # to ../foo/.git if the latter is a directory. To help
+    # disambiguate local paths from named hosted repositories, this
+    # form must be given as ./ or ../ if it's a relative path.
     if (-d "$repo/.git") {
       $repo = "$repo/.git";
     }
@@ -429,16 +420,17 @@ else {
     # repository.
     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
       'filters' => [['name','=',$repo]]
-        )->{'items'};
-    my $n_found = scalar @{$arv_repo_list};
+        );
+    my @repos_found = @{$arv_repo_list->{'items'}};
+    my $n_found = $arv_repo_list->{'items_available'};
     if ($n_found > 0) {
       Log(undef, "Repository '$repo' -> "
-          . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
+          . join(", ", map { $_->{'uuid'} } @repos_found));
     }
     if ($n_found != 1) {
       croak("Error: Found $n_found repositories with name '$repo'.");
     }
-    $repo = $arv_repo_list->[0]->{'fetch_url'};
+    $repo = $repos_found[0]->{'fetch_url'};
     $repo_location = 'remote';
   }
   Log(undef, "Using $repo_location repository '$repo'");
@@ -471,11 +463,12 @@ else {
     # our local cache first, since that's cheaper. (We don't want to
     # do that with tags/branches though -- those change over time, so
     # they should always be resolved by the remote repo.)
-    if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
+    if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
       # Hide stderr because it's normal for this to fail:
       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
       if ($? == 0 &&
-          $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
+          # Careful not to resolve a branch named abcdeff to commit 1234567:
+          $sha1 =~ /^$treeish/ &&
           $sha1 =~ /^([0-9a-f]{40})$/s) {
         $commit = $1;
         Log(undef, "Commit $commit already present in $local_repo");
@@ -1352,6 +1345,73 @@ sub fhbits
 }
 
 
+# Send log output to Keep via arv-put.
+#
+# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_pid is the pid of the arv-put subprocess.
+#
+# The only functions that should access these variables directly are:
+#
+# log_writer_start($logfilename)
+#     Starts an arv-put pipe, reading data on stdin and writing it to
+#     a $logfilename file in an output collection.
+#
+# log_writer_send($txt)
+#     Writes $txt to the output log collection.
+#
+# log_writer_finish()
+#     Closes the arv-put pipe and returns the output that it produces.
+#
+# log_writer_is_active()
+#     Returns a true value if there is currently a live arv-put
+#     process, false otherwise.
+#
+my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+
+sub log_writer_start($)
+{
+  my $logfilename = shift;
+  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
+                        'arv-put', '--portable-data-hash',
+                        '--retries', '3',
+                        '--filename', $logfilename,
+                        '-');
+}
+
+sub log_writer_send($)
+{
+  my $txt = shift;
+  print $log_pipe_in $txt;
+}
+
+sub log_writer_finish()
+{
+  return unless $log_pipe_pid;
+
+  close($log_pipe_in);
+  my $arv_put_output;
+
+  my $s = IO::Select->new($log_pipe_out);
+  if ($s->can_read(120)) {
+    sysread($log_pipe_out, $arv_put_output, 1024);
+    chomp($arv_put_output);
+  } else {
+    Log (undef, "timed out reading from 'arv-put'");
+  }
+
+  waitpid($log_pipe_pid, 0);
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
+  if ($?) {
+    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+  }
+
+  return $arv_put_output;
+}
+
+sub log_writer_is_active() {
+  return $log_pipe_pid;
+}
+
 sub Log                                # ($jobstep_id, $logmessage)
 {
   if ($_[1] =~ /\n/) {
@@ -1365,15 +1425,15 @@ sub Log                         # ($jobstep_id, $logmessage)
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
-  if ($local_logfile || -t STDERR) {
+  if (log_writer_is_active() || -t STDERR) {
     my @gmtime = gmtime;
     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  if ($local_logfile) {
-    print $local_logfile $datetime . " " . $message;
+  if (log_writer_is_active()) {
+    log_writer_send($datetime . " " . $message);
   }
 }
 
@@ -1385,14 +1445,15 @@ sub croak
   Log (undef, $message);
   freeze() if @jobstep_todo;
   collate_output() if @jobstep_todo;
-  cleanup() if $Job;
-  save_meta() if $local_logfile;
+  cleanup();
+  save_meta();
   die;
 }
 
 
 sub cleanup
 {
+  return unless $Job;
   if ($Job->{'state'} eq 'Cancelled') {
     $Job->update_attributes('finished_at' => scalar gmtime);
   } else {
@@ -1405,16 +1466,9 @@ sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+  return unless log_writer_is_active();
 
-  $local_logfile->flush;
-  my $retry_count = put_retry_count();
-  my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
-      "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
-  my $loglocator = `$cmd`;
-  die "system $cmd exited ".exit_status_s($?) if $?;
-  chomp($loglocator);
-
-  $local_logfile = undef;   # the temp file is automatically deleted
+  my $loglocator = log_writer_finish();
   Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
   $Job->update_attributes('log', $loglocator);