X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/63cb5c235ccacdc1665a89560bc8c16fcbefd8d6..e359c70eb7adb66df7c6aae6edb738e5f543d6e4:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 369bc3e1ae..2246c86fb6 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -86,6 +86,8 @@ use POSIX ':sys_wait_h'; use POSIX qw(strftime); use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Arvados; +use Cwd qw(realpath); +use Data::Dumper; use Digest::MD5 qw(md5_hex); use Getopt::Long; use IPC::Open2; @@ -94,7 +96,9 @@ use File::Temp; use Fcntl ':flock'; use File::Path qw( make_path remove_tree ); +use constant TASK_TEMPFAIL => 111; use constant EX_TEMPFAIL => 75; +use constant EX_RETRY_UNLOCKED => 93; $ENV{"TMPDIR"} ||= "/tmp"; unless (defined $ENV{"CRUNCH_TMP"}) { @@ -115,26 +119,28 @@ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt"; $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated mkdir ($ENV{"JOB_WORK"}); +my %proc; my $force_unlock; my $git_dir; my $jobspec; my $job_api_token; my $no_clear_tmp; my $resume_stash; +my $docker_bin = "/usr/bin/docker.io"; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, 'job-api-token=s' => \$job_api_token, 'no-clear-tmp' => \$no_clear_tmp, 'resume-stash=s' => \$resume_stash, + 'docker-bin=s' => \$docker_bin, ); if (defined $job_api_token) { $ENV{ARVADOS_API_TOKEN} = $job_api_token; } -my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST}; -my $local_job = 0; +my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST}; $SIG{'USR1'} = sub @@ -146,8 +152,6 @@ $SIG{'USR2'} = sub $main::ENV{CRUNCH_DEBUG} = 0; }; - - my $arv = Arvados->new('apiVersion' => 'v1'); my $Job; @@ -156,20 +160,44 @@ my $dbh; my $sth; my @jobstep; -my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; }); - +my $local_job; if ($jobspec =~ /^[-a-z\d]+$/) { # $jobspec is an Arvados UUID, not a JSON job specification - $Job = 
retry_op(sub { - $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); - }); + $Job = api_call("jobs/get", uuid => $jobspec); + $local_job = 0; +} +else +{ + $Job = JSON::decode_json($jobspec); + $local_job = 1; +} + + +# Make sure our workers (our slurm nodes, localhost, or whatever) are +# at least able to run basic commands: they aren't down or severely +# misconfigured. +my $cmd = ['true']; +if ($Job->{docker_image_locator}) { + $cmd = [$docker_bin, 'ps', '-q']; +} +Log(undef, "Sanity check is `@$cmd`"); +srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"], + $cmd, + {fork => 1}); +if ($? != 0) { + Log(undef, "Sanity check failed: ".exit_status_s($?)); + exit EX_TEMPFAIL; +} +Log(undef, "Sanity check OK"); + + +my $User = api_call("users/current"); + +if (!$local_job) { if (!$force_unlock) { # Claim this job, and make sure nobody else does - eval { retry_op(sub { - # lock() sets is_locked_by_uuid and changes state to Running. - $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'}) - }); }; + eval { api_call("jobs/lock", uuid => $Job->{uuid}); }; if ($@) { Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL); exit EX_TEMPFAIL; @@ -178,8 +206,6 @@ if ($jobspec =~ /^[-a-z\d]+$/) } else { - $Job = JSON::decode_json($jobspec); - if (!$resume_stash) { map { croak ("No $_ specified") unless $Job->{$_} } @@ -190,7 +216,7 @@ else $Job->{'started_at'} = gmtime; $Job->{'state'} = 'Running'; - $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); }); + $Job = api_call("jobs/create", job => $Job); } $job_id = $Job->{'uuid'}; @@ -201,6 +227,16 @@ $Job->{'runtime_constraints'} ||= {}; $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0; my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'}; +my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`; +if ($? 
== 0) { + $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /; + chomp($gem_versions); + chop($gem_versions); # Closing parentheses +} else { + $gem_versions = ""; +} +Log(undef, + "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions); Log (undef, "check slurm allocation"); my @slot; @@ -307,9 +343,7 @@ my @jobstep_todo = (); my @jobstep_done = (); my @jobstep_tomerge = (); my $jobstep_tomerge_level = 0; -my $squeue_checked; -my $squeue_kill_checked; -my $output_in_keep = 0; +my $squeue_checked = 0; my $latest_refresh = scalar time; @@ -320,13 +354,11 @@ if (defined $Job->{thawedfromkey}) } else { - my $first_task = retry_op(sub { - $arv->{'job_tasks'}->{'create'}->execute('job_task' => { - 'job_uuid' => $Job->{'uuid'}, - 'sequence' => 0, - 'qsequence' => 0, - 'parameters' => {}, - }); + my $first_task = api_call("job_tasks/create", job_task => { + 'job_uuid' => $Job->{'uuid'}, + 'sequence' => 0, + 'qsequence' => 0, + 'parameters' => {}, }); push @jobstep, { 'level' => 0, 'failures' => 0, @@ -341,13 +373,9 @@ if (!$have_slurm) must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here."); } - -my $build_script; -do { - local $/ = undef; - $build_script = ; -}; +my $build_script = handle_readall(\*DATA); my $nodelist = join(",", @node); +my $git_tar_count = 0; if (!defined $no_clear_tmp) { # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* @@ -356,8 +384,12 @@ if (!defined $no_clear_tmp) { my $cleanpid = fork(); if ($cleanpid == 0) { + # Find FUSE mounts that look like Keep mounts (the mount path has the + # word "keep") and unmount them. Then clean up work directories. + # TODO: When #5036 is done and widely deployed, we can get rid of the + # regular expression and just unmount everything with type fuse.keep. 
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']); + ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']); exit (1); } while (1) @@ -369,8 +401,56 @@ if (!defined $no_clear_tmp) { Log (undef, "Cleanup command exited ".exit_status_s($?)); } +# If this job requires a Docker image, install that. +my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem); +if ($docker_locator = $Job->{docker_image_locator}) { + ($docker_stream, $docker_hash) = find_docker_image($docker_locator); + if (!$docker_hash) + { + croak("No Docker image hash found from locator $docker_locator"); + } + $docker_stream =~ s/^\.//; + my $docker_install_script = qq{ +if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then + arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +fi +}; + my $docker_pid = fork(); + if ($docker_pid == 0) + { + srun (["srun", "--nodelist=" . join(',', @node)], + ["/bin/sh", "-ec", $docker_install_script]); + exit ($?); + } + while (1) + { + last if $docker_pid == waitpid (-1, WNOHANG); + freeze_if_want_freeze ($docker_pid); + select (undef, undef, undef, 0.1); + } + if ($? != 0) + { + croak("Installing Docker image from $docker_locator exited " + .exit_status_s($?)); + } + + # Determine whether this version of Docker supports memory+swap limits. + srun(["srun", "--nodelist=" . $node[0]], + ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="], + {fork => 1}); + $docker_limitmem = ($? == 0); + + if ($Job->{arvados_sdk_version}) { + # The job also specifies an Arvados SDK version. Add the SDKs to the + # tar file for the build script to install. 
+ Log(undef, sprintf("Packing Arvados SDK version %s for installation", + $Job->{arvados_sdk_version})); + add_git_archive("git", "--git-dir=$git_dir", "archive", + "--prefix=.arvados.sdk/", + $Job->{arvados_sdk_version}, "sdk"); + } +} -my $git_archive; if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) { # If script_version looks like an absolute path, *and* the --git-dir # argument was not given -- which implies we were not invoked by @@ -426,10 +506,8 @@ else { } else { # $repo is none of the above. It must be the name of a hosted # repository. - my $arv_repo_list = retry_op(sub { - $arv->{'repositories'}->{'list'}->execute( - 'filters' => [['name','=',$repo]]); - }); + my $arv_repo_list = api_call("repositories/list", + 'filters' => [['name','=',$repo]]); my @repos_found = @{$arv_repo_list->{'items'}}; my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'}; if ($n_found > 0) { @@ -526,12 +604,10 @@ else { } $ENV{"CRUNCH_SRC_COMMIT"} = $commit; - $git_archive = `$gitcmd archive ''\Q$commit\E`; - if ($?) { - croak("Error: $gitcmd archive exited ".exit_status_s($?)); - } + add_git_archive("$gitcmd archive ''\Q$commit\E"); } +my $git_archive = combined_git_archive(); if (!defined $git_archive) { Log(undef, "Skip install phase (no git archive)"); if ($have_slurm) { @@ -539,69 +615,88 @@ if (!defined $git_archive) { } } else { - Log(undef, "Run install script on all workers"); - - my @srunargs = ("srun", - "--nodelist=$nodelist", - "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); - my @execargs = ("sh", "-c", - "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); - - # Note: this section is almost certainly unnecessary if we're - # running tasks in docker containers. - my $installpid = fork(); - if ($installpid == 0) - { - srun (\@srunargs, \@execargs, {}, $build_script . 
$git_archive); - exit (1); - } - while (1) - { - last if $installpid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($installpid); - select (undef, undef, undef, 0.1); - } - Log (undef, "Install script exited ".exit_status_s($?)); -} - -if (!$have_slurm) -{ - # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above) - must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here."); -} + my $install_exited; + my $install_script_tries_left = 3; + for (my $attempts = 0; $attempts < 3; $attempts++) { + Log(undef, "Run install script on all workers"); + + my @srunargs = ("srun", + "--nodelist=$nodelist", + "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); + my @execargs = ("sh", "-c", + "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); + + $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive); + my ($install_stderr_r, $install_stderr_w); + pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!"); + set_nonblocking($install_stderr_r); + my $installpid = fork(); + if ($installpid == 0) + { + close($install_stderr_r); + fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec + open(STDOUT, ">&", $install_stderr_w); + open(STDERR, ">&", $install_stderr_w); + srun (\@srunargs, \@execargs, {}, $build_script . $git_archive); + exit (1); + } + close($install_stderr_w); + # Tell freeze_if_want_freeze how to kill the child, otherwise the + # "waitpid(installpid)" loop won't get interrupted by a freeze: + $proc{$installpid} = {}; + my $stderr_buf = ''; + # Track whether anything appears on stderr other than slurm errors + # ("srun: ...") and the "starting: ..." message printed by the + # srun subroutine itself: + my $stderr_anything_from_script = 0; + my $match_our_own_errors = '^(srun: error: |starting: \[)'; + while ($installpid != waitpid(-1, WNOHANG)) { + freeze_if_want_freeze ($installpid); + # Wait up to 0.1 seconds for something to appear on stderr, then + # do a non-blocking read. 
+ my $bits = fhbits($install_stderr_r); + select ($bits, undef, $bits, 0.1); + if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf))) + { + while ($stderr_buf =~ /^(.*?)\n/) { + my $line = $1; + substr $stderr_buf, 0, 1+length($line), ""; + Log(undef, "stderr $line"); + if ($line !~ /$match_our_own_errors/) { + $stderr_anything_from_script = 1; + } + } + } + } + delete $proc{$installpid}; + $install_exited = $?; + close($install_stderr_r); + if (length($stderr_buf) > 0) { + if ($stderr_buf !~ /$match_our_own_errors/) { + $stderr_anything_from_script = 1; + } + Log(undef, "stderr $stderr_buf") + } -# If this job requires a Docker image, install that. -my $docker_bin = "/usr/bin/docker.io"; -my ($docker_locator, $docker_stream, $docker_hash); -if ($docker_locator = $Job->{docker_image_locator}) { - ($docker_stream, $docker_hash) = find_docker_image($docker_locator); - if (!$docker_hash) - { - croak("No Docker image hash found from locator $docker_locator"); + Log (undef, "Install script exited ".exit_status_s($install_exited)); + last if $install_exited == 0 || $main::please_freeze; + # If the install script fails but doesn't print an error message, + # the next thing anyone is likely to do is just run it again in + # case it was a transient problem like "slurm communication fails + # because the network isn't reliable enough". So we'll just do + # that ourselves (up to 3 attempts in total). OTOH, if there is an + # error message, the problem is more likely to have a real fix and + # we should fail the job so the fixing process can start, instead + # of doing 2 more attempts. + last if $stderr_anything_from_script; } - $docker_stream =~ s/^\.//; - my $docker_install_script = qq{ -if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then - arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load -fi -}; - my $docker_pid = fork(); - if ($docker_pid == 0) - { - srun (["srun", "--nodelist=" . 
join(',', @node)], - ["/bin/sh", "-ec", $docker_install_script]); - exit ($?); - } - while (1) - { - last if $docker_pid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($docker_pid); - select (undef, undef, undef, 0.1); + + foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) { + unlink($tar_filename); } - if ($? != 0) - { - croak("Installing Docker image from $docker_locator exited " - .exit_status_s($?)); + + if ($install_exited != 0) { + croak("Giving up"); } } @@ -627,16 +722,48 @@ ONELEVEL: my $thisround_succeeded = 0; my $thisround_failed = 0; my $thisround_failed_multiple = 0; +my $working_slot_count = scalar(@slot); @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} or $a <=> $b } @jobstep_todo; my $level = $jobstep[$jobstep_todo[0]]->{level}; -Log (undef, "start level $level"); +my $initial_tasks_this_level = 0; +foreach my $id (@jobstep_todo) { + $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level); +} + +# If the number of tasks scheduled at this level #T is smaller than the number +# of slots available #S, only use the first #T slots, or the first slot on +# each node, whichever number is greater. +# +# When we dispatch tasks later, we'll allocate whole-node resources like RAM +# based on these numbers. Using fewer slots makes more resources available +# to each individual task, which should normally be a better strategy when +# there are fewer of them running with less parallelism. +# +# Note that this calculation is not redone if the initial tasks at +# this level queue more tasks at the same level. This may harm +# overall task throughput for that level. 
+my @freeslot; +if ($initial_tasks_this_level < @node) { + @freeslot = (0..$#node); +} elsif ($initial_tasks_this_level < @slot) { + @freeslot = (0..$initial_tasks_this_level - 1); +} else { + @freeslot = (0..$#slot); +} +my $round_num_freeslots = scalar(@freeslot); +my %round_max_slots = (); +for (my $ii = $#freeslot; $ii >= 0; $ii--) { + my $this_slot = $slot[$freeslot[$ii]]; + my $node_name = $this_slot->{node}->{name}; + $round_max_slots{$node_name} ||= $this_slot->{cpu}; + last if (scalar(keys(%round_max_slots)) >= @node); +} -my %proc; -my @freeslot = (0..$#slot); +Log(undef, "start level $level with $round_num_freeslots slots"); my @holdslot; my %reader; my $progress_is_dirty = 1; @@ -645,7 +772,6 @@ my $progress_stats_updated = 0; update_progress_stats(); - THISROUND: for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { @@ -656,15 +782,15 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) next; } - pipe $reader{$id}, "writer" or croak ($!); - my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); - fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!); + pipe $reader{$id}, "writer" or croak("pipe() failed: $!"); + set_nonblocking($reader{$id}); my $childslot = $freeslot[0]; my $childnode = $slot[$childslot]->{node}; my $childslotname = join (".", $slot[$childslot]->{node}->{name}, $slot[$childslot]->{cpu}); + my $childpid = fork(); if ($childpid == 0) { @@ -694,11 +820,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; - $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$"; + $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname"; $ENV{"HOME"} = $ENV{"TASK_WORK"}; $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep"; $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated - $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; + $ENV{"CRUNCH_NODE_SLOTS"} = 
$round_max_slots{$ENV{TASK_SLOT_NODE}}; $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"}; $ENV{"GZIP"} = "-n"; @@ -709,62 +835,99 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, "--job-name=$job_id.$id.$$", ); - my $build_script_to_send = ""; my $command = "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " - ."&& cd $ENV{CRUNCH_TMP} "; - if ($build_script) - { - $build_script_to_send = $build_script; - $command .= - "&& perl -"; - } + ."&& cd $ENV{CRUNCH_TMP} " + # These environment variables get used explicitly later in + # $command. No tool is expected to read these values directly. + .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' {"script"}; + $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } else { # Non-docker run $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; $command .= "stdbuf --output=0 --error=0 "; - $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; + $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } my @execargs = ('bash', '-c', $command); - srun (\@srunargs, \@execargs, undef, $build_script_to_send); + srun (\@srunargs, \@execargs, undef, $build_script); # exec() failed, we assume nothing happened. 
- Log(undef, "srun() failed on build script"); - die; + die "srun() failed on build script\n"; } close("writer"); if (!defined $childpid) @@ -800,14 +963,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) while (!@freeslot || - (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo)) + ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { - last THISROUND if $main::please_freeze; + last THISROUND if $main::please_freeze || defined($main::success); if ($main::please_info) { $main::please_info = 0; freeze(); - collate_output(); + create_output_collection(); save_meta(1); update_progress_stats(); } @@ -825,6 +988,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { update_progress_stats(); } + $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 && + $_->{node}->{hold_count} < 4 } @slot); if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded)) { @@ -848,10 +1013,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } # give up if no nodes are succeeding - if (!grep { $_->{node}->{losing_streak} == 0 && - $_->{node}->{hold_count} < 4 } @slot) { - my $message = "Every node has failed -- giving up on this round"; - Log (undef, $message); + if ($working_slot_count < 1) { + Log(undef, "Every node has failed -- giving up"); last THISROUND; } } @@ -869,7 +1032,7 @@ while (%proc) $main::please_continue = 0; goto THISROUND; } - $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info; + $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info; readfrompipes (); if (!reapchildren()) { @@ -887,18 +1050,18 @@ freeze_if_want_freeze(); if (!defined $main::success) { - if (@jobstep_todo && - $thisround_succeeded == 0 && - ($thisround_failed == 0 || $thisround_failed > 4)) - { + if (!@jobstep_todo) { + $main::success = 1; + } 
elsif ($working_slot_count < 1) { + save_output_collection(); + save_meta(); + exit(EX_RETRY_UNLOCKED); + } elsif ($thisround_succeeded == 0 && + ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; Log (undef, $message); $main::success = 0; } - if (!@jobstep_todo) - { - $main::success = 1; - } } goto ONELEVEL if !defined $main::success; @@ -906,32 +1069,7 @@ goto ONELEVEL if !defined $main::success; release_allocation(); freeze(); -my $collated_output = &collate_output(); - -if (!$collated_output) { - Log(undef, "output undef"); -} -else { - eval { - open(my $orig_manifest, '-|', 'arv-get', $collated_output) - or die "failed to get collated manifest: $!"; - my $orig_manifest_text = ''; - while (my $manifest_line = <$orig_manifest>) { - $orig_manifest_text .= $manifest_line; - } - my $output = retry_op(sub { - $arv->{'collections'}->{'create'}->execute( - 'collection' => {'manifest_text' => $orig_manifest_text}); - }); - Log(undef, "output uuid " . $output->{uuid}); - Log(undef, "output hash " . $output->{portable_data_hash}); - $Job->update_attributes('output' => $output->{portable_data_hash}); - }; - if ($@) { - Log (undef, "Failed to register output manifest: $@"); - } -} - +my $collated_output = save_output_collection(); Log (undef, "finish"); save_meta(); @@ -997,7 +1135,7 @@ sub reapchildren { my $temporary_fail; $temporary_fail ||= $Jobstep->{node_fail}; - $temporary_fail ||= ($exitvalue == 111); + $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); ++$thisround_failed; ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; @@ -1018,13 +1156,12 @@ sub reapchildren Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds', ++$Jobstep->{'failures'}, - $temporary_fail ? 'temporary ' : 'permanent', + $temporary_fail ? 
'temporary' : 'permanent', $elapsed)); if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { # Give up on this task, and the whole job $main::success = 0; - $main::please_freeze = 1; } # Put this task back on the todo queue push @jobstep_todo, $jobstepid; @@ -1043,7 +1180,9 @@ sub reapchildren $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); $Jobstep->{'arvados_task'}->save; process_stderr ($jobstepid, $task_success); - Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output}); + Log ($jobstepid, sprintf("task output (%d bytes): %s", + length($Jobstep->{'arvados_task'}->{output}), + $Jobstep->{'arvados_task'}->{output})); close $reader{$jobstepid}; delete $reader{$jobstepid}; @@ -1056,15 +1195,14 @@ sub reapchildren my $newtask_list = []; my $newtask_results; do { - $newtask_results = retry_op(sub { - $arv->{'job_tasks'}->{'list'}->execute( - 'where' => { - 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} - }, - 'order' => 'qsequence', - 'offset' => scalar(@$newtask_list), - ); - }); + $newtask_results = api_call( + "job_tasks/list", + 'where' => { + 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} + }, + 'order' => 'qsequence', + 'offset' => scalar(@$newtask_list), + ); push(@$newtask_list, @{$newtask_results->{items}}); } while (@{$newtask_results->{items}}); foreach my $arvados_task (@$newtask_list) { @@ -1087,9 +1225,7 @@ sub check_refresh_wanted my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; if (@stat && $stat[9] > $latest_refresh) { $latest_refresh = scalar time; - my $Job2 = retry_op(sub { - $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); - }); + my $Job2 = api_call("jobs/get", uuid => $jobspec); for my $attr ('cancelled_at', 'cancelled_by_user_uuid', 'cancelled_by_client_uuid', @@ -1110,67 +1246,92 @@ sub check_refresh_wanted sub check_squeue { - # return if the kill list was checked <4 seconds ago - if (defined $squeue_kill_checked && 
$squeue_kill_checked > time - 4) + my $last_squeue_check = $squeue_checked; + + # Do not call `squeue` or check the kill list more than once every + # 15 seconds. + return if $last_squeue_check > time - 15; + $squeue_checked = time; + + # Look for children from which we haven't received stderr data since + # the last squeue check. If no such children exist, all procs are + # alive and there's no need to even look at squeue. + # + # As long as the crunchstat poll interval (10s) is shorter than the + # squeue check interval (15s) this should make the squeue check an + # infrequent event. + my $silent_procs = 0; + for my $jobstep (values %proc) { - return; + if ($jobstep->{stderr_at} < $last_squeue_check) + { + $silent_procs++; + } } - $squeue_kill_checked = time; + return if $silent_procs == 0; # use killem() on procs whose killtime is reached - for (keys %proc) + while (my ($pid, $jobstep) = each %proc) { - if (exists $proc{$_}->{killtime} - && $proc{$_}->{killtime} <= time) + if (exists $jobstep->{killtime} + && $jobstep->{killtime} <= time + && $jobstep->{stderr_at} < $last_squeue_check) { - killem ($_); + my $sincewhen = ""; + if ($jobstep->{stderr_at}) { + $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s"; + } + Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); + killem ($pid); } } - # return if the squeue was checked <60 seconds ago - if (defined $squeue_checked && $squeue_checked > time - 60) - { - return; - } - $squeue_checked = time; - if (!$have_slurm) { # here is an opportunity to check for mysterious problems with local procs return; } - # get a list of steps still running - my @squeue = `squeue -s -h -o '%i %j' && echo ok`; - chop @squeue; - if ($squeue[-1] ne "ok") + # Get a list of steps still running. Note: squeue(1) says --steps + # selects a format (which we override anyway) and allows us to + # specify which steps we're interested in (which we don't). 
+ # Importantly, it also changes the meaning of %j from "job name" to + # "step name" and (although this isn't mentioned explicitly in the + # docs) switches from "one line per job" mode to "one line per step" + # mode. Without it, we'd just get a list of one job, instead of a + # list of N steps. + my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`; + if ($? != 0) { + Log(undef, "warning: squeue exit status $? ($!)"); return; } - pop @squeue; + chop @squeue; # which of my jobsteps are running, according to squeue? my %ok; - foreach (@squeue) + for my $jobstepname (@squeue) { - if (/^(\d+)\.(\d+) (\S+)/) - { - if ($1 eq $ENV{SLURM_JOBID}) - { - $ok{$3} = 1; - } - } + $ok{$jobstepname} = 1; } - # which of my active child procs (>60s old) were not mentioned by squeue? - foreach (keys %proc) + # Check for child procs >60s old and not mentioned by squeue. + while (my ($pid, $jobstep) = each %proc) { - if ($proc{$_}->{time} < time - 60 - && !exists $ok{$proc{$_}->{jobstepname}} - && !exists $proc{$_}->{killtime}) + if ($jobstep->{time} < time - 60 + && $jobstep->{jobstepname} + && !exists $ok{$jobstep->{jobstepname}} + && !exists $jobstep->{killtime}) { - # kill this proc if it hasn't exited in 30 seconds - $proc{$_}->{killtime} = time + 30; + # According to slurm, this task has ended (successfully or not) + # -- but our srun child hasn't exited. First we must wait (30 + # seconds) in case this is just a race between communication + # channels. Then, if our srun child process still hasn't + # terminated, we'll conclude some slurm communication + # error/delay has caused the task to die without notifying srun, + # and we'll kill srun ourselves. 
+ $jobstep->{killtime} = time + 30; + Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited"); } } } @@ -1181,7 +1342,7 @@ sub release_allocation if ($have_slurm) { Log (undef, "release job allocation"); - system "scancel $ENV{SLURM_JOBID}"; + system "scancel $ENV{SLURM_JOB_ID}"; } } @@ -1195,6 +1356,7 @@ sub readfrompipes while (0 < sysread ($reader{$job}, $buf, 8192)) { print STDERR $buf if $ENV{CRUNCH_DEBUG}; + $jobstep[$job]->{stderr_at} = time; $jobstep[$job]->{stderr} .= $buf; preprocess_stderr ($job); if (length ($jobstep[$job]->{stderr}) > 16384) @@ -1220,7 +1382,7 @@ sub preprocess_stderr # whoa. $main::please_freeze = 1; } - elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) { + elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) { $jobstep[$job]->{node_fail} = 1; ban_node_by_slot($jobstep[$job]->{slotindex}); } @@ -1242,16 +1404,19 @@ sub process_stderr sub fetch_block { my $hash = shift; - my ($keep, $child_out, $output_block); - - my $cmd = "arv-get \Q$hash\E"; - open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!"; - $output_block = ''; + my $keep; + if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) { + Log(undef, "fetch_block run error from arv-get $hash: $!"); + return undef; + } + my $output_block = ""; while (1) { my $buf; my $bytes = sysread($keep, $buf, 1024 * 1024); if (!defined $bytes) { - die "reading from arv-get: $!"; + Log(undef, "fetch_block read error from arv-get: $!"); + $output_block = undef; + last; } elsif ($bytes == 0) { # sysread returns 0 at the end of the pipe. last; @@ -1261,62 +1426,98 @@ sub fetch_block } } close $keep; + if ($?) { + Log(undef, "fetch_block arv-get exited " . 
exit_status_s($?)); + $output_block = undef; + } return $output_block; } -sub collate_output +# Create a collection by concatenating the output of all tasks (each +# task's output is either a manifest fragment, a locator for a +# manifest fragment stored in Keep, or nothing at all). Return the +# portable_data_hash of the new collection. +sub create_output_collection { Log (undef, "collate"); my ($child_out, $child_in); - my $pid = open2($child_out, $child_in, 'arv-put', '--raw', - '--retries', retry_count()); - my $joboutput; + my $pid = open2($child_out, $child_in, 'python', '-c', q{ +import arvados +import sys +print (arvados.api("v1").collections(). + create(body={"manifest_text": sys.stdin.read()}). + execute(num_retries=int(sys.argv[1]))["portable_data_hash"]) +}, retry_count()); + + my $task_idx = -1; + my $manifest_size = 0; for (@jobstep) { - next if (!exists $_->{'arvados_task'}->{'output'} || - !$_->{'arvados_task'}->{'success'}); + ++$task_idx; my $output = $_->{'arvados_task'}->{output}; - if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/) - { - $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/; - print $child_in $output; - } - elsif (@jobstep == 1) - { - $joboutput = $output; - last; - } - elsif (defined (my $outblock = fetch_block ($output))) - { - $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/; - print $child_in $outblock; + next if (!defined($output)); + my $next_write; + if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) { + $next_write = fetch_block($output); + } else { + $next_write = $output; } - else - { - Log (undef, "XXX fetch_block($output) failed XXX"); + if (defined($next_write)) { + if (!defined(syswrite($child_in, $next_write))) { + # There's been an error writing. Stop the loop. + # We'll log details about the exit code later. 
+ last; + } else { + $manifest_size += length($next_write); + } + } else { + my $uuid = $_->{'arvados_task'}->{'uuid'}; + Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)"); $main::success = 0; } } - $child_in->close; + close($child_in); + Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens"); - if (!defined $joboutput) { - my $s = IO::Select->new($child_out); - if ($s->can_read(120)) { - sysread($child_out, $joboutput, 64 * 1024 * 1024); - chomp($joboutput); - # TODO: Ensure exit status == 0. + my $joboutput; + my $s = IO::Select->new($child_out); + if ($s->can_read(120)) { + sysread($child_out, $joboutput, 1024 * 1024); + waitpid($pid, 0); + if ($?) { + Log(undef, "output collection creation exited " . exit_status_s($?)); + $joboutput = undef; } else { - Log (undef, "timed out reading from 'arv-put'"); + chomp($joboutput); + } + } else { + Log (undef, "timed out while creating output collection"); + foreach my $signal (2, 2, 2, 15, 15, 9) { + kill($signal, $pid); + last if waitpid($pid, WNOHANG) == -1; + sleep(1); } } - # TODO: kill $pid instead of waiting, now that we've decided to - # ignore further output. - waitpid($pid, 0); + close($child_out); return $joboutput; } +# Calls create_output_collection, logs the result, and returns it. +# If that was successful, save that as the output in the job record. +sub save_output_collection { + my $collated_output = create_output_collection(); + + if (!$collated_output) { + Log(undef, "Failed to write output collection"); + } + else { + Log(undef, "job output $collated_output"); + $Job->update_attributes('output' => $collated_output); + } + return $collated_output; +} sub killem { @@ -1362,6 +1563,8 @@ sub fhbits # Send log output to Keep via arv-put. # # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe. +# $log_pipe_out_buf is a string containing all output read from arv-put so far. 
# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
# $log_pipe_pid is the pid of the arv-put subprocess.
#
# The only functions that should access these variables directly are:
#
# log_writer_start($logfilename)
#     Starts an arv-put pipe, reading data on stdin and writing it to
#     a $logfilename file in an output collection.
#
# log_writer_read_output([$timeout])
#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
#     Passes $timeout to the select() call, with a default of 0.01.
#     Returns the result of the last sysread() call on $log_pipe_out
#     (a byte count, 0 at EOF, undef on error), or -1 if sysread()
#     wasn't called because select() timed out.
#     Only other log_writer_* functions should need to call this.
#
# log_writer_send($txt)
#     Writes $txt to the output log collection.
#
# log_writer_finish()
#     Closes the arv-put pipe, waits for arv-put to exit, and returns
#     everything it printed.  Returns nothing if no pipe was started.
#
# log_writer_is_active()
#     Returns a true value if there is currently a live arv-put
#     process, false otherwise.
#
my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
    $log_pipe_pid);

sub log_writer_start($)
{
  my $logfilename = shift;
  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                        'arv-put',
                        '--stream',
                        '--retries', '3',
                        '--filename', $logfilename,
                        '-');
  $log_pipe_out_buf = "";
  $log_pipe_out_select = IO::Select->new($log_pipe_out);
}

