Use a simple 'open' for fetch_block (freeing up an extra filehandle). refs #2325...
[arvados.git] / sdk / cli / bin / crunch-job
index 87b4fbf8a5385f498c2e016f9225d467a3dd664a..2f8623ba66490f4cf930714473b43a05b4e3215f 100755 (executable)
@@ -33,6 +33,12 @@ Path to .git directory where the specified commit is found.
 
 Arvados API authorization token to use during the course of the job.
 
 
 Arvados API authorization token to use during the course of the job.
 
+=item --no-clear-tmp
+
+Do not clear per-job/task temporary directories during initial job
+setup. This can speed up development and debugging when running jobs
+locally.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -73,6 +79,8 @@ use Arvados;
 use Getopt::Long;
 use IPC::Open2;
 use IO::Select;
 use Getopt::Long;
 use IPC::Open2;
 use IO::Select;
+use File::Temp;
+use Fcntl ':flock';
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -91,11 +99,13 @@ my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
+my $no_clear_tmp;
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
+           'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
     );
 
            'resume-stash=s' => \$resume_stash,
     );
 
@@ -165,11 +175,8 @@ else
 }
 $job_id = $Job->{'uuid'};
 
 }
 $job_id = $Job->{'uuid'};
 
-# $metastream = Warehouse::Stream->new(whc => new Warehouse);
-# $metastream->clear;
-# $metastream->name('.');
-# $metastream->write_start($job_id . '.log.txt');
-
+my $keep_logfile = $job_id . '.log.txt';
+my $local_logfile = File::Temp->new();
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -322,6 +329,12 @@ else
 }
 
 
 }
 
 
+if (!$have_slurm)
+{
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
+
 my $build_script;
 
 
 my $build_script;
 
 
@@ -330,6 +343,11 @@ $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
+  if (!defined $no_clear_tmp) {
+    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
+    system($clear_tmp_cmd) == 0
+       or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
+  }
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
     if (-d $src_path) {
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
     if (-d $src_path) {
@@ -350,22 +368,24 @@ else
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
-  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
+  if (!defined $no_clear_tmp) {
+    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
 
 
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-         ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
+    my $cleanpid = fork();
+    if ($cleanpid == 0)
+    {
+      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+      exit (1);
+    }
+    while (1)
+    {
+      last if $cleanpid == waitpid (-1, WNOHANG);
+      freeze_if_want_freeze ($cleanpid);
+      select (undef, undef, undef, 0.1);
+    }
+    Log (undef, "Clean-work-dir exited $?");
   }
   }
-  Log (undef, "Clean-work-dir exited $?");
 
   # Install requested code version
 
 
   # Install requested code version
 
@@ -462,6 +482,12 @@ else
   Log (undef, "Install exited $?");
 }
 
   Log (undef, "Install exited $?");
 }
 
+if (!$have_slurm)
+{
+  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -553,7 +579,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
+    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
@@ -579,10 +605,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .=
          "&& perl -";
     }
       $command .=
          "&& perl -";
     }
-    $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
-    $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
-    $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
-    $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
     $command .=
         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     my @execargs = ('bash', '-c', $command);
     $command .=
         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     my @execargs = ('bash', '-c', $command);
@@ -736,7 +758,7 @@ if ($job_has_uuid) {
 if ($Job->{'output'})
 {
   eval {
 if ($Job->{'output'})
 {
   eval {
-    my $manifest_text = `arv keep get $Job->{'output'}`;
+    my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
@@ -1039,11 +1061,12 @@ sub process_stderr
 sub fetch_block
 {
   my $hash = shift;
 sub fetch_block
 {
   my $hash = shift;
-  my ($child_out, $child_in, $output_block);
+  my ($child_out, $output_block);
 
 
-  my $pid = open2($child_out, $child_in, 'arv', 'keep', 'get', $hash);
-  sysread($child_out, $output_block, 64 * 1024 * 1024);
-  waitpid($pid, 0);
+  my $cmd = "arv keep get \Q$hash\E";
+  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!"
+  sysread($keep, $output_block, 64 * 1024 * 1024);
+  close $keep;
   return $output_block;
 }
 
   return $output_block;
 }
 
@@ -1077,15 +1100,20 @@ sub collate_output
     }
     else
     {
     }
     else
     {
-      print $child_in "XXX fetch_block($output) failed XXX\n";
+      Log (undef, "XXX fetch_block($output) failed XXX");
       $main::success = 0;
     }
   }
       $main::success = 0;
     }
   }
+  $child_in->close;
+
   if (!defined $joboutput) {
     my $s = IO::Select->new($child_out);
   if (!defined $joboutput) {
     my $s = IO::Select->new($child_out);
-    sysread($child_out, $joboutput, 64 * 1024 * 1024) if $s->can_read(0);
+    if ($s->can_read(120)) {
+      sysread($child_out, $joboutput, 64 * 1024 * 1024);
+    } else {
+      Log (undef, "timed out reading from 'arv keep put'");
+    }
   }
   }
-  $child_in->close;
   waitpid($pid, 0);
 
   if ($joboutput)
   waitpid($pid, 0);
 
   if ($joboutput)
@@ -1162,8 +1190,9 @@ sub Log                           # ($jobstep_id, $logmessage)
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  # return if !$metastream;
-  # $metastream->write_data ($datetime . " " . $message);
+  if ($metastream) {
+    print $metastream $datetime . " " . $message;
+  }
 }
 
 
 }
 
 
