Merge branch 'master' into 1971-show-image-thumbnails
[arvados.git] / sdk / cli / bin / crunch-job
index 4c8acbb59a5ad917dd1f3d43203bb203fe84f4f3..25c1ee0857e521a898fe7711af01e44db7ee9ccf 100755 (executable)
@@ -1,5 +1,5 @@
 #!/usr/bin/perl
 #!/usr/bin/perl
-# -*- mode: perl; perl-indent-level: 2; -*-
+# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
 
 =head1 NAME
 
 
 =head1 NAME
 
@@ -33,6 +33,12 @@ Path to .git directory where the specified commit is found.
 
 Arvados API authorization token to use during the course of the job.
 
 
 Arvados API authorization token to use during the course of the job.
 
+=item --no-clear-tmp
+
+Do not clear per-job/task temporary directories during initial job
+setup. This can speed up development and debugging when running jobs
+locally.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -71,9 +77,10 @@ use POSIX ':sys_wait_h';
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
 use Getopt::Long;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
 use Getopt::Long;
-use Warehouse;
-use Warehouse::Stream;
-use IPC::System::Simple qw(capturex);
+use IPC::Open2;
+use IO::Select;
+use File::Temp;
+use Fcntl ':flock';
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -92,11 +99,13 @@ my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
+my $no_clear_tmp;
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
+           'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
     );
 
            'resume-stash=s' => \$resume_stash,
     );
 
@@ -166,11 +175,8 @@ else
 }
 $job_id = $Job->{'uuid'};
 
 }
 $job_id = $Job->{'uuid'};
 
-$metastream = Warehouse::Stream->new(whc => new Warehouse);
-$metastream->clear;
-$metastream->name('.');
-$metastream->write_start($job_id . '.log.txt');
-
+my $keep_logfile = $job_id . '.log.txt';
+my $local_logfile = File::Temp->new();
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -189,7 +195,7 @@ if (!$have_slurm)
 }
 if (exists $ENV{SLURM_NODELIST})
 {
 }
 if (exists $ENV{SLURM_NODELIST})
 {
-  push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
+  push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
 }
 foreach (@sinfo)
 {
 }
 foreach (@sinfo)
 {
@@ -323,6 +329,12 @@ else
 }
 
 
 }
 
 