sub log_writer_read_output {
  my $timeout = shift || 0.01;
  my $read = -1;
  while ($read && $log_pipe_out_select->can_read($timeout)) {
    # Use sysread(), not read(): buffered read() keeps blocking until it
    # has all 65536 bytes or sees EOF, which defeats the select() timeout.
    # sysread() returns whatever is available right now.
    my $got = sysread($log_pipe_out, $log_pipe_out_buf, 65536,
                      length($log_pipe_out_buf));
    if (!defined($got)) {
      # A signal interrupted the call before any data arrived; try again.
      next if $!{EINTR};
      $read = undef;
      last;
    }
    $read = $got;
  }
  if (!defined($read)) {
    Log(undef, "error reading log manifest from arv-put: $!");
  }
  return $read;
}

sub log_writer_send($)
{
  my $txt = shift;
  print $log_pipe_in $txt;
  # Drain anything arv-put has already printed, without waiting for more.
  log_writer_read_output();
}

sub log_writer_finish()
{
  return unless $log_pipe_pid;

  # Closing our end signals EOF to arv-put.
  close($log_pipe_in);

  my $read_result = log_writer_read_output(120);
  if (!defined($read_result)) {
    # log_writer_read_output already logged the underlying read error.
    Log(undef, "failed to read arv-put log manifest to EOF");
  } elsif ($read_result == -1) {
    Log (undef, "timed out reading from 'arv-put'");
  } elsif ($read_result != 0) {
    Log(undef, "failed to read arv-put log manifest to EOF");
  }

  waitpid($log_pipe_pid, 0);
  if ($?) {
    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?));
  }

  close($log_pipe_out);
  my $arv_put_output = $log_pipe_out_buf;
  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
      $log_pipe_out_select = undef;

  return $arv_put_output;
}
$log_coll->{portable_data_hash}); + $Job->update_attributes('log' => $log_coll->{portable_data_hash}); } @@ -1510,7 +1751,7 @@ sub freeze_if_want_freeze } } freeze(); - collate_output(); + create_output_collection(); cleanup(); save_meta(); exit 1; @@ -1555,11 +1796,19 @@ sub srun my $opts = shift || {}; my $stdin = shift; my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs; - print STDERR (join (" ", - map { / / ? "'$_'" : $_ } - (@$args)), - "\n") - if $ENV{CRUNCH_DEBUG}; + + $Data::Dumper::Terse = 1; + $Data::Dumper::Indent = 0; + my $show_cmd = Dumper($args); + $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g; + $show_cmd =~ s/\n/ /g; + if ($opts->{fork}) { + Log(undef, "starting: $show_cmd"); + } else { + # This is a child process: parent is in charge of reading our + # stderr and copying it to Log() if needed. + warn "starting: $show_cmd\n"; + } if (defined $stdin) { my $child = open STDIN, "-|"; @@ -1602,9 +1851,7 @@ sub find_docker_image { # If not, return undef for both values. my $locator = shift; my ($streamname, $filename); - my $image = retry_op(sub { - $arv->{collections}->{get}->execute(uuid => $locator); - }); + my $image = api_call("collections/get", uuid => $locator); if ($image) { foreach my $line (split(/\n/, $image->{manifest_text})) { my @tokens = split(/\s+/, $line); @@ -1651,10 +1898,14 @@ sub retry_count { } sub retry_op { - # Given a function reference, call it with the remaining arguments. If - # it dies, retry it with exponential backoff until it succeeds, or until - # the current retry_count is exhausted. + # Pass in two function references. + # This method will be called with the remaining arguments. + # If it dies, retry it with exponential backoff until it succeeds, + # or until the current retry_count is exhausted. After each failure + # that can be retried, the second function will be called with + # the current try count (0-based), next try time, and error message. 
sub api_call {
  # Pass in a /-separated API method name, and arguments for it.
  # This function will call that method, retrying as needed until
  # the current retry_count is exhausted.  Each failure that is going
  # to be retried is logged together with the time of the next attempt.
  # Dies if the method name cannot be resolved or the retries run out.
  my $method_name = shift;
  my $log_api_retry = sub {
    my ($try_count, $next_try_at, $errmsg) = @_;
    # Strip Perl's "at FILE line N" suffix and flatten whitespace so the
    # error fits on one log line.
    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
    $errmsg =~ s/\s/ /g;
    $errmsg =~ s/\s+$//;
    my $retry_msg;
    if ($next_try_at < time) {
      $retry_msg = "Retrying.";
    } else {
      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
      $retry_msg = "Retrying at $next_try_fmt.";
    }
    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
  };
  # Walk e.g. "jobs/get" down to $arv->{jobs}->{get}.
  my $method = $arv;
  foreach my $key (split(/\//, $method_name)) {
    $method = $method->{$key};
    if (!defined($method)) {
      # A misspelled method name can never succeed, so fail now instead
      # of retrying it with backoff.
      die("API method $method_name is not available (no '$key' entry)\n");
    }
  }
  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
}

sub handle_readall {
  # Pass in a glob reference to a file handle.
  # Read all its contents and return them as a string.
  my $fh_glob_ref = shift;
  local $/ = undef;
  return <$fh_glob_ref>;
}

sub tar_filename_n {
  # Return the path of the Nth saved git archive for this job.
  my $n = shift;
  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
}

sub add_git_archive {
  # Pass in a git archive command as a string or list, a la system().
  # This method will save its output to be included in the archive sent to the
  # build script.
  my $git_input;
  $git_tar_count++;
  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
    croak("Failed to save git archive: $!");
  }
  # The command's stdout goes straight into the tar file; it gets no stdin.
  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
  close($git_input);
  waitpid($git_pid, 0);
  # Save the wait status before any other call has a chance to change $?.
  my $git_status = $?;
  close(GIT_ARCHIVE);
  if ($git_status) {
    croak("Failed to save git archive: git exited " .
          exit_status_s($git_status));
  }
}

sub combined_git_archive {
  # Combine all saved tar archives into a single archive, then return its
  # contents in a string.  Return undef if no archives have been saved.
  if ($git_tar_count < 1) {
    return undef;
  }
  my $base_tar_name = tar_filename_n(1);
  # Append archives 2..N onto archive 1 in place.
  foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
    if ($tar_exit != 0) {
      croak("Error preparing build archive: tar -A exited " .
            exit_status_s($tar_exit));
    }
  }
  if (!open(GIT_TAR, "<", $base_tar_name)) {
    croak("Could not open build archive: $!");
  }
  my $tar_contents = handle_readall(\*GIT_TAR);
  close(GIT_TAR);
  return $tar_contents;
}

