closes #4024
[arvados.git] / sdk / cli / bin / crunch-job
index 617d22f4d1d269d8d9b4fde7caae9fdde2d1e51a..0d35d53f9d2b924ea8b583fda5b5a3a682be09fb 100755 (executable)
@@ -157,20 +157,15 @@ my $dbh;
 my $sth;
 my @jobstep;
 
-my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
+my $User = api_call("users/current");
 
 if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
-  $Job = retry_op(sub {
-    $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-  });
+  $Job = api_call("jobs/get", uuid => $jobspec);
   if (!$force_unlock) {
     # Claim this job, and make sure nobody else does
-    eval { retry_op(sub {
-      # lock() sets is_locked_by_uuid and changes state to Running.
-      $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
-    }); };
+    eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
     if ($@) {
       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
       exit EX_TEMPFAIL;
@@ -191,7 +186,7 @@ else
   $Job->{'started_at'} = gmtime;
   $Job->{'state'} = 'Running';
 
-  $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
+  $Job = api_call("jobs/create", job => $Job);
 }
 $job_id = $Job->{'uuid'};
 
@@ -321,13 +316,11 @@ if (defined $Job->{thawedfromkey})
 }
 else
 {
-  my $first_task = retry_op(sub {
-    $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
-      'job_uuid' => $Job->{'uuid'},
-      'sequence' => 0,
-      'qsequence' => 0,
-      'parameters' => {},
-    });
+  my $first_task = api_call("job_tasks/create", job_task => {
+    'job_uuid' => $Job->{'uuid'},
+    'sequence' => 0,
+    'qsequence' => 0,
+    'parameters' => {},
   });
   push @jobstep, { 'level' => 0,
                   'failures' => 0,
@@ -427,10 +420,8 @@ else {
   } else {
     # $repo is none of the above. It must be the name of a hosted
     # repository.
-    my $arv_repo_list = retry_op(sub {
-      $arv->{'repositories'}->{'list'}->execute(
-        'filters' => [['name','=',$repo]]);
-    });
+    my $arv_repo_list = api_call("repositories/list",
+                                 'filters' => [['name','=',$repo]]);
     my @repos_found = @{$arv_repo_list->{'items'}};
     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
     if ($n_found > 0) {
@@ -560,7 +551,9 @@ else {
     freeze_if_want_freeze ($installpid);
     select (undef, undef, undef, 0.1);
   }
-  Log (undef, "Install script exited ".exit_status_s($?));
+  my $install_exited = $?;
+  Log (undef, "Install script exited ".exit_status_s($install_exited));
+  exit (1) if $install_exited != 0;
 }
 
 if (!$have_slurm)
@@ -708,17 +701,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
       "--job-name=$job_id.$id.$$",
        );
-    my $build_script_to_send = "";
     my $command =
        "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
        ."&& cd $ENV{CRUNCH_TMP} ";
-    if ($build_script)
-    {
-      $build_script_to_send = $build_script;
-      $command .=
-         "&& perl -";
-    }
     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
@@ -747,18 +733,32 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
       $ENV{TASK_KEEPMOUNT} = "/keep";
 
-      # TASK_WORK is a plain docker data volume: it starts out empty,
-      # is writable, and persists until no containers use it any
-      # more. We don't use --volumes-from to share it with other
-      # containers: it is only accessible to this task, and it goes
-      # away when this task stops.
-      $command .= "--volume=\Q$ENV{TASK_WORK}\E ";
-
-      # JOB_WORK is also a plain docker data volume for now. TODO:
-      # Share a single JOB_WORK volume across all task containers on a
-      # given worker node, and delete it when the job ends (and, in
-      # case that doesn't work, when the next job starts).
-      $command .= "--volume=\Q$ENV{JOB_WORK}\E ";
+      # TASK_WORK is almost exactly like a docker data volume: it
+      # starts out empty, is writable, and persists until no
+      # containers use it any more. We don't use --volumes-from to
+      # share it with other containers: it is only accessible to this
+      # task, and it goes away when this task stops.
+      #
+      # However, a docker data volume is writable only by root unless
+      # the mount point already happens to exist in the container with
+      # different permissions. Therefore, we [1] assume /tmp already
+      # exists in the image and is writable by the crunch user; [2]
+      # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
+      # writable if they are created by docker while setting up the
+      # other --volumes); and [3] create $TASK_WORK inside the
+      # container using $build_script.
+      $command .= "--volume=/tmp ";
+      $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
+      $ENV{"HOME"} = $ENV{"TASK_WORK"};
+      $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
+
+      # TODO: Share a single JOB_WORK volume across all task
+      # containers on a given worker node, and delete it when the job
+      # ends (and, in case that doesn't work, when the next job
+      # starts).
+      #
+      # For now, use the same approach as TASK_WORK above.
+      $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
 
       while (my ($env_key, $env_val) = each %ENV)
       {
@@ -769,16 +769,16 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .= "--env=\QHOME=$ENV{HOME}\E ";
       $command .= "\Q$docker_hash\E ";
       $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     } else {
       # Non-docker run
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
       $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
 
     my @execargs = ('bash', '-c', $command);
-    srun (\@srunargs, \@execargs, undef, $build_script_to_send);
+    srun (\@srunargs, \@execargs, undef, $build_script);
     # exec() failed, we assume nothing happened.
     die "srun() failed on build script\n";
   }
@@ -935,10 +935,8 @@ else {
     while (my $manifest_line = <$orig_manifest>) {
       $orig_manifest_text .= $manifest_line;
     }
-    my $output = retry_op(sub {
-      $arv->{'collections'}->{'create'}->execute(
-        'collection' => {'manifest_text' => $orig_manifest_text});
-    });
+    my $output = api_call("collections/create", collection => {
+      'manifest_text' => $orig_manifest_text});
     Log(undef, "output uuid " . $output->{uuid});
     Log(undef, "output hash " . $output->{portable_data_hash});
     $Job->update_attributes('output' => $output->{portable_data_hash});
@@ -1072,15 +1070,14 @@ sub reapchildren
     my $newtask_list = [];
     my $newtask_results;
     do {
-      $newtask_results = retry_op(sub {
-        $arv->{'job_tasks'}->{'list'}->execute(
-          'where' => {
-            'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-          },
-          'order' => 'qsequence',
-          'offset' => scalar(@$newtask_list),
-        );
-      });
+      $newtask_results = api_call(
+        "job_tasks/list",
+        'where' => {
+          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+        },
+        'order' => 'qsequence',
+        'offset' => scalar(@$newtask_list),
+      );
       push(@$newtask_list, @{$newtask_results->{items}});
     } while (@{$newtask_results->{items}});
     foreach my $arvados_task (@$newtask_list) {
@@ -1103,9 +1100,7 @@ sub check_refresh_wanted
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
   if (@stat && $stat[9] > $latest_refresh) {
     $latest_refresh = scalar time;
-    my $Job2 = retry_op(sub {
-      $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-    });
+    my $Job2 = api_call("jobs/get", uuid => $jobspec);
     for my $attr ('cancelled_at',
                   'cancelled_by_user_uuid',
                   'cancelled_by_client_uuid',
@@ -1620,9 +1615,7 @@ sub find_docker_image {
   # If not, return undef for both values.
   my $locator = shift;
   my ($streamname, $filename);
-  my $image = retry_op(sub {
-    $arv->{collections}->{get}->execute(uuid => $locator);
-  });
+  my $image = api_call("collections/get", uuid => $locator);
   if ($image) {
     foreach my $line (split(/\n/, $image->{manifest_text})) {
       my @tokens = split(/\s+/, $line);
@@ -1669,10 +1662,14 @@ sub retry_count {
 }
 
 sub retry_op {
-  # Given a function reference, call it with the remaining arguments.  If
-  # it dies, retry it with exponential backoff until it succeeds, or until
-  # the current retry_count is exhausted.
+  # Pass in two function references.
+  # This method will be called with the remaining arguments.
+  # If it dies, retry it with exponential backoff until it succeeds,
+  # or until the current retry_count is exhausted.  After each failure
+  # that can be retried, the second function will be called with
+  # the current try count (0-based), next try time, and error message.
   my $operation = shift;
+  my $retry_callback = shift;
   my $retries = retry_count();
   foreach my $try_count (0..$retries) {
     my $next_try = time + (2 ** $try_count);
@@ -1680,6 +1677,7 @@ sub retry_op {
     if (!$@) {
       return $result;
     } elsif ($try_count < $retries) {
+      $retry_callback->($try_count, $next_try, $@);
       my $sleep_time = $next_try - time;
       sleep($sleep_time) if ($sleep_time > 0);
     }
@@ -1690,6 +1688,32 @@ sub retry_op {
   die($@ . "\n");
 }
 
+sub api_call {
+  # Pass in a /-separated API method name, and arguments for it.
+  # This function will call that method, retrying as needed until
+  # the current retry_count is exhausted, with a log on the first failure.
+  my $method_name = shift;
+  my $log_api_retry = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", $next_try_at);
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
+  };
+  my $method = $arv;
+  foreach my $key (split(/\//, $method_name)) {
+    $method = $method->{$key};
+  }
+  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+}
+
 sub exit_status_s {
   # Given a $?, return a human-readable exit code string like "0" or
   # "1" or "0 with signal 1" or "1 with signal 11".
@@ -1715,9 +1739,10 @@ use File::Path qw( make_path remove_tree );
 my $destdir = $ENV{"CRUNCH_SRC"};
 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
 my $repo = $ENV{"CRUNCH_SRC_URL"};
+my $job_work = $ENV{"JOB_WORK"};
 my $task_work = $ENV{"TASK_WORK"};
 
-for my $dir ($destdir, $task_work) {
+for my $dir ($destdir, $job_work, $task_work) {
   if ($dir) {
     make_path $dir;
     -e $dir or die "Failed to create temporary directory ($dir): $!";
@@ -1728,16 +1753,17 @@ if ($task_work) {
   remove_tree($task_work, {keep_root => 1});
 }
 
+my @git_archive_data = <DATA>;
+if (!@git_archive_data) {
+  # Nothing to extract -> nothing to install.
+  run_argv_and_exit();
+}
 
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
-    if (@ARGV) {
-        exec(@ARGV);
-        die "Cannot exec `@ARGV`: $!";
-    } else {
-        exit 0;
-    }
+  # This version already installed -> nothing to do.
+  run_argv_and_exit();
 }
 
 unlink "$destdir.commit";
@@ -1746,13 +1772,10 @@ open STDOUT, ">", "$destdir.log";
 open STDERR, ">&STDOUT";
 
 mkdir $destdir;
-my @git_archive_data = <DATA>;
-if (@git_archive_data) {
-  open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-  print TARX @git_archive_data;
-  if(!close(TARX)) {
-    die "'tar -C $destdir -xf -' exited $?: $!";
-  }
+open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
+print TARX @git_archive_data;
+if(!close(TARX)) {
+  die "'tar -C $destdir -xf -' exited $?: $!";
 }
 
 my $pwd;
@@ -1784,11 +1807,16 @@ if ($commit) {
 
 close L;
 
-if (@ARGV) {
+run_argv_and_exit();
+
+sub run_argv_and_exit
+{
+  if (@ARGV) {
     exec(@ARGV);
     die "Cannot exec `@ARGV`: $!";
-} else {
+  } else {
     exit 0;
+  }
 }
 
 sub shell_or_die