+if (!$have_slurm)
+{
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
+
 my $build_script;
 
 
 my $build_script;
 
 
@@ -331,6 +343,11 @@ $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
+  if (!defined $no_clear_tmp) {
+    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
+    system($clear_tmp_cmd) == 0
+       or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
+  }
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
     if (-d $src_path) {
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
     if (-d $src_path) {
@@ -351,22 +368,24 @@ else
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
-  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
+  if (!defined $no_clear_tmp) {
+    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
 
 
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-         ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
+    my $cleanpid = fork();
+    if ($cleanpid == 0)
+    {
+      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+      exit (1);
+    }
+    while (1)
+    {
+      last if $cleanpid == waitpid (-1, WNOHANG);
+      freeze_if_want_freeze ($cleanpid);
+      select (undef, undef, undef, 0.1);
+    }
+    Log (undef, "Clean-work-dir exited $?");
   }
   }
-  Log (undef, "Clean-work-dir exited $?");
 
   # Install requested code version
 
 
   # Install requested code version
 
@@ -381,24 +400,33 @@ else
   my $commit;
   my $git_archive;
   my $treeish = $Job->{'script_version'};
   my $commit;
   my $git_archive;
   my $treeish = $Job->{'script_version'};
-  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
-  # Todo: let script_version specify repository instead of expecting
-  # parent process to figure it out.
-  $ENV{"CRUNCH_SRC_URL"} = $repo;
 
 
-  # Create/update our clone of the remote git repo
+  # If we're running under crunch-dispatch, it will have pulled the
+  # appropriate source tree into its own repository, and given us that
+  # repo's path as $git_dir. If we're running a "local" job, and a
+  # script_version was specified, it's up to the user to provide the
+  # full path to a local repository in Job->{repository}.
+  #
+  # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
+  # git-archive --remote where appropriate.
+  #
+  # TODO: Accept a locally-hosted Arvados repository by name or
+  # UUID. Use arvados.v1.repositories.list or .get to figure out the
+  # appropriate fetch-url.
+  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
 
 
-  if (!-d $ENV{"CRUNCH_SRC"}) {
-    system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
-       or croak ("git clone $repo failed: exit ".($?>>8));
-    system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
+  $ENV{"CRUNCH_SRC_URL"} = $repo;
+
+  if (-d "$repo/.git") {
+    # We were given a working directory, but we are only interested in
+    # the index.
+    $repo = "$repo/.git";
   }
   }
-  `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
 
   # If this looks like a subversion r#, look for it in git-svn commit messages
 
   if ($treeish =~ m{^\d{1,4}$}) {
 
   # If this looks like a subversion r#, look for it in git-svn commit messages
 
   if ($treeish =~ m{^\d{1,4}$}) {
-    my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
+    my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
     chomp $gitlog;
     if ($gitlog =~ /^[a-f0-9]{40}$/) {
       $commit = $gitlog;
     chomp $gitlog;
     if ($gitlog =~ /^[a-f0-9]{40}$/) {
       $commit = $gitlog;
@@ -409,15 +437,7 @@ else
   # If that didn't work, try asking git to look it up as a tree-ish.
 
   if (!defined $commit) {
   # If that didn't work, try asking git to look it up as a tree-ish.
 
   if (!defined $commit) {
-
-    my $cooked_treeish = $treeish;
-    if ($treeish !~ m{^[0-9a-f]{5,}$}) {
-      # Looks like a git branch name -- make sure git knows it's
-      # relative to the remote repo
-      $cooked_treeish = "origin/$treeish";
-    }
-
-    my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
+    my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
     chomp $found;
     if ($found =~ /^[0-9a-f]{40}$/s) {
       $commit = $found;
     chomp $found;
     if ($found =~ /^[0-9a-f]{40}$/s) {
       $commit = $found;
@@ -442,7 +462,7 @@ else
     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
     @execargs = ("sh", "-c",
                 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
     @execargs = ("sh", "-c",
                 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-    $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
+    $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
   }
   else {
     croak ("could not figure out commit id for $treeish");
   }
   else {
     croak ("could not figure out commit id for $treeish");
@@ -463,6 +483,12 @@ else
   Log (undef, "Install exited $?");
 }
 
   Log (undef, "Install exited $?");
 }
 
+if (!$have_slurm)
+{
+  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -554,8 +580,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
-    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
+    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
@@ -733,11 +759,21 @@ if ($job_has_uuid) {
 if ($Job->{'output'})
 {
   eval {
 if ($Job->{'output'})
 {
   eval {
-    my $manifest_text = capturex("whget", $Job->{'output'});
+    my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
     });
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
     });
+    if ($Job->{'output_is_persistent'}) {
+      $arv->{'links'}->{'create'}->execute('link' => {
+        'tail_kind' => 'arvados#user',
+        'tail_uuid' => $User->{'uuid'},
+        'head_kind' => 'arvados#collection',
+        'head_uuid' => $Job->{'output'},
+        'link_class' => 'resources',
+        'name' => 'wants',
+      });
+    }
   };
   if ($@) {
     Log (undef, "Failed to register output manifest: $@");
   };
   if ($@) {
     Log (undef, "Failed to register output manifest: $@");
@@ -1033,12 +1069,24 @@ sub process_stderr
   } split ("\n", $jobstep[$job]->{stderr});
 }
 
   } split ("\n", $jobstep[$job]->{stderr});
 }
 
+sub fetch_block
+{
+  my $hash = shift;
+  my ($keep, $child_out, $output_block);
+
+  my $cmd = "arv keep get \Q$hash\E";
+  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
+  sysread($keep, $output_block, 64 * 1024 * 1024);
+  close $keep;
+  return $output_block;
+}
 
 sub collate_output
 {
 
 sub collate_output
 {
-  my $whc = Warehouse->new;
   Log (undef, "collate");
   Log (undef, "collate");
-  $whc->write_start (1);
+
+  my ($child_out, $child_in);
+  my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
   my $joboutput;
   for (@jobstep)
   {
   my $joboutput;
   for (@jobstep)
   {
@@ -1049,26 +1097,37 @@ sub collate_output
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($output);
+      print $child_in $output;
     }
     elsif (@jobstep == 1)
     {
       $joboutput = $output;
     }
     elsif (@jobstep == 1)
     {
       $joboutput = $output;
-      $whc->write_finish;
+      last;
     }
     }
-    elsif (defined (my $outblock = $whc->fetch_block ($output)))
+    elsif (defined (my $outblock = fetch_block ($output)))
     {
       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
     {
       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($outblock);
+      print $child_in $outblock;
     }
     else
     {
     }
     else
     {
-      my $errstr = $whc->errstr;
-      $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
+      Log (undef, "XXX fetch_block($output) failed XXX");
       $main::success = 0;
     }
   }
       $main::success = 0;
     }
   }
-  $joboutput = $whc->write_finish if !defined $joboutput;
+  $child_in->close;
+
+  if (!defined $joboutput) {
+    my $s = IO::Select->new($child_out);
+    if ($s->can_read(120)) {
+      sysread($child_out, $joboutput, 64 * 1024 * 1024);
+      chomp($joboutput);
+    } else {
+      Log (undef, "timed out reading from 'arv keep put'");
+    }
+  }
+  waitpid($pid, 0);
+
   if ($joboutput)
   {
     Log (undef, "output $joboutput");
   if ($joboutput)
   {
     Log (undef, "output $joboutput");
@@ -1143,8 +1202,9 @@ sub Log                           # ($jobstep_id, $logmessage)
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  return if !$metastream;
-  $metastream->write_data ($datetime . " " . $message);
+  if ($metastream) {
+    print $metastream $datetime . " " . $message;
+  }
 }
 
 
 }
 
 
@@ -1173,16 +1233,15 @@ sub cleanup
 sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
 sub save_meta
 {
   my $justcheckpoint = shift; # false if this will be the last meta saved
-  my $m = $metastream;
-  $m = $m->copy if $justcheckpoint;
-  $m->write_finish;
-  my $whc = Warehouse->new;
-  my $loglocator = $whc->store_block ($m->as_string);
-  $arv->{'collections'}->{'create'}->execute('collection' => {
-    'uuid' => $loglocator,
-    'manifest_text' => $m->as_string,
-  });
-  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
+  return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+
+  $local_logfile->flush;
+  my $cmd = "arv keep put --filename ''\Q$keep_logfile\E "
+      . quotemeta($local_logfile->filename);
+  my $loglocator = `$cmd`;
+  die "system $cmd failed: $?" if $?;
+
+  $local_logfile = undef;   # the temp file is automatically deleted
   Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
   Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
@@ -1228,65 +1287,6 @@ sub freeze
 sub thaw
 {
   croak ("Thaw not implemented");
 sub thaw
 {
   croak ("Thaw not implemented");
-
-  my $whc;
-  my $key = shift;
-  Log (undef, "thaw from $key");
-
-  @jobstep = ();
-  @jobstep_done = ();
-  @jobstep_todo = ();
-  @jobstep_tomerge = ();
-  $jobstep_tomerge_level = 0;
-  my $frozenjob = {};
-
-  my $stream = new Warehouse::Stream ( whc => $whc,
-                                      hash => [split (",", $key)] );
-  $stream->rewind;
-  while (my $dataref = $stream->read_until (undef, "\n\n"))
-  {
-    if ($$dataref =~ /^job /)
-    {
-      foreach (split ("\n", $$dataref))
-      {
-       my ($k, $v) = split ("=", $_, 2);
-       $frozenjob->{$k} = freezeunquote ($v);
-      }
-      next;
-    }
-
-    if ($$dataref =~ /^merge (\d+) (.*)/)
-    {
-      $jobstep_tomerge_level = $1;
-      @jobstep_tomerge
-         = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
-      next;
-    }
-
-    my $Jobstep = { };
-    foreach (split ("\n", $$dataref))
-    {
-      my ($k, $v) = split ("=", $_, 2);
-      $Jobstep->{$k} = freezeunquote ($v) if $k;
-    }
-    $Jobstep->{'failures'} = 0;
-    push @jobstep, $Jobstep;
-
-    if ($Jobstep->{exitcode} eq "0")
-    {
-      push @jobstep_done, $#jobstep;
-    }
-    else
-    {
-      push @jobstep_todo, $#jobstep;
-    }
-  }
-
-  foreach (qw (script script_version script_parameters))
-  {
-    $Job->{$_} = $frozenjob->{$_};
-  }
-  $Job->save if $job_has_uuid;
 }
 
 
 }
 
 
@@ -1346,6 +1346,15 @@ sub ban_node_by_slot {
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
+sub must_lock_now
+{
+  my ($lockfile, $error_message) = @_;
+  open L, ">", $lockfile or croak("$lockfile: $!");
+  if (!flock L, LOCK_EX|LOCK_NB) {
+    croak("Can't lock $lockfile: $error_message\n");
+  }
+}
+
 __DATA__
 #!/usr/bin/perl
 
 __DATA__
 #!/usr/bin/perl