Merge branch 'master' into 4523-full-text-search
[arvados.git] / sdk / cli / bin / crunch-job
index 7aa02ac6994dc1444a11c6dc6eaf732361021932..c312f5d169fb77b7b853b113ce97195418dd2f56 100755 (executable)
@@ -86,6 +86,7 @@ use POSIX ':sys_wait_h';
 use POSIX qw(strftime);
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
+use Cwd qw(realpath);
 use Data::Dumper;
 use Digest::MD5 qw(md5_hex);
 use Getopt::Long;
@@ -197,6 +198,16 @@ $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
 
+my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
+if ($? == 0) {
+  $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
+  chomp($gem_versions);
+  chop($gem_versions);  # Closing parentheses
+} else {
+  $gem_versions = "";
+}
+Log(undef,
+    "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
 
 Log (undef, "check slurm allocation");
 my @slot;
@@ -334,13 +345,9 @@ if (!$have_slurm)
   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
 }
 
-
-my $build_script;
-do {
-  local $/ = undef;
-  $build_script = <DATA>;
-};
+my $build_script = handle_readall(\*DATA);
 my $nodelist = join(",", @node);
+my $git_tar_count = 0;
 
 if (!defined $no_clear_tmp) {
   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
@@ -349,8 +356,12 @@ if (!defined $no_clear_tmp) {
   my $cleanpid = fork();
   if ($cleanpid == 0)
   {
+    # Find FUSE mounts that look like Keep mounts (the mount path has the
+    # word "keep") and unmount them.  Then clean up work directories.
+    # TODO: When #5036 is done and widely deployed, we can get rid of the
+    # regular expression and just unmount everything with type fuse.keep.
     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
+          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
     exit (1);
   }
   while (1)
@@ -362,8 +373,51 @@ if (!defined $no_clear_tmp) {
   Log (undef, "Cleanup command exited ".exit_status_s($?));
 }
 
+# If this job requires a Docker image, install that.
+my $docker_bin = "/usr/bin/docker.io";
+my ($docker_locator, $docker_stream, $docker_hash);
+if ($docker_locator = $Job->{docker_image_locator}) {
+  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
+  if (!$docker_hash)
+  {
+    croak("No Docker image hash found from locator $docker_locator");
+  }
+  $docker_stream =~ s/^\.//;
+  my $docker_install_script = qq{
+if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+fi
+};
+  my $docker_pid = fork();
+  if ($docker_pid == 0)
+  {
+    srun (["srun", "--nodelist=" . join(',', @node)],
+          ["/bin/sh", "-ec", $docker_install_script]);
+    exit ($?);
+  }
+  while (1)
+  {
+    last if $docker_pid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($docker_pid);
+    select (undef, undef, undef, 0.1);
+  }
+  if ($? != 0)
+  {
+    croak("Installing Docker image from $docker_locator exited "
+          .exit_status_s($?));
+  }
+
+  if ($Job->{arvados_sdk_version}) {
+    # The job also specifies an Arvados SDK version.  Add the SDKs to the
+    # tar file for the build script to install.
+    Log(undef, sprintf("Packing Arvados SDK version %s for installation",
+                       $Job->{arvados_sdk_version}));
+    add_git_archive("git", "--git-dir=$git_dir", "archive",
+                    "--prefix=.arvados.sdk/",
+                    $Job->{arvados_sdk_version}, "sdk");
+  }
+}
 
-my $git_archive;
 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
   # If script_version looks like an absolute path, *and* the --git-dir
   # argument was not given -- which implies we were not invoked by
@@ -517,12 +571,10 @@ else {
   }
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-  $git_archive = `$gitcmd archive ''\Q$commit\E`;
-  if ($?) {
-    croak("Error: $gitcmd archive exited ".exit_status_s($?));
-  }
+  add_git_archive("$gitcmd archive ''\Q$commit\E");
 }
 
+my $git_archive = combined_git_archive();
 if (!defined $git_archive) {
   Log(undef, "Skip install phase (no git archive)");
   if ($have_slurm) {
@@ -552,48 +604,10 @@ else {
   }
   my $install_exited = $?;
   Log (undef, "Install script exited ".exit_status_s($install_exited));
-  exit (1) if $install_exited != 0;
-}
-
-if (!$have_slurm)
-{
-  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
-  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
-}
-
-# If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
-if ($docker_locator = $Job->{docker_image_locator}) {
-  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
-  if (!$docker_hash)
-  {
-    croak("No Docker image hash found from locator $docker_locator");
-  }
-  $docker_stream =~ s/^\.//;
-  my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
-    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
-fi
-};
-  my $docker_pid = fork();
-  if ($docker_pid == 0)
-  {
-    srun (["srun", "--nodelist=" . join(',', @node)],
-          ["/bin/sh", "-ec", $docker_install_script]);
-    exit ($?);
-  }
-  while (1)
-  {
-    last if $docker_pid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($docker_pid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($? != 0)
-  {
-    croak("Installing Docker image from $docker_locator exited "
-          .exit_status_s($?));
+  foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
+    unlink($tar_filename);
   }
+  exit (1) if $install_exited != 0;
 }
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -1260,10 +1274,10 @@ sub fetch_block
   return $output_block;
 }
 
-# create_output_collections generates a new collection containing the
-# output of each successfully completed task, and returns the
-# portable_data_hash for the new collection.
-#
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
 sub create_output_collection
 {
   Log (undef, "collate");
@@ -1278,10 +1292,11 @@ sub create_output_collection
                   '.execute()["portable_data_hash"]'
       );
 
+  my $task_idx = -1;
   for (@jobstep)
   {
-    next if (!exists $_->{'arvados_task'}->{'output'} ||
-             !$_->{'arvados_task'}->{'success'});
+    ++$task_idx;
+    next unless exists $_->{'arvados_task'}->{'output'};
     my $output = $_->{'arvados_task'}->{output};
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
@@ -1293,7 +1308,8 @@ sub create_output_collection
     }
     else
     {
-      Log (undef, "XXX fetch_block($output) failed XXX");
+      my $uuid = $_->{'arvados_task'}->{'uuid'};
+      Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
       $main::success = 0;
     }
   }
@@ -1715,17 +1731,87 @@ sub exit_status_s {
   return $s;
 }
 
+sub handle_readall {
+  # Pass in a glob reference to a file handle.
+  # Read all its contents and return them as a string.
+  my $fh_glob_ref = shift;
+  local $/ = undef;
+  return <$fh_glob_ref>;
+}
+
+sub tar_filename_n {
+  my $n = shift;
+  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
+}
+
+sub add_git_archive {
+  # Pass in a git archive command as a string or list, a la system().
+  # This method will save its output to be included in the archive sent to the
+  # build script.
+  my $git_input;
+  $git_tar_count++;
+  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
+    croak("Failed to save git archive: $!");
+  }
+  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
+  close($git_input);
+  waitpid($git_pid, 0);
+  close(GIT_ARCHIVE);
+  if ($?) {
+    croak("Failed to save git archive: git exited " . exit_status_s($?));
+  }
+}
+
+sub combined_git_archive {
+  # Combine all saved tar archives into a single archive, then return its
+  # contents in a string.  Return undef if no archives have been saved.
+  if ($git_tar_count < 1) {
+    return undef;
+  }
+  my $base_tar_name = tar_filename_n(1);
+  foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
+    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
+    if ($tar_exit != 0) {
+      croak("Error preparing build archive: tar -A exited " .
+            exit_status_s($tar_exit));
+    }
+  }
+  if (!open(GIT_TAR, "<", $base_tar_name)) {
+    croak("Could not open build archive: $!");
+  }
+  my $tar_contents = handle_readall(\*GIT_TAR);
+  close(GIT_TAR);
+  return $tar_contents;
+}
+
 __DATA__
 #!/usr/bin/perl