sub set_nonblocking {
  # Add O_NONBLOCK to the given file handle's status flags.
  my $fh = shift;
  my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
}

__DATA__
#!/usr/bin/perl
#
# This is crunch-job's internal dispatch script.  crunch-job running on the API
# server invokes this script on individual compute nodes, or localhost if we're
# running a job locally.  It gets called in two modes:
#
# * No arguments: Installation mode.  Read a tar archive from the DATA
#   file handle; it includes the Crunch script's source code, and
#   maybe SDKs as well.  Those should be installed in the proper
#   locations.  This runs outside of any Docker container, so don't try to
#   introspect Crunch's runtime environment.
#
# * With arguments: Crunch script run mode.
#   This script should set up the environment, then run the command
#   specified in the arguments.  This runs inside any Docker container.

use Fcntl ':flock';
use File::Path qw( make_path remove_tree );
use POSIX qw(getcwd);

use constant TASK_TEMPFAIL => 111;

# Map SDK subdirectories to the path environments they belong to.
my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");

my $destdir = $ENV{"CRUNCH_SRC"};
my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};

# Keep copies of the original stdout/stderr: installation mode redirects
# both to a log file later on.
open(STDOUT_ORIG, ">&", STDOUT);
open(STDERR_ORIG, ">&", STDERR);

