X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1875ddb761e4ae2909d2afe0718f3d0ad5f3ce0f..cedd2b046ddf4d2f819a4d1dedbbe82d4e70e72d:/sdk/cli/bin/crunch-job

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 9c9d9c1ec9..06b3da99a9 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -82,6 +82,7 @@ use IPC::Open2;
 use IO::Select;
 use File::Temp;
 use Fcntl ':flock';
+use File::Path qw( make_path );
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -91,20 +92,17 @@ unless (defined $ENV{"CRUNCH_TMP"}) {
     $ENV{"CRUNCH_TMP"} .= "-$<";
   }
 }
+
+# Create the tmp directory if it does not exist
+if ( ! -d $ENV{"CRUNCH_TMP"} ) {
+  make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
+}
+
 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
-my $arv_cli;
-
-if (defined $ENV{"ARV_CLI"}) {
-  $arv_cli = $ENV{"ARV_CLI"};
-}
-else {
-  $arv_cli = 'arv';
-}
-
 my $force_unlock;
 my $git_dir;
 my $jobspec;
@@ -501,16 +499,17 @@ if (!$have_slurm)
 
 # If this job requires a Docker image, install that.
 my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_hash);
+my ($docker_locator, $docker_stream, $docker_hash);
 if ($docker_locator = $Job->{docker_image_locator}) {
-  $docker_hash = find_docker_hash($docker_locator);
+  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
   {
     croak("No Docker image hash found from locator $docker_locator");
   }
+  $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
-    arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
+    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
 fi
 };
   my $docker_pid = fork();
@@ -670,15 +669,13 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
           elsif ($env_key eq "TASK_KEEPMOUNT") {
             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
           }
-          elsif ($env_key eq "CRUNCH_SRC") {
-            $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
-          }
           else {
             $command .= "--env=\Q$env_key=$env_val\E ";
           }
         }
       }
       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
+      $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
       $command .= "\Q$docker_hash\E ";
       $command .= "stdbuf --output=0 --error=0 ";
       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
@@ -838,10 +835,12 @@ if ($job_has_uuid) {
                           'finished_at' => scalar gmtime)
 }
 
-if ($collated_output)
-{
+if (!$collated_output) {
+  Log(undef, "output undef");
+}
+else {
   eval {
-    open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
+    open(my $orig_manifest, '-|', 'arv-get', $collated_output)
       or die "failed to get collated manifest: $!";
     # Read the original manifest, and strip permission hints from it,
     # so we can put the result in a Collection.
@@ -862,7 +861,8 @@ if ($collated_output)
       'uuid' => md5_hex($stripped_manifest_text),
       'manifest_text' => $orig_manifest_text,
     });
-    $Job->update_attributes('output' => $output->{uuid});
+    Log(undef, "output " . $output->{uuid});
+    $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
     if ($Job->{'output_is_persistent'}) {
       $arv->{'links'}->{'create'}->execute('link' => {
         'tail_kind' => 'arvados#user',
@@ -994,27 +994,29 @@ sub reapchildren
   push @freeslot, $proc{$pid}->{slot};
   delete $proc{$pid};
 
-  # Load new tasks
-  my $newtask_list = [];
-  my $newtask_results;
-  do {
-    $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
-      'where' => {
-        'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-      },
-      'order' => 'qsequence',
-      'offset' => scalar(@$newtask_list),
-    );
-    push(@$newtask_list, @{$newtask_results->{items}});
-  } while (@{$newtask_results->{items}});
-  foreach my $arvados_task (@$newtask_list) {
-    my $jobstep = {
-      'level' => $arvados_task->{'sequence'},
-      'failures' => 0,
-      'arvados_task' => $arvados_task
-    };
-    push @jobstep, $jobstep;
-    push @jobstep_todo, $#jobstep;
+  if ($task_success) {
+    # Load new tasks
+    my $newtask_list = [];
+    my $newtask_results;
+    do {
+      $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
+        'where' => {
+          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+        },
+        'order' => 'qsequence',
+        'offset' => scalar(@$newtask_list),
+      );
+      push(@$newtask_list, @{$newtask_results->{items}});
+    } while (@{$newtask_results->{items}});
+    foreach my $arvados_task (@$newtask_list) {
+      my $jobstep = {
+        'level' => $arvados_task->{'sequence'},
+        'failures' => 0,
+        'arvados_task' => $arvados_task
+      };
+      push @jobstep, $jobstep;
+      push @jobstep_todo, $#jobstep;
+    }
   }
 
   $progress_is_dirty = 1;
@@ -1179,9 +1181,22 @@ sub fetch_block
   my $hash = shift;
   my ($keep, $child_out, $output_block);
 
-  my $cmd = "$arv_cli keep get \Q$hash\E";
+  my $cmd = "arv-get \Q$hash\E";
   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
-  sysread($keep, $output_block, 64 * 1024 * 1024);
+  $output_block = '';
+  while (1) {
+    my $buf;
+    my $bytes = sysread($keep, $buf, 1024 * 1024);
+    if (!defined $bytes) {
+      die "reading from arv-get: $!";
+    } elsif ($bytes == 0) {
+      # sysread returns 0 at the end of the pipe.
+      last;
+    } else {
+      # some bytes were read into buf.
+      $output_block .= $buf;
+    }
+  }
   close $keep;
   return $output_block;
 }
@@ -1191,7 +1206,7 @@ sub collate_output
   Log (undef, "collate");
 
   my ($child_out, $child_in);
-  my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
+  my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
   my $joboutput;
   for (@jobstep)
   {
@@ -1228,20 +1243,11 @@ sub collate_output
       sysread($child_out, $joboutput, 64 * 1024 * 1024);
       chomp($joboutput);
     } else {
-      Log (undef, "timed out reading from 'arv keep put'");
+      Log (undef, "timed out reading from 'arv-put'");
     }
   }
   waitpid($pid, 0);
 
-  if ($joboutput)
-  {
-    Log (undef, "output $joboutput");
-    $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
-  }
-  else
-  {
-    Log (undef, "output undef");
-  }
 
   return $joboutput;
 }
@@ -1341,7 +1347,7 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
 
   $local_logfile->flush;
-  my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
+  my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
       . quotemeta($local_logfile->filename);
   my $loglocator = `$cmd`;
   die "system $cmd failed: $?" if $?;
@@ -1461,19 +1467,19 @@ sub must_lock_now
   }
 }
 
-sub find_docker_hash {
-  # Given a Keep locator, search for a matching link to find the Docker hash
-  # of the stored image.
+sub find_docker_image {
+  # Given a Keep locator, check to see if it contains a Docker image.
+  # If so, return its stream name and Docker hash.
+  # If not, return undef for both values.
   my $locator = shift;
-  my $links_result = $arv->{links}->{list}->execute(
-    filters => [["head_uuid", "=", $locator],
-                ["link_class", "=", "docker_image_hash"]],
-    limit => 1);
-  my $docker_hash;
-  foreach my $link (@{$links_result->{items}}) {
-    $docker_hash = lc($link->{name});
+  if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
+    my @file_list = @{$image->{files}};
+    if ((scalar(@file_list) == 1) &&
+        ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
+      return ($file_list[0][0], $1);
+    }
+  }
-  return $docker_hash;
+  return (undef, undef);
 }
 
 __DATA__
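
Note on the fetch_block hunk: a single sysread() on a pipe returns at most
whatever happens to be buffered, so the old one-shot read (capped at 64 MiB)
could silently truncate large blocks. The replacement loops until sysread()
reports end-of-file. A minimal standalone sketch of the same pattern follows;
the "seq 1 100000" command and the 1 MiB chunk size are stand-ins for
illustration only, not part of the change.

    #!/usr/bin/env perl
    use strict;
    use warnings;

    # Read everything the child process writes to the pipe, 1 MiB at a time.
    open(my $pipe, '-|', 'seq 1 100000') or die "open: $!";
    my $data = '';
    while (1) {
      my $buf;
      my $bytes = sysread($pipe, $buf, 1024 * 1024);
      if (!defined $bytes) {
        die "read error: $!";   # undef means a read error
      } elsif ($bytes == 0) {
        last;                   # 0 means the writer closed its end
      } else {
        $data .= $buf;          # append whatever arrived this round
      }
    }
    close($pipe);
    printf "read %d bytes\n", length($data);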
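
Note on the Docker image lookup: find_docker_image() now inspects the
collection's file listing (each entry begins with the stream name and file
name) instead of querying docker_image_hash links, and returns the stream
plus the 64-hex-digit hash taken from the tar file name. The caller strips a
leading "." from the stream before splicing it into the arv-get path used by
the install script. A small sketch of that path construction, with placeholder
locator and hash values:

    use strict;
    use warnings;

    my ($docker_locator, $docker_stream, $docker_hash) =
        ('abc123+123', '.', 'f' x 64);   # placeholders, not real objects
    $docker_stream =~ s/^\.//;   # "." (top-level stream) becomes "", "./sub" becomes "/sub"
    my $tar_path = "$docker_locator$docker_stream/$docker_hash.tar";
    print "$tar_path\n";         # the path handed to arv-get in the install script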