Merge branch 'master' into 2221-complete-docker
authorTim Pierce <twp@curoverse.com>
Tue, 11 Mar 2014 19:54:22 +0000 (15:54 -0400)
committerTim Pierce <twp@curoverse.com>
Tue, 11 Mar 2014 19:54:22 +0000 (15:54 -0400)
1  2 
sdk/cli/bin/crunch-job

diff --combined sdk/cli/bin/crunch-job
index 87b4fbf8a5385f498c2e016f9225d467a3dd664a,4c8acbb59a5ad917dd1f3d43203bb203fe84f4f3..5d362f4529ffd545ecb476adcdb78dfcabfa0d70
@@@ -71,8 -71,9 +71,8 @@@ use POSIX ':sys_wait_h'
  use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
  use Arvados;
  use Getopt::Long;
 -use Warehouse;
 -use Warehouse::Stream;
 -use IPC::System::Simple qw(capturex);
 +use IPC::Open2;
 +use IO::Select;
  
  $ENV{"TMPDIR"} ||= "/tmp";
  unless (defined $ENV{"CRUNCH_TMP"}) {
@@@ -165,10 -166,10 +165,10 @@@ els
  }
  $job_id = $Job->{'uuid'};
  
 -$metastream = Warehouse::Stream->new(whc => new Warehouse);
 -$metastream->clear;
 -$metastream->name('.');
 -$metastream->write_start($job_id . '.log.txt');
 +$metastream = Warehouse::Stream->new(whc => new Warehouse);
 +$metastream->clear;
 +$metastream->name('.');
 +$metastream->write_start($job_id . '.log.txt');
  
  
  $Job->{'runtime_constraints'} ||= {};
@@@ -579,10 -580,6 +579,6 @@@ for (my $todo_ptr = 0; $todo_ptr <= $#j
        $command .=
          "&& perl -";
      }
-     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
-     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
-     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
-     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
      $command .=
          "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
      my @execargs = ('bash', '-c', $command);