for my $dir ($destdir, $job_work, $task_work) {
  if ($dir) {
    make_path $dir;
    -e $dir or die "Failed to create temporary directory ($dir): $!";
  }
}

# Empty the task work directory but keep the directory itself.
if ($task_work) {
  remove_tree($task_work, {keep_root => 1});
}

### Crunch script run mode
if (@ARGV) {
  # We want to do routine logging during task 0 only.  This gives the user
  # the information they need, but avoids repeating the information for every
  # task.
  my $Log;
  if ($ENV{TASK_SEQUENCE} eq "0") {
    # The first argument is a printf format; the rest are its arguments.
    $Log = sub {
      my $msg = shift;
      printf STDERR_ORIG "[Crunch] $msg\n", @_;
    };
  } else {
    $Log = sub { };
  }

  my $python_src = "$install_dir/python";
  my $venv_dir = "$job_work/.arvados.venv";
  my $venv_built = -e "$venv_dir/bin/activate";
  if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
    shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
                 "--python=python2.7", $venv_dir);
    shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
    $venv_built = 1;
    $Log->("Built Python SDK virtualenv");
  }

  my $pip_bin = "pip";
  if ($venv_built) {
    $Log->("Running in Python SDK virtualenv");
    $pip_bin = "$venv_dir/bin/pip";
    # Re-wrap the command so it runs with the virtualenv activated.
    my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
    @ARGV = ("/bin/sh", "-ec",
             ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
  } elsif (-d $python_src) {
    $Log->("Warning: virtualenv not found inside Docker container default " .
           "\$PATH. Can't install Python SDK.");
  }

  my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
  if ($pkgs) {
    $Log->("Using Arvados SDK:");
    foreach my $line (split /\n/, $pkgs) {
      # $line is external command output: pass it as data, never as the
      # printf format, so a stray "%" cannot corrupt the message.
      $Log->("%s", $line);
    }
  } else {
    $Log->("Arvados SDK packages not found");
  }

  # Prepend each installed SDK directory to its language's search path.
  while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
    my $sdk_path = "$install_dir/$sdk_dir";
    if (-d $sdk_path) {
      if ($ENV{$sdk_envkey}) {
        $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
      } else {
        $ENV{$sdk_envkey} = $sdk_path;
      }
      $Log->("Arvados SDK added to %s", $sdk_envkey);
    }
  }

  exec(@ARGV);
  die "Cannot exec `@ARGV`: $!";
}