-
-# checkout-and-build
+#
+# This is crunch-job's internal dispatch script.  crunch-job running on the API
+# server invokes this script on individual compute nodes, or localhost if we're
+# running a job locally.  It gets called in two modes:
+#
+# * No arguments: Installation mode.  Read a tar archive from the DATA
+#   file handle; it includes the Crunch script's source code, and
+#   maybe SDKs as well.  Those should be installed in the proper
+#   locations.  This runs outside of any Docker container, so don't try to
+#   introspect Crunch's runtime environment.
+#
+# * With arguments: Crunch script run mode.  This script should set up the
+#   environment, then run the command specified in the arguments.  This runs
+#   inside any Docker container.
 
 use Fcntl ':flock';
 use File::Path qw( make_path remove_tree );
+use POSIX qw(getcwd);
+
+# Map SDK subdirectories to the path environments they belong to.
+my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
 
 my $destdir = $ENV{"CRUNCH_SRC"};
 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
 my $repo = $ENV{"CRUNCH_SRC_URL"};
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
 my $job_work = $ENV{"JOB_WORK"};
 my $task_work = $ENV{"TASK_WORK"};
 
@@ -1740,43 +1826,127 @@ if ($task_work) {
   remove_tree($task_work, {keep_root => 1});
 }
 