@@ -1191,20 +1220,19 @@ sub cleanup
 
 sub save_meta
 {
 
 sub save_meta
 {
-#  my $justcheckpoint = shift; # false if this will be the last meta saved
-#  my $m = $metastream;
-#  $m = $m->copy if $justcheckpoint;
-#  $m->write_finish;
-#  my $whc = Warehouse->new;
-#  my $loglocator = $whc->store_block ($m->as_string);
-#  $arv->{'collections'}->{'create'}->execute('collection' => {
-#    'uuid' => $loglocator,
-#    'manifest_text' => $m->as_string,
-#  });
-#  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
-#  Log (undef, "log manifest is $loglocator");
-#  $Job->{'log'} = $loglocator;
-#  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
+  my $justcheckpoint = shift; # false if this will be the last meta saved
+  return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
+
+  $local_logfile->flush;
+  my $cmd = "arv keep put --filename \Q$keep_logfile\E "
+      . quotemeta($local_logfile->filename);
+  my $loglocator = `$cmd`;
+  die "system $cmd failed: $?" if $?;
+
+  $local_logfile = undef;   # the temp file is automatically deleted
+  Log (undef, "log manifest is $loglocator");
+  $Job->{'log'} = $loglocator;
+  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
 }
 
 
 }
 
 
@@ -1247,65 +1275,6 @@ sub freeze
 sub thaw
 {
   croak ("Thaw not implemented");
 sub thaw
 {
   croak ("Thaw not implemented");
-
-  # my $whc;
-  # my $key = shift;
-  # Log (undef, "thaw from $key");
-
-  # @jobstep = ();
-  # @jobstep_done = ();
-  # @jobstep_todo = ();
-  # @jobstep_tomerge = ();
-  # $jobstep_tomerge_level = 0;
-  # my $frozenjob = {};
-
-  # my $stream = new Warehouse::Stream ( whc => $whc,
-  #                                   hash => [split (",", $key)] );
-  # $stream->rewind;
-  # while (my $dataref = $stream->read_until (undef, "\n\n"))
-  # {
-  #   if ($$dataref =~ /^job /)
-  #   {
-  #     foreach (split ("\n", $$dataref))
-  #     {
-  #    my ($k, $v) = split ("=", $_, 2);
-  #    $frozenjob->{$k} = freezeunquote ($v);
-  #     }
-  #     next;
-  #   }
-
-  #   if ($$dataref =~ /^merge (\d+) (.*)/)
-  #   {
-  #     $jobstep_tomerge_level = $1;
-  #     @jobstep_tomerge
-  #      = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
-  #     next;
-  #   }
-
-  #   my $Jobstep = { };
-  #   foreach (split ("\n", $$dataref))
-  #   {
-  #     my ($k, $v) = split ("=", $_, 2);
-  #     $Jobstep->{$k} = freezeunquote ($v) if $k;
-  #   }
-  #   $Jobstep->{'failures'} = 0;
-  #   push @jobstep, $Jobstep;
-
-  #   if ($Jobstep->{exitcode} eq "0")
-  #   {
-  #     push @jobstep_done, $#jobstep;
-  #   }
-  #   else
-  #   {
-  #     push @jobstep_todo, $#jobstep;
-  #   }
-  # }
-
-  # foreach (qw (script script_version script_parameters))
-  # {
-  #   $Job->{$_} = $frozenjob->{$_};
-  # }
-  # $Job->save if $job_has_uuid;
 }
 
 
 }
 
 
@@ -1365,6 +1334,15 @@ sub ban_node_by_slot {
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
+sub must_lock_now
+{
+  my ($lockfile, $error_message) = @_;
+  open L, ">", $lockfile or croak("$lockfile: $!");
+  if (!flock L, LOCK_EX|LOCK_NB) {
+    croak("Can't lock $lockfile: $error_message\n");
+  }
+}
+
 __DATA__
 #!/usr/bin/perl
 
 __DATA__
 #!/usr/bin/perl