### Installation mode
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
# NOTE(review): when CRUNCH_GIT_ARCHIVE_HASH is unset and no
# $destdir.archive_hash symlink exists, both sides of this comparison are
# empty and the install is skipped -- confirm that is intended.
if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
  # This exact git archive (source + arvados sdk) is already installed
  # here, so there's no need to reinstall it.

  # We must consume our DATA section, though: otherwise the process
  # feeding it to us will get SIGPIPE.
  my $buf;
  while (read(DATA, $buf, 65536)) { }

  exit(0);
}

unlink "$destdir.archive_hash";
mkdir $destdir;

do {
  # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
  local $SIG{PIPE} = "IGNORE";
  warn "Extracting archive: $archive_hash\n";
  # --ignore-zeros is necessary sometimes: depending on how much NUL
  # padding tar -A put on our combined archive (which in turn depends
  # on the length of the component archives) tar without
  # --ignore-zeros will exit before consuming stdin and cause close()
  # to fail on the resulting SIGPIPE.
  if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
    die "Error launching 'tar -xC $destdir': $!";
  }
  # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
  # get SIGPIPE.  We must feed it data incrementally.
  my $tar_input;
  while (read(DATA, $tar_input, 65536)) {
    print TARX $tar_input;
  }
  if(!close(TARX)) {
    die "'tar -xC $destdir' exited $?: $!";
  }
};

