Merge branch '3775-fetch-git-repo' closes #3775
[arvados.git] / sdk / cli / bin / crunch-job
index 04ff82f2a7866f89d0f1f5f5e32b0d9d6399cd35..369bc3e1ae6f021fbd809ecc7fa82c4da2a77ccc 100755 (executable)
@@ -10,12 +10,14 @@ crunch-job: Execute job steps, save snapshots as requested, collate output.
 Obtain job details from Arvados, run tasks on compute nodes (typically
 invoked by scheduler on controller):
 
- crunch-job --job x-y-z
+ crunch-job --job x-y-z --git-dir /path/to/repo/.git
 
 Obtain job details from command line, run tasks on local machine
 (typically invoked by application or developer on VM):
 
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+ crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
+
+ crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
 
 =head1 OPTIONS
 
@@ -27,7 +29,9 @@ If the job is already locked, steal the lock and run it anyway.
 
 =item --git-dir
 
-Path to .git directory where the specified commit is found.
+Path to a .git directory (or a git URL) where the commit given in the
+job's C<script_version> attribute is to be found. If this is I<not>
+given, the job's C<repository> attribute will be used.
 
 =item --job-api-token
 
@@ -39,6 +43,11 @@ Do not clear per-job/task temporary directories during initial job
 setup. This can speed up development and debugging when running jobs
 locally.
 
+=item --job
+
+UUID of the job to run, or a JSON-encoded job resource without a
+UUID. If the latter is given, a new job object will be created.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -140,43 +149,31 @@ $SIG{'USR2'} = sub
 
 
 my $arv = Arvados->new('apiVersion' => 'v1');
-my $local_logfile;
-
-my $User = $arv->{'users'}->{'current'}->execute;
 
 my $Job;
 my $job_id;
 my $dbh;
 my $sth;
+my @jobstep;
+
+my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
+
 if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
-  $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+  $Job = retry_op(sub {
+    $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+  });
   if (!$force_unlock) {
-    # If some other crunch-job process has grabbed this job (or we see
-    # other evidence that the job is already underway) we exit
-    # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
-    # mark the job as failed.
-    if ($Job->{'is_locked_by_uuid'}) {
-      Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'state'} ne 'Queued') {
-      Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'success'} ne undef) {
-      Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'running'}) {
-      Log(undef, "Job 'running' flag is already set");
-      exit EX_TEMPFAIL;
-    }
-    if ($Job->{'started_at'}) {
-      Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
+    # Claim this job, and make sure nobody else does
+    eval { retry_op(sub {
+      # lock() sets is_locked_by_uuid and changes state to Running.
+      $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
+    }); };
+    if ($@) {
+      Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
       exit EX_TEMPFAIL;
-    }
+    };
   }
 }
 else
@@ -191,13 +188,14 @@ else
 
   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
   $Job->{'started_at'} = gmtime;
+  $Job->{'state'} = 'Running';
 
-  $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
+  $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
 }
 $job_id = $Job->{'uuid'};
 
 my $keep_logfile = $job_id . '.log.txt';
-$local_logfile = File::Temp->new();
+log_writer_start($keep_logfile);
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -279,20 +277,11 @@ foreach (@sinfo)
 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
 
 
-
-my $jobmanager_id;
-# Claim this job, and make sure nobody else does
-unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
-        $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
-  Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
-  exit EX_TEMPFAIL;
-}
-$Job->update_attributes('state' => 'Running',
-                        'tasks_summary' => { 'failed' => 0,
-                                             'todo' => 1,
-                                             'running' => 0,
-                                             'done' => 0 });
-
+$Job->update_attributes(
+  'tasks_summary' => { 'failed' => 0,
+                       'todo' => 1,
+                       'running' => 0,
+                       'done' => 0 });
 
 Log (undef, "start");
 $SIG{'INT'} = sub { $main::please_freeze = 1; };
@@ -314,7 +303,6 @@ $ENV{"CRUNCH_JOB_UUID"} = $job_id;
 $ENV{"JOB_UUID"} = $job_id;
 
 
-my @jobstep;
 my @jobstep_todo = ();
 my @jobstep_done = ();
 my @jobstep_tomerge = ();
@@ -332,12 +320,14 @@ if (defined $Job->{thawedfromkey})
 }
 else
 {
-  my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
-    'job_uuid' => $Job->{'uuid'},
-    'sequence' => 0,
-    'qsequence' => 0,
-    'parameters' => {},
-                                                          });
+  my $first_task = retry_op(sub {
+    $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+      'job_uuid' => $Job->{'uuid'},
+      'sequence' => 0,
+      'qsequence' => 0,
+      'parameters' => {},
+    });
+  });
   push @jobstep, { 'level' => 0,
                   'failures' => 0,
                    'arvados_task' => $first_task,
@@ -376,23 +366,27 @@ if (!defined $no_clear_tmp) {
     freeze_if_want_freeze ($cleanpid);
     select (undef, undef, undef, 0.1);
   }
-  Log (undef, "Cleanup command exited $?");
+  Log (undef, "Cleanup command exited ".exit_status_s($?));
 }
 
 
 my $git_archive;
 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