@@@ -736,7 -733,7 +732,7 @@@ if ($job_has_uuid) 
  if ($Job->{'output'})
  {
    eval {
 -    my $manifest_text = capturex("whget", $Job->{'output'});
 +    my $manifest_text = `arv keep get $Job->{'output'}`;
      $arv->{'collections'}->{'create'}->execute('collection' => {
        'uuid' => $Job->{'output'},
        'manifest_text' => $manifest_text,
@@@ -1036,23 -1033,12 +1032,23 @@@ sub process_stder
    } split ("\n", $jobstep[$job]->{stderr});
  }
  
 +sub fetch_block
 +{
 +  my $hash = shift;
 +  my ($child_out, $child_in, $output_block);
 +
 +  my $pid = open2($child_out, $child_in, 'arv', 'keep', 'get', $hash);
 +  sysread($child_out, $output_block, 64 * 1024 * 1024);
 +  waitpid($pid, 0);
 +  return $output_block;
 +}
  
  sub collate_output
  {
 -  my $whc = Warehouse->new;
    Log (undef, "collate");
 -  $whc->write_start (1);
 +
 +  my ($child_out, $child_in);
 +  my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
    my $joboutput;
    for (@jobstep)
    {
      if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
      {
        $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
 -      $whc->write_data ($output);
 +      print $child_in $output;
      }
      elsif (@jobstep == 1)
      {
        $joboutput = $output;
 -      $whc->write_finish;
 +      last;
      }
 -    elsif (defined (my $outblock = $whc->fetch_block ($output)))
 +    elsif (defined (my $outblock = fetch_block ($output)))
      {
        $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
 -      $whc->write_data ($outblock);
 +      print $child_in $outblock;
      }
      else
      {
 -      my $errstr = $whc->errstr;
 -      $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
 +      print $child_in "XXX fetch_block($output) failed XXX\n";
        $main::success = 0;
      }
    }
 -  $joboutput = $whc->write_finish if !defined $joboutput;
 +  if (!defined $joboutput) {
 +    my $s = IO::Select->new($child_out);
 +    sysread($child_out, $joboutput, 64 * 1024 * 1024) if $s->can_read(0);
 +  }
 +  $child_in->close;
 +  waitpid($pid, 0);
 +
    if ($joboutput)
    {
      Log (undef, "output $joboutput");
@@@ -1162,8 -1143,8 +1158,8 @@@ sub Log                         # ($jobstep_id, $logmessage
    }
    print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
  
 -  return if !$metastream;
 -  $metastream->write_data ($datetime . " " . $message);
 +  return if !$metastream;
 +  $metastream->write_data ($datetime . " " . $message);
  }
  
  
@@@ -1191,20 -1172,20 +1187,20 @@@ sub cleanu
  
  sub save_meta
  {
 -  my $justcheckpoint = shift; # false if this will be the last meta saved
 -  my $m = $metastream;
 -  $m = $m->copy if $justcheckpoint;
 -  $m->write_finish;
 -  my $whc = Warehouse->new;
 -  my $loglocator = $whc->store_block ($m->as_string);
 -  $arv->{'collections'}->{'create'}->execute('collection' => {
 -    'uuid' => $loglocator,
 -    'manifest_text' => $m->as_string,
 -  });
 -  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
 -  Log (undef, "log manifest is $loglocator");
 -  $Job->{'log'} = $loglocator;
 -  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
 +#  my $justcheckpoint = shift; # false if this will be the last meta saved
 +#  my $m = $metastream;
 +#  $m = $m->copy if $justcheckpoint;
 +#  $m->write_finish;
 +#  my $whc = Warehouse->new;
 +#  my $loglocator = $whc->store_block ($m->as_string);
 +#  $arv->{'collections'}->{'create'}->execute('collection' => {
 +#    'uuid' => $loglocator,
 +#    'manifest_text' => $m->as_string,
 +#  });
 +#  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
 +#  Log (undef, "log manifest is $loglocator");
 +#  $Job->{'log'} = $loglocator;
 +#  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
  }
  
  
@@@ -1248,64 -1229,64 +1244,64 @@@ sub tha
  {
    croak ("Thaw not implemented");
  
 -  my $whc;
 -  my $key = shift;
 -  Log (undef, "thaw from $key");
 -
 -  @jobstep = ();
 -  @jobstep_done = ();
 -  @jobstep_todo = ();
 -  @jobstep_tomerge = ();
 -  $jobstep_tomerge_level = 0;
 -  my $frozenjob = {};
 -
 -  my $stream = new Warehouse::Stream ( whc => $whc,
 -                                     hash => [split (",", $key)] );
 -  $stream->rewind;
 -  while (my $dataref = $stream->read_until (undef, "\n\n"))
 -  {
 -    if ($$dataref =~ /^job /)
 -    {
 -      foreach (split ("\n", $$dataref))
 -      {
 -      my ($k, $v) = split ("=", $_, 2);
 -      $frozenjob->{$k} = freezeunquote ($v);
 -      }
 -      next;
 -    }
 -
 -    if ($$dataref =~ /^merge (\d+) (.*)/)
 -    {
 -      $jobstep_tomerge_level = $1;
 -      @jobstep_tomerge
 -        = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
 -      next;
 -    }
 -
 -    my $Jobstep = { };
 -    foreach (split ("\n", $$dataref))
 -    {
 -      my ($k, $v) = split ("=", $_, 2);
 -      $Jobstep->{$k} = freezeunquote ($v) if $k;
 -    }
 -    $Jobstep->{'failures'} = 0;
 -    push @jobstep, $Jobstep;
 -
 -    if ($Jobstep->{exitcode} eq "0")
 -    {
 -      push @jobstep_done, $#jobstep;
 -    }
 -    else
 -    {
 -      push @jobstep_todo, $#jobstep;
 -    }
 -  }
 -
 -  foreach (qw (script script_version script_parameters))
 -  {
 -    $Job->{$_} = $frozenjob->{$_};
 -  }
 -  $Job->save if $job_has_uuid;
 +  my $whc;
 +  my $key = shift;
 +  Log (undef, "thaw from $key");
 +
 +  @jobstep = ();
 +  @jobstep_done = ();
 +  @jobstep_todo = ();
 +  @jobstep_tomerge = ();
 +  $jobstep_tomerge_level = 0;
 +  my $frozenjob = {};
 +
 +  my $stream = new Warehouse::Stream ( whc => $whc,
 +  #                                  hash => [split (",", $key)] );
 +  $stream->rewind;
 +  while (my $dataref = $stream->read_until (undef, "\n\n"))
 +  {
 +    if ($$dataref =~ /^job /)
 +    {
 +      foreach (split ("\n", $$dataref))
 +      {
 +  #   my ($k, $v) = split ("=", $_, 2);
 +  #   $frozenjob->{$k} = freezeunquote ($v);
 +      }
 +      next;
 +    }
 +
 +    if ($$dataref =~ /^merge (\d+) (.*)/)
 +    {
 +      $jobstep_tomerge_level = $1;
 +      @jobstep_tomerge
 +  #     = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
 +      next;
 +    }
 +
 +    my $Jobstep = { };
 +    foreach (split ("\n", $$dataref))
 +    {
 +      my ($k, $v) = split ("=", $_, 2);
 +      $Jobstep->{$k} = freezeunquote ($v) if $k;
 +    }
 +    $Jobstep->{'failures'} = 0;
 +    push @jobstep, $Jobstep;
 +
 +    if ($Jobstep->{exitcode} eq "0")
 +    {
 +      push @jobstep_done, $#jobstep;
 +    }
 +    else
 +    {
 +      push @jobstep_todo, $#jobstep;
 +    }
 +  }
 +
 +  foreach (qw (script script_version script_parameters))
 +  {
 +    $Job->{$_} = $frozenjob->{$_};
 +  }
 +  $Job->save if $job_has_uuid;
  }