mkdir $install_dir;

# Move any bundled SDKs out of the extracted archive into $install_dir.
my $sdk_root = "$destdir/.arvados.sdk/sdk";
if (-d $sdk_root) {
  foreach my $sdk_lang (("python",
                         map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
    if (-d "$sdk_root/$sdk_lang") {
      if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
        die "Failed to install $sdk_lang SDK: $!";
      }
    }
  }
}

my $python_dir = "$install_dir/python";
if ((-d $python_dir) and can_run("python2.7")) {
  # Capture only stderr from egg_info; its stdout is discarded.
  open(my $egg_info_pipe, "-|",
       "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
  my @egg_info_errors = <$egg_info_pipe>;
  close($egg_info_pipe);
  if ($?) {
    if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
      # egg_info apparently failed because it couldn't ask git for a build tag.
      # Specify no build tag.
      open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
      print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
      close($pysdk_cfg);
    } else {
      my $egg_info_exit = $? >> 8;
      foreach my $errline (@egg_info_errors) {
        print STDERR_ORIG $errline;
      }
      warn "python setup.py egg_info failed: exit $egg_info_exit";
      exit ($egg_info_exit || 1);
    }
  }
}

# Hide messages from the install script (unless it fails: shell_or_die
# will show $destdir.log in that case).
open(STDOUT, ">>", "$destdir.log");
open(STDERR, ">&", STDOUT);