-  # If we're in user-land (i.e., not called from crunch-dispatch)
-  # script_version can be an absolute directory path, signifying we
-  # should work straight out of that directory instead of using a git
-  # commit.
+  # If script_version looks like an absolute path, *and* the --git-dir
+  # argument was not given -- which implies we were not invoked by
+  # crunch-dispatch -- we will use the given path as a working
+  # directory instead of resolving script_version to a git commit (or
+  # doing anything else with git).
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
 }
 else {
+  # Resolve the given script_version to a git commit sha1. Also, if
+  # the repository is remote, clone it into our local filesystem: this
+  # ensures "git archive" will work, and is necessary to reliably
+  # resolve a symbolic script_version like "master^".
   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
 
-  # Install requested code version
   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
@@ -413,33 +407,39 @@ else {
   # to a local dir before doing `git log` et al.
   my $repo_location;
 
-  if ($repo =~ m{://|\@.*:}) {
+  if ($repo =~ m{://|^[^/]*:}) {
     # $repo is a git url we can clone, like git:// or https:// or
-    # file:/// or git@host:repo.git
+    # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
+    # not recognized here because distinguishing that from a local
+    # path is too fragile. If you really need something strange here,
+    # use the ssh:// form.
     $repo_location = 'remote';
   } elsif ($repo =~ m{^\.*/}) {
     # $repo is a local path to a git index. We'll also resolve ../foo
-    # to ../foo/.git if the latter is a directory.
+    # to ../foo/.git if the latter is a directory. To help
+    # disambiguate local paths from named hosted repositories, this
+    # form must be given as ./ or ../ if it's a relative path.
     if (-d "$repo/.git") {
       $repo = "$repo/.git";
     }
     $repo_location = 'local';
-    Log(undef, "Using local repository '$repo'");
   } else {
     # $repo is none of the above. It must be the name of a hosted
     # repository.
-    my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
-      'filters' => [['name','=',$repo]]
-        )->{'items'};
-    my $n_found = scalar @{$arv_repo_list};
+    my $arv_repo_list = retry_op(sub {
+      $arv->{'repositories'}->{'list'}->execute(
+        'filters' => [['name','=',$repo]]);
+    });
+    my @repos_found = @{$arv_repo_list->{'items'}};
+    my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
     if ($n_found > 0) {
       Log(undef, "Repository '$repo' -> "
-          . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
+          . join(", ", map { $_->{'uuid'} } @repos_found));
     }
     if ($n_found != 1) {
       croak("Error: Found $n_found repositories with name '$repo'.");
     }
-    $repo = $arv_repo_list->[0]->{'fetch_url'};
+    $repo = $repos_found[0]->{'fetch_url'};
     $repo_location = 'remote';
   }
   Log(undef, "Using $repo_location repository '$repo'");
@@ -465,18 +465,20 @@ else {
     # if $local_repo is already initialized:
     `$gitcmd init --bare`;
     if ($?) {
-      croak("Error: $gitcmd init --bare exited $?");
+      croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
     }
 
     # If $treeish looks like a hash (or abbrev hash) we look it up in
     # our local cache first, since that's cheaper. (We don't want to
     # do that with tags/branches though -- those change over time, so
     # they should always be resolved by the remote repo.)
-    if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
-      my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
+    if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
+      # Hide stderr because it's normal for this to fail:
+      my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
       if ($? == 0 &&
-          $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
-          $sha1 =~ /^([0-9a-f]{40})$/) {
+          # Careful not to resolve a branch named abcdeff to commit 1234567:
+          $sha1 =~ /^$treeish/ &&
+          $sha1 =~ /^([0-9a-f]{40})$/s) {
         $commit = $1;
         Log(undef, "Commit $commit already present in $local_repo");
       }
@@ -495,7 +497,7 @@ else {
       Log(undef, "Fetching objects from $repo to $local_repo");
       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
       if ($?) {
-        croak("Error: `$gitcmd fetch` exited $?");
+        croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
       }
     }
 
@@ -507,7 +509,9 @@ else {
   my $gitcmd = "git --git-dir=\Q$repo\E";
   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
-    croak("`$gitcmd rev-list` exited $?, '$treeish' not found. Giving up.");
+    croak("`$gitcmd rev-list` exited "
+          .exit_status_s($?)
+          .", '$treeish' not found. Giving up.");
   }
   $commit = $1;
   Log(undef, "Version $treeish is commit $commit");
@@ -524,7 +528,7 @@ else {
   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
   $git_archive = `$gitcmd archive ''\Q$commit\E`;
   if ($?) {
-    croak("Error: $gitcmd archive exited $?");
+    croak("Error: $gitcmd archive exited ".exit_status_s($?));
   }
 }
 
@@ -557,7 +561,7 @@ else {
     freeze_if_want_freeze ($installpid);
     select (undef, undef, undef, 0.1);
   }
-  Log (undef, "Install script exited $?");
+  Log (undef, "Install script exited ".exit_status_s($?));
 }
 
 if (!$have_slurm)
@@ -596,7 +600,8 @@ fi
   }
   if ($? != 0)
   {
-    croak("Installing Docker image from $docker_locator returned exit code $?");
+    croak("Installing Docker image from $docker_locator exited "
+          .exit_status_s($?));
   }
 }
 
@@ -914,8 +919,9 @@ else {
     while (my $manifest_line = <$orig_manifest>) {
       $orig_manifest_text .= $manifest_line;
     }
-    my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
-      'manifest_text' => $orig_manifest_text,
+    my $output = retry_op(sub {
+      $arv->{'collections'}->{'create'}->execute(
+        'collection' => {'manifest_text' => $orig_manifest_text});
     });
     Log(undef, "output uuid " . $output->{uuid});
     Log(undef, "output hash " . $output->{portable_data_hash});
@@ -974,10 +980,7 @@ sub reapchildren
 
   my $childstatus = $?;
   my $exitvalue = $childstatus >> 8;
-  my $exitinfo = sprintf("exit %d signal %d%s",
-                         $exitvalue,
-                         $childstatus & 127,
-                         ($childstatus & 128 ? ' core dump' : ''));
+  my $exitinfo = "exit ".exit_status_s($childstatus);
   $Jobstep->{'arvados_task'}->reload;
   my $task_success = $Jobstep->{'arvados_task'}->{success};
 
@@ -1053,13 +1056,15 @@ sub reapchildren
     my $newtask_list = [];
     my $newtask_results;
     do {
-      $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
-        'where' => {
-          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-        },
-        'order' => 'qsequence',
-        'offset' => scalar(@$newtask_list),
-      );
+      $newtask_results = retry_op(sub {
+        $arv->{'job_tasks'}->{'list'}->execute(
+          'where' => {
+            'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+          },
+          'order' => 'qsequence',
+          'offset' => scalar(@$newtask_list),
+        );
+      });
       push(@$newtask_list, @{$newtask_results->{items}});
     } while (@{$newtask_results->{items}});
     foreach my $arvados_task (@$newtask_list) {
@@ -1082,7 +1087,9 @@ sub check_refresh_wanted
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
   if (@stat && $stat[9] > $latest_refresh) {
     $latest_refresh = scalar time;
-    my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+    my $Job2 = retry_op(sub {
+      $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+    });
     for my $attr ('cancelled_at',
                   'cancelled_by_user_uuid',
                   'cancelled_by_client_uuid',
@@ -1263,7 +1270,7 @@ sub collate_output
 
   my ($child_out, $child_in);
   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
-                  '--retries', put_retry_count());
+                  '--retries', retry_count());
   my $joboutput;
   for (@jobstep)
   {
@@ -1352,6 +1359,73 @@ sub fhbits
 }
 
 
+# Send log output to Keep via arv-put.
+#
+# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_pid is the pid of the arv-put subprocess.
+#
+# The only functions that should access these variables directly are:
+#
+# log_writer_start($logfilename)
+#     Starts an arv-put pipe, reading data on stdin and writing it to
+#     a $logfilename file in an output collection.
+#
+# log_writer_send($txt)
+#     Writes $txt to the output log collection.
+#
+# log_writer_finish()
+#     Closes the arv-put pipe and returns the output that it produces.
+#
+# log_writer_is_active()
+#     Returns a true value if there is currently a live arv-put
+#     process, false otherwise.
+#
+my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+
+sub log_writer_start($)
+{
+  my $logfilename = shift;
+  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
+                        'arv-put', '--portable-data-hash',
+                        '--retries', '3',
+                        '--filename', $logfilename,
+                        '-');
+}
+
+sub log_writer_send($)
+{
+  my $txt = shift;
+  print $log_pipe_in $txt;
+}
+
+sub log_writer_finish()
+{
+  return unless $log_pipe_pid;
+
+  close($log_pipe_in);
+  my $arv_put_output;
+
+  my $s = IO::Select->new($log_pipe_out);
+  if ($s->can_read(120)) {
+    sysread($log_pipe_out, $arv_put_output, 1024);
+    chomp($arv_put_output);
+  } else {
+    Log (undef, "timed out reading from 'arv-put'");
+  }
+
+  waitpid($log_pipe_pid, 0);
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
+  if ($?) {
+    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+  }
+
+  return $arv_put_output;
+}
+
+sub log_writer_is_active() {
+  return $log_pipe_pid;
+}
+
 sub Log                                # ($jobstep_id, $logmessage)
 {
   if ($_[1] =~ /\n/) {
@@ -1365,15 +1439,15 @@ sub Log                         # ($jobstep_id, $logmessage)
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
-  if ($local_logfile || -t STDERR) {
+  if (log_writer_is_active() || -t STDERR) {
     my @gmtime = gmtime;
     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  if ($local_logfile) {
-    print $local_logfile $datetime . " " . $message;
+  if (log_writer_is_active()) {
+    log_writer_send($datetime . " " . $message);
   }
 }
 
@@ -1385,14 +1459,15 @@ sub croak
   Log (undef, $message);
   freeze() if @jobstep_todo;
   collate_output() if @jobstep_todo;
-  cleanup() if $Job;
-  save_meta() if $local_logfile;
+  cleanup();
+  save_meta();
   die;
 }
 
 
 sub cleanup
 {
+  return unless $Job;
   if ($Job->{'state'} eq 'Cancelled') {
     $Job->update_attributes('finished_at' => scalar gmtime);
   } else {
@@ -1405,16 +1480,9 @@ sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+  return unless log_writer_is_active();
 
-  $local_logfile->flush;
-  my $retry_count = put_retry_count();
-  my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
-      "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
-  my $loglocator = `$cmd`;
-  die "system $cmd failed: $?" if $?;
-  chomp($loglocator);
-
-  $local_logfile = undef;   # the temp file is automatically deleted
+  my $loglocator = log_writer_finish();
   Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
   $Job->update_attributes('log', $loglocator);
@@ -1534,7 +1602,10 @@ sub find_docker_image {
   # If not, return undef for both values.
   my $locator = shift;
   my ($streamname, $filename);
-  if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
+  my $image = retry_op(sub {
+    $arv->{collections}->{get}->execute(uuid => $locator);
+  });
+  if ($image) {
     foreach my $line (split(/\n/, $image->{manifest_text})) {
       my @tokens = split(/\s+/, $line);
       next if (!@tokens);
@@ -1555,20 +1626,66 @@ sub find_docker_image {
   }
 }
 
-sub put_retry_count {
-  # Calculate a --retries argument for arv-put that will have it try
-  # approximately as long as this Job has been running.
-  my $stoptime = shift || time;
-  my $starttime = $jobstep[0]->{starttime};
-  my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
-  my $retries = 0;
-  while ($timediff >= 2) {
-    $retries++;
-    $timediff /= 2;
+sub retry_count {
+  # Calculate the number of times an operation should be retried,
+  # assuming exponential backoff, and that we're willing to retry as
+  # long as tasks have been running.  Enforce a minimum of 3 retries.
+  my ($starttime, $endtime, $timediff, $retries);
+  if (@jobstep) {
+    $starttime = $jobstep[0]->{starttime};
+    $endtime = $jobstep[-1]->{finishtime};
+  }
+  if (!defined($starttime)) {
+    $timediff = 0;
+  } elsif (!defined($endtime)) {
+    $timediff = time - $starttime;
+  } else {
+    $timediff = ($endtime - $starttime) - (time - $endtime);
+  }
+  if ($timediff > 0) {
+    $retries = int(log($timediff) / log(2));
+  } else {
+    $retries = 1;  # Use the minimum.
   }
   return ($retries > 3) ? $retries : 3;
 }
 
+sub retry_op {
+  # Given a function reference, call it with the remaining arguments.  If
+  # it dies, retry it with exponential backoff until it succeeds, or until
+  # the current retry_count is exhausted.
+  my $operation = shift;
+  my $retries = retry_count();
+  foreach my $try_count (0..$retries) {
+    my $next_try = time + (2 ** $try_count);
+    my $result = eval { $operation->(@_); };
+    if (!$@) {
+      return $result;
+    } elsif ($try_count < $retries) {
+      my $sleep_time = $next_try - time;
+      sleep($sleep_time) if ($sleep_time > 0);
+    }
+  }
+  # Ensure the error message ends in a newline, so Perl doesn't add
+  # retry_op's line number to it.
+  chomp($@);
+  die($@ . "\n");
+}
+
+sub exit_status_s {
+  # Given a $?, return a human-readable exit code string like "0" or
+  # "1" or "0 with signal 1" or "1 with signal 11".
+  my $exitcode = shift;
+  my $s = $exitcode >> 8;
+  if ($exitcode & 0x7f) {
+    $s .= " with signal " . ($exitcode & 0x7f);
+  }
+  if ($exitcode & 0x80) {
+    $s .= " with core dump";
+  }
+  return $s;
+}
+
 __DATA__
 #!/usr/bin/perl