-my @git_archive_data = <DATA>;
-if (!@git_archive_data) {
-  # Nothing to extract -> nothing to install.
-  run_argv_and_exit();
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
+### Crunch script run mode
+if (@ARGV) {
+  # We want to do routine logging during task 0 only.  This gives the user
+  # the information they need, but avoids repeating the information for every
+  # task.
+  my $Log;
+  if ($ENV{TASK_SEQUENCE} eq "0") {
+    $Log = sub {
+      my $msg = shift;
+      printf STDERR_ORIG "[Crunch] $msg\n", @_;
+    };
+  } else {
+    $Log = sub { };
+  }
+
+  my $python_src = "$install_dir/python";
+  my $venv_dir = "$job_work/.arvados.venv";
+  my $venv_built = -e "$venv_dir/bin/activate";
+  if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
+    shell_or_die("virtualenv", "--quiet", "--system-site-packages",
+                 "--python=python2.7", $venv_dir);
+    shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+    $venv_built = 1;
+    $Log->("Built Python SDK virtualenv");
+  }
+
+  my $pip_bin = "pip";
+  if ($venv_built) {
+    $Log->("Running in Python SDK virtualenv");
+    $pip_bin = "$venv_dir/bin/pip";
+    my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
+    @ARGV = ("/bin/sh", "-ec",
+             ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
+  } elsif (-d $python_src) {
+    $Log->("Warning: virtualenv not found inside Docker container default " .
+           "\$PATH. Can't install Python SDK.");
+  }
+
+  my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
+  if ($pkgs) {
+    $Log->("Using Arvados SDK:");
+    foreach my $line (split /\n/, $pkgs) {
+      $Log->($line);
+    }
+  } else {
+    $Log->("Arvados SDK packages not found");
+  }
+
+  while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
+    my $sdk_path = "$install_dir/$sdk_dir";
+    if (-d $sdk_path) {
+      if ($ENV{$sdk_envkey}) {
+        $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
+      } else {
+        $ENV{$sdk_envkey} = $sdk_path;
+      }
+      $Log->("Arvados SDK added to %s", $sdk_envkey);
+    }
+  }
+
+  close(STDOUT);
+  close(STDERR);
+  open(STDOUT, ">&", STDOUT_ORIG);
+  open(STDERR, ">&", STDERR_ORIG);
+  exec(@ARGV);
+  die "Cannot exec `@ARGV`: $!";
 }
 
+### Installation mode
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
   # This version already installed -> nothing to do.
-  run_argv_and_exit();
+  exit(0);
 }
 
 unlink "$destdir.commit";
-open STDERR_ORIG, ">&STDERR";
-open STDOUT, ">", "$destdir.log";
-open STDERR, ">&STDOUT";
-
 mkdir $destdir;
-open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-print TARX @git_archive_data;
+
+if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
+  die "Error launching 'tar -xC $destdir': $!";
+}
+# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+# get SIGPIPE.  We must feed it data incrementally.
+my $tar_input;
+while (read(DATA, $tar_input, 65536)) {
+  print TARX $tar_input;
+}
 if(!close(TARX)) {
-  die "'tar -C $destdir -xf -' exited $?: $!";
+  die "'tar -xC $destdir' exited $?: $!";
 }
 
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
 mkdir $install_dir;
 
-for my $src_path ("$destdir/arvados/sdk/python") {
-  if (-d $src_path) {
-    shell_or_die ("virtualenv", $install_dir);
-    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+my $sdk_root = "$destdir/.arvados.sdk/sdk";
+if (-d $sdk_root) {
+  foreach my $sdk_lang (("python",
+                         map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
+    if (-d "$sdk_root/$sdk_lang") {
+      if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
+        die "Failed to install $sdk_lang SDK: $!";
+      }
+    }
   }
 }
 
+my $python_dir = "$install_dir/python";
+if ((-d $python_dir) and can_run("python2.7") and
+    (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
+  # egg_info failed, probably when it asked git for a build tag.
+  # Specify no build tag.
+  open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
+  print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
+  close($pysdk_cfg);
+}
+
 if (-e "$destdir/crunch_scripts/install") {
     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
@@ -1794,16 +1964,12 @@ if ($commit) {
 
 close L;
 
-run_argv_and_exit();
-
-sub run_argv_and_exit
-{
-  if (@ARGV) {
-    exec(@ARGV);
-    die "Cannot exec `@ARGV`: $!";
-  } else {
-    exit 0;
-  }
+sub can_run {
+  my $command_name = shift;
+  open(my $which, "-|", "which", $command_name);
+  while (<$which>) { }
+  close($which);
+  return ($? == 0);
 }
 
 sub shell_or_die