if (-e "$destdir/crunch_scripts/install") {
    shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
    # Old version
    shell_or_die (undef, "./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
    shell_or_die (undef, "./install.sh", $install_dir);
}

# Record which archive is installed; the symlink is swapped in with a
# rename.
if ($archive_hash) {
    unlink "$destdir.archive_hash.new";
    symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
    rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
}

close L;

# Return true if `which` can find $command_name.
sub can_run {
  my $command_name = shift;
  open(my $which, "-|", "which", $command_name);
  while (<$which>) { }
  close($which);
  return ($? == 0);
}

# Run a command a la system().  If it fails, restore the original stderr,
# show $destdir.log, print a warning, and exit.  The first argument is
# the exit code to use on failure; pass undef to exit with the command's
# own exit code (or 1 if that was zero, e.g. death by signal).
sub shell_or_die
{
  my $exitcode = shift;

  if ($ENV{"DEBUG"}) {
    print STDERR "@_\n";
  }
  if (system (@_) != 0) {
    my $err = $!;
    my $code = $?;
    my $exitstatus;
    if ($code == -1) {
      # system() could not start the command at all, so there is no wait
      # status to decode; the reason is the errno saved in $err.
      $exitstatus = "could not execute";
    } else {
      $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
    }
    open STDERR, ">&STDERR_ORIG";
    # The path comes from the environment and goes through a shell here,
    # so quote it the same way the rest of this script does.
    system ("cat \Q$destdir.log\E >&2");
    warn "@_ failed ($err): $exitstatus";
    if (defined($exitcode)) {
      exit $exitcode;
    }
    elsif ($code == -1) {
      exit 255;
    }
    else {
      exit (($code >> 8) || 1);
    }
  }
}

__DATA__