X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc87b47f7e47e89946b75563950491445c46093d..4f1085f353d44600643a8e9dd6b43a39131e7946:/services/crunch/crunch-job

diff --git a/services/crunch/crunch-job b/services/crunch/crunch-job
deleted file mode 100755
index ed2c059a11..0000000000
--- a/services/crunch/crunch-job
+++ /dev/null
@@ -1,1398 +0,0 @@
-#!/usr/bin/perl
-# -*- mode: perl; perl-indent-level: 2; -*-
-
-=head1 NAME
-
-crunch-job: Execute job steps, save snapshots as requested, collate output.
-
-=head1 SYNOPSIS
-
-Obtain job details from Arvados, run tasks on compute nodes (typically
-invoked by scheduler on controller):
-
- crunch-job --uuid x-y-z
-
-Obtain job details from command line, run tasks on local machine
-(typically invoked by application or developer on VM):
-
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
-
-=head1 RUNNING JOBS LOCALLY
-
-crunch-job's log messages appear on stderr along with the job tasks'
-stderr streams. The log is saved in Keep at each checkpoint and when
-the job finishes.
-
-If the job succeeds, the job's output locator is printed on stdout.
-
-While the job is running, the following signals are accepted:
-
-=over
-
-=item control-C, SIGINT, SIGQUIT
-
-Save a checkpoint, terminate any job tasks that are running, and stop.
-
-=item SIGALRM
-
-Save a checkpoint and continue.
-
-=item SIGHUP
-
-Refresh node allocation (i.e., check whether any nodes have been added
-or unallocated). Currently this is a no-op.
-
-=back
-
-=cut
-
-
-use strict;
-use DBI;
-use POSIX ':sys_wait_h';
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use Arvados;
-use Getopt::Long;
-use Warehouse;
-use Warehouse::Stream;
-
-$ENV{"TMPDIR"} ||= "/tmp";
-$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
-$ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
-mkdir ($ENV{"CRUNCH_TMP"});
-
-my $force_unlock;
-my $jobspec;
-my $resume_stash;
-GetOptions('force-unlock' => \$force_unlock,
-           'job=s' => \$jobspec,
-           'resume-stash=s' => \$resume_stash,
-           );
-
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
-
-
-$SIG{'HUP'} = sub
-{
-  1;
-};
-$SIG{'USR1'} = sub
-{
-  $main::ENV{CRUNCH_DEBUG} = 1;
-};
-$SIG{'USR2'} = sub
-{
-  $main::ENV{CRUNCH_DEBUG} = 0;
-};
-
-
-
-my $arv = Arvados->new;
-my $metastream = Warehouse::Stream->new;
-$metastream->clear;
-$metastream->write_start('log.txt');
-
-my $User = {};
-my $Job = {};
-my $job_id;
-my $dbh;
-my $sth;
-if ($job_has_uuid)
-{
-  $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-  $User = $arv->{'users'}->{'current'}->execute;
-  if (!$force_unlock) {
-    if ($Job->{'is_locked_by'}) {
-      croak("Job is locked: " . $Job->{'is_locked_by'});
-    }
-    if ($Job->{'success'} ne undef) {
-      croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
-    }
-    if ($Job->{'running'}) {
-      croak("Job 'running' flag is already set");
-    }
-    if ($Job->{'started_at'}) {
-      croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
")"); - } - } -} -else -{ - $Job = JSON::decode_json($jobspec); - - if (!$resume_stash) - { - map { croak ("No $_ specified") unless $Job->{$_} } - qw(script script_version script_parameters); - } - - if (!defined $Job->{'uuid'}) { - chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$)); - } -} -$job_id = $Job->{'uuid'}; - - - -$Job->{'resource_limits'} ||= {}; -$Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0; -my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'}; - - -Log (undef, "check slurm allocation"); -my @slot; -my @node; -# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)") -my @sinfo; -if (!$have_slurm) -{ - my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1; - push @sinfo, "$localcpus localhost"; -} -if (exists $ENV{SLURM_NODELIST}) -{ - push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`; -} -foreach (@sinfo) -{ - my ($ncpus, $slurm_nodelist) = split; - $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus; - - my @nodelist; - while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//) - { - my $nodelist = $1; - if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/) - { - my $ranges = $1; - foreach (split (",", $ranges)) - { - my ($a, $b); - if (/(\d+)-(\d+)/) - { - $a = $1; - $b = $2; - } - else - { - $a = $_; - $b = $_; - } - push @nodelist, map { - my $n = $nodelist; - $n =~ s/\[[-,\d]+\]/$_/; - $n; - } ($a..$b); - } - } - else - { - push @nodelist, $nodelist; - } - } - foreach my $nodename (@nodelist) - { - Log (undef, "node $nodename - $ncpus slots"); - my $node = { name => $nodename, - ncpus => $ncpus, - losing_streak => 0, - hold_until => 0 }; - foreach my $cpu (1..$ncpus) - { - push @slot, { node => $node, - cpu => $cpu }; - } - } - push @node, @nodelist; -} - - - -# Ensure that we get one jobstep running on each allocated node before -# we start overloading nodes with concurrent steps - -@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot; - - - -my $jobmanager_id; -if ($job_has_uuid) -{ - # Claim this job, and make sure nobody else does - - $Job->{'is_locked_by'} = $User->{'uuid'}; - $Job->{'started_at'} = time; - $Job->{'running'} = 1; - $Job->{'success'} = undef; - $Job->{'tasks_summary'} = { 'failed' => 0, - 'todo' => 1, - 'running' => 0, - 'done' => 0 }; - unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) { - croak("Error while updating / locking job"); - } -} - - -Log (undef, "start"); -$SIG{'INT'} = sub { $main::please_freeze = 1; }; -$SIG{'QUIT'} = sub { $main::please_freeze = 1; }; -$SIG{'TERM'} = \&croak; -$SIG{'TSTP'} = sub { $main::please_freeze = 1; }; -$SIG{'ALRM'} = sub { $main::please_info = 1; }; -$SIG{'CONT'} = sub { $main::please_continue = 1; }; -$main::please_freeze = 0; -$main::please_info = 0; -$main::please_continue = 0; -my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key - -grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs}); -$ENV{"CRUNCH_JOB_UUID"} = $job_id; -$ENV{"JOB_UUID"} = $job_id; - - -my @jobstep; -my @jobstep_todo = (); -my @jobstep_done = (); -my @jobstep_tomerge = (); -my $jobstep_tomerge_level = 0; -my $squeue_checked; -my $squeue_kill_checked; -my $output_in_keep = 0; - - - -if (defined $Job->{thawedfromkey}) -{ - thaw ($Job->{thawedfromkey}); -} -else -{ - my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => { - 'job_uuid' => $Job->{'uuid'}, - 'sequence' => 0, - 'qsequence' => 0, - 'parameters' => {}, - }); - push @jobstep, { 'level' => 0, - 'attempts' 
-                   'arvados_task' => $first_task,
-                 };
-  push @jobstep_todo, 0;
-}
-
-
-my $build_script;
-do {
-  local $/ = undef;
-  $build_script = <DATA>;
-};
-
-
-$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
-
-my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
-if ($skip_install)
-{
-  $ENV{"CRUNCH_SRC"} = $Job->{revision};
-}
-else
-{
-  Log (undef, "Install revision ".$Job->{revision});
-  my $nodelist = join(",", @node);
-
-  # Clean out crunch_tmp/work and crunch_tmp/opt
-
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
-  }
-  Log (undef, "Clean-work-dir exited $?");
-
-  # Install requested code version
-
-  my @execargs;
-  my @srunargs = ("srun",
-                  "--nodelist=$nodelist",
-                  "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
-
-  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
-  $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
-
-  my $commit;
-  my $treeish = $Job->{'script_version'};
-  my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
-  # Todo: let script_version specify alternate repo
-  $ENV{"CRUNCH_SRC_URL"} = $repo;
-
-  # Create/update our clone of the remote git repo
-
-  if (!-d $ENV{"CRUNCH_SRC"}) {
-    system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
-        or croak ("git clone $repo failed: exit ".($?>>8));
-    system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
-  }
-  `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
-
-  # If this looks like a subversion r#, look for it in git-svn commit messages
-
-  if ($treeish =~ m{^\d{1,4}$}) {
-    my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
-    chomp $gitlog;
-    if ($gitlog =~ /^[a-f0-9]{40}$/) {
-      $commit = $gitlog;
-      Log (undef, "Using commit $commit for revision $treeish");
-    }
-  }
-
-  # If that didn't work, try asking git to look it up as a tree-ish.
-
-  if (!defined $commit) {
-
-    my $cooked_treeish = $treeish;
-    if ($treeish !~ m{^[0-9a-f]{5,}$}) {
-      # Looks like a git branch name -- make sure git knows it's
-      # relative to the remote repo
-      $cooked_treeish = "origin/$treeish";
-    }
-
-    my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
-    chomp $found;
-    if ($found =~ /^[0-9a-f]{40}$/s) {
-      $commit = $found;
-      if ($commit ne $treeish) {
-        # Make sure we record the real commit id in the database,
-        # frozentokey, logs, etc. -- instead of an abbreviation or a
-        # branch name which can become ambiguous or point to a
-        # different commit in the future.
- $ENV{"CRUNCH_SRC_COMMIT"} = $commit; - Log (undef, "Using commit $commit for tree-ish $treeish"); - if ($commit ne $treeish) { - $Job->{'script_version'} = $commit; - $Job->save() or croak("Error while updating job"); - } - } - } - } - - if (defined $commit) { - $ENV{"CRUNCH_SRC_COMMIT"} = $commit; - @execargs = ("sh", "-c", - "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); - } - else { - croak ("could not figure out commit id for $treeish"); - } - - my $installpid = fork(); - if ($installpid == 0) - { - srun (\@srunargs, \@execargs, {}, $build_script); - exit (1); - } - while (1) - { - last if $installpid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($installpid); - select (undef, undef, undef, 0.1); - } - Log (undef, "Install exited $?"); -} - - - -foreach (qw (script script_version script_parameters resource_limits)) -{ - Log (undef, $_ . " " . $Job->{$_}); -} -foreach (split (/\n/, $Job->{knobs})) -{ - Log (undef, "knob " . $_); -} - - - -my $success; - - - -ONELEVEL: - -my $thisround_succeeded = 0; -my $thisround_failed = 0; -my $thisround_failed_multiple = 0; - -@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} - or $a <=> $b } @jobstep_todo; -my $level = $jobstep[$jobstep_todo[0]]->{level}; -Log (undef, "start level $level"); - - - -my %proc; -my @freeslot = (0..$#slot); -my @holdslot; -my %reader; -my $progress_is_dirty = 1; -my $progress_stats_updated = 0; - -update_progress_stats(); - - - -THISROUND: -for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) -{ - $main::please_continue = 0; - - my $id = $jobstep_todo[$todo_ptr]; - my $Jobstep = $jobstep[$id]; - if ($Jobstep->{level} != $level) - { - next; - } - if ($Jobstep->{attempts} > 9) - { - Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up"); - $success = 0; - last THISROUND; - } - - pipe $reader{$id}, "writer" or croak ($!); - my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); - fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!); - - my $childslot = $freeslot[0]; - my $childnode = $slot[$childslot]->{node}; - my $childslotname = join (".", - $slot[$childslot]->{node}->{name}, - $slot[$childslot]->{cpu}); - my $childpid = fork(); - if ($childpid == 0) - { - $SIG{'INT'} = 'DEFAULT'; - $SIG{'QUIT'} = 'DEFAULT'; - $SIG{'TERM'} = 'DEFAULT'; - - foreach (values (%reader)) - { - close($_); - } - fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec - open(STDOUT,">&writer"); - open(STDERR,">&writer"); - - undef $dbh; - undef $sth; - - delete $ENV{"GNUPGHOME"}; - $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'}; - $ENV{"TASK_QSEQUENCE"} = $id; - $ENV{"TASK_SEQUENCE"} = $level; - $ENV{"JOB_SCRIPT"} = $Job->{script}; - while (my ($param, $value) = each %{$Job->{script_parameters}}) { - $param =~ tr/a-z/A-Z/; - $ENV{"JOB_PARAMETER_$param"} = $value; - } - $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; - $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; - $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu}; - $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; - - $ENV{"GZIP"} = "-n"; - - my @srunargs = ( - "srun", - "--nodelist=".$childnode->{name}, - qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, - "--job-name=$job_id.$id.$$", - ); - my @execargs = qw(sh); - my $build_script_to_send = ""; - my $command = - "mkdir -p $ENV{CRUNCH_TMP}/revision " - ."&& cd $ENV{CRUNCH_TMP} "; - if ($build_script) - { - $build_script_to_send = $build_script; - $command .= - "&& perl -"; - } - elsif 
-    {
-      $command .=
-          "&& "
-          ."( "
-          ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
-          ."|| "
-          ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
-          ."    && ./installrevision "
-          ."  ) "
-          .") ";
-    }
-    $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
-    $command .=
-        "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
-    my @execargs = ('bash', '-c', $command);
-    srun (\@srunargs, \@execargs, undef, $build_script_to_send);
-    exit (1);
-  }
-  close("writer");
-  if (!defined $childpid)
-  {
-    close $reader{$id};
-    delete $reader{$id};
-    next;
-  }
-  shift @freeslot;
-  $proc{$childpid} = { jobstep => $id,
-                       time => time,
-                       slot => $childslot,
-                       jobstepname => "$job_id.$id.$childpid",
-                     };
-  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
-  $slot[$childslot]->{pid} = $childpid;
-
-  Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
-  Log ($id, "child $childpid started on $childslotname");
-  $Jobstep->{attempts} ++;
-  $Jobstep->{starttime} = time;
-  $Jobstep->{node} = $childnode->{name};
-  $Jobstep->{slotindex} = $childslot;
-  delete $Jobstep->{stderr};
-  delete $Jobstep->{finishtime};
-
-  splice @jobstep_todo, $todo_ptr, 1;
-  --$todo_ptr;
-
-  $progress_is_dirty = 1;
-
-  while (!@freeslot
-         ||
-         (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
-  {
-    last THISROUND if $main::please_freeze;
-    if ($main::please_info)
-    {
-      $main::please_info = 0;
-      freeze();
-      collate_output();
-      save_meta(1);
-      update_progress_stats();
-    }
-    my $gotsome
-        = readfrompipes ()
-        + reapchildren ();
-    if (!$gotsome)
-    {
-      check_squeue();
-      update_progress_stats();
-      select (undef, undef, undef, 0.1);
-    }
-    elsif (time - $progress_stats_updated >= 30)
-    {
-      update_progress_stats();
-    }
-    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
-        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
-    {
-      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
-          .($thisround_failed+$thisround_succeeded)
-          .") -- giving up on this round";
-      Log (undef, $message);
-      last THISROUND;
-    }
-
-    # move slots from freeslot to holdslot (or back to freeslot) if necessary
-    for (my $i=$#freeslot; $i>=0; $i--) {
-      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
-        push @holdslot, (splice @freeslot, $i, 1);
-      }
-    }
-    for (my $i=$#holdslot; $i>=0; $i--) {
-      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
-        push @freeslot, (splice @holdslot, $i, 1);
-      }
-    }
-
-    # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
-      last THISROUND;
-    }
-  }
-}
-
-
-push @freeslot, splice @holdslot;
-map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
-
-
-Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
children to finish"); -while (%proc) -{ - goto THISROUND if $main::please_continue; - $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info; - readfrompipes (); - if (!reapchildren()) - { - check_squeue(); - update_progress_stats(); - select (undef, undef, undef, 0.1); - killem (keys %proc) if $main::please_freeze; - } -} - -update_progress_stats(); -freeze_if_want_freeze(); - - -if (!defined $success) -{ - if (@jobstep_todo && - $thisround_succeeded == 0 && - ($thisround_failed == 0 || $thisround_failed > 4)) - { - my $message = "stop because $thisround_failed tasks failed and none succeeded"; - Log (undef, $message); - $success = 0; - } - if (!@jobstep_todo) - { - $success = 1; - } -} - -goto ONELEVEL if !defined $success; - - -release_allocation(); -freeze(); -$Job->{'output'} = &collate_output(); -$Job->{'success'} = $Job->{'output'} && $success; -$Job->save; - -if ($Job->{'output'}) -{ - $arv->{'collections'}->{'create'}->execute('collection' => { - 'uuid' => $Job->{'output'}, - 'manifest_text' => system("whget", $Job->{'output'}), - }); -} - -Log (undef, "finish"); - -save_meta(); -exit 0; - - - -sub update_progress_stats -{ - $progress_stats_updated = time; - return if !$progress_is_dirty; - my ($todo, $done, $running) = (scalar @jobstep_todo, - scalar @jobstep_done, - scalar @slot - scalar @freeslot - scalar @holdslot); - $Job->{'tasks_summary'} ||= {}; - $Job->{'tasks_summary'}->{'todo'} = $todo; - $Job->{'tasks_summary'}->{'done'} = $done; - $Job->{'tasks_summary'}->{'running'} = $running; - $Job->save; - Log (undef, "status: $done done, $running running, $todo todo"); - $progress_is_dirty = 0; -} - - - -sub reapchildren -{ - my $pid = waitpid (-1, WNOHANG); - return 0 if $pid <= 0; - - my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} - . "." - . $slot[$proc{$pid}->{slot}]->{cpu}); - my $jobstepid = $proc{$pid}->{jobstep}; - my $elapsed = time - $proc{$pid}->{time}; - my $Jobstep = $jobstep[$jobstepid]; - - my $exitcode = $?; - my $exitinfo = "exit $exitcode"; - $Jobstep->{'arvados_task'}->reload; - my $success = $Jobstep->{'arvados_task'}->{success}; - - Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success"); - - if (!defined $success) { - # task did not indicate one way or the other --> fail - $Jobstep->{'arvados_task'}->{success} = 0; - $Jobstep->{'arvados_task'}->save; - $success = 0; - } - - if (!$success) - { - --$Jobstep->{attempts} if $Jobstep->{node_fail}; - ++$thisround_failed; - ++$thisround_failed_multiple if $Jobstep->{attempts} > 1; - - # Check for signs of a failed or misconfigured node - if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= - 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { - # Don't count this against jobstep failure thresholds if this - # node is already suspected faulty and srun exited quickly - if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && - $elapsed < 5 && - $Jobstep->{attempts} > 1) { - Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . 
" instead of incrementing jobstep attempts"); - --$Jobstep->{attempts}; - } - ban_node_by_slot($proc{$pid}->{slot}); - } - - push @jobstep_todo, $jobstepid; - Log ($jobstepid, "failure in $elapsed seconds"); - $Job->{'tasks_summary'}->{'failed'}++; - } - else - { - ++$thisround_succeeded; - $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; - $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; - push @jobstep_done, $jobstepid; - Log ($jobstepid, "success in $elapsed seconds"); - } - $Jobstep->{exitcode} = $exitcode; - $Jobstep->{finishtime} = time; - process_stderr ($jobstepid, $success); - Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output}); - - close $reader{$jobstepid}; - delete $reader{$jobstepid}; - delete $slot[$proc{$pid}->{slot}]->{pid}; - push @freeslot, $proc{$pid}->{slot}; - delete $proc{$pid}; - - # Load new tasks - my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute( - 'where' => { - 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid} - }, - 'order' => 'qsequence' - ); - foreach my $arvados_task (@{$newtask_list->{'items'}}) { - my $jobstep = { - 'level' => $arvados_task->{'sequence'}, - 'attempts' => 0, - 'arvados_task' => $arvados_task - }; - push @jobstep, $jobstep; - push @jobstep_todo, $#jobstep; - } - - $progress_is_dirty = 1; - 1; -} - - -sub check_squeue -{ - # return if the kill list was checked <4 seconds ago - if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4) - { - return; - } - $squeue_kill_checked = time; - - # use killem() on procs whose killtime is reached - for (keys %proc) - { - if (exists $proc{$_}->{killtime} - && $proc{$_}->{killtime} <= time) - { - killem ($_); - } - } - - # return if the squeue was checked <60 seconds ago - if (defined $squeue_checked && $squeue_checked > time - 60) - { - return; - } - $squeue_checked = time; - - if (!$have_slurm) - { - # here is an opportunity to check for mysterious problems with local procs - return; - } - - # get a list of steps still running - my @squeue = `squeue -s -h -o '%i %j' && echo ok`; - chop @squeue; - if ($squeue[-1] ne "ok") - { - return; - } - pop @squeue; - - # which of my jobsteps are running, according to squeue? - my %ok; - foreach (@squeue) - { - if (/^(\d+)\.(\d+) (\S+)/) - { - if ($1 eq $ENV{SLURM_JOBID}) - { - $ok{$3} = 1; - } - } - } - - # which of my active child procs (>60s old) were not mentioned by squeue? - foreach (keys %proc) - { - if ($proc{$_}->{time} < time - 60 - && !exists $ok{$proc{$_}->{jobstepname}} - && !exists $proc{$_}->{killtime}) - { - # kill this proc if it hasn't exited in 30 seconds - $proc{$_}->{killtime} = time + 30; - } - } -} - - -sub release_allocation -{ - if ($have_slurm) - { - Log (undef, "release job allocation"); - system "scancel $ENV{SLURM_JOBID}"; - } -} - - -sub readfrompipes -{ - my $gotsome = 0; - foreach my $job (keys %reader) - { - my $buf; - while (0 < sysread ($reader{$job}, $buf, 8192)) - { - print STDERR $buf if $ENV{CRUNCH_DEBUG}; - $jobstep[$job]->{stderr} .= $buf; - preprocess_stderr ($job); - if (length ($jobstep[$job]->{stderr}) > 16384) - { - substr ($jobstep[$job]->{stderr}, 0, 8192) = ""; - } - $gotsome = 1; - } - } - return $gotsome; -} - - -sub preprocess_stderr -{ - my $job = shift; - - while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) { - my $line = $1; - substr $jobstep[$job]->{stderr}, 0, 1+length($line), ""; - Log ($job, "stderr $line"); - if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) { - # whoa. 
-      $main::please_freeze = 1;
-    }
-    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
-      $jobstep[$job]->{node_fail} = 1;
-      ban_node_by_slot($jobstep[$job]->{slotindex});
-    }
-  }
-}
-
-
-sub process_stderr
-{
-  my $job = shift;
-  my $success = shift;
-  preprocess_stderr ($job);
-
-  map {
-    Log ($job, "stderr $_");
-  } split ("\n", $jobstep[$job]->{stderr});
-}
-
-
-sub collate_output
-{
-  my $whc = Warehouse->new;
-  Log (undef, "collate");
-  $whc->write_start (1);
-  my $joboutput;
-  for (@jobstep)
-  {
-    next if (!exists $_->{'arvados_task'}->{output} ||
-             !$_->{'arvados_task'}->{'success'} ||
-             $_->{'exitcode'} != 0);
-    my $output = $_->{'arvados_task'}->{output};
-    if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
-    {
-      $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($output);
-    }
-    elsif (@jobstep == 1)
-    {
-      $joboutput = $output;
-      $whc->write_finish;
-    }
-    elsif (defined (my $outblock = $whc->fetch_block ($output)))
-    {
-      $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
-      $whc->write_data ($outblock);
-    }
-    else
-    {
-      my $errstr = $whc->errstr;
-      $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
-      $success = 0;
-    }
-  }
-  $joboutput = $whc->write_finish if !defined $joboutput;
-  if ($joboutput)
-  {
-    Log (undef, "output $joboutput");
-    $Job->{'output'} = $joboutput;
-    $Job->save;
-  }
-  else
-  {
-    Log (undef, "output undef");
-  }
-  return $joboutput;
-}
-
-
-sub killem
-{
-  foreach (@_)
-  {
-    my $sig = 2; # SIGINT first
-    if (exists $proc{$_}->{"sent_$sig"} &&
-        time - $proc{$_}->{"sent_$sig"} > 4)
-    {
-      $sig = 15; # SIGTERM if SIGINT doesn't work
-    }
-    if (exists $proc{$_}->{"sent_$sig"} &&
-        time - $proc{$_}->{"sent_$sig"} > 4)
-    {
-      $sig = 9; # SIGKILL if SIGTERM doesn't work
-    }
-    if (!exists $proc{$_}->{"sent_$sig"})
-    {
-      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
-      kill $sig, $_;
-      select (undef, undef, undef, 0.1);
-      if ($sig == 2)
-      {
-        kill $sig, $_; # srun wants two SIGINT to really interrupt
-      }
-      $proc{$_}->{"sent_$sig"} = time;
-      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
-    }
-  }
-}
-
-
-sub fhbits
-{
-  my($bits);
-  for (@_) {
-    vec($bits,fileno($_),1) = 1;
-  }
-  $bits;
-}
-
-
-sub Log # ($jobstep_id, $logmessage)
-{
-  if ($_[1] =~ /\n/) {
-    for my $line (split (/\n/, $_[1])) {
-      Log ($_[0], $line);
-    }
-    return;
-  }
-  my $fh = select STDERR; $|=1; select $fh;
-  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
-  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
-  $message .= "\n";
-  my $datetime;
-  if ($metastream || -t STDERR) {
-    my @gmtime = gmtime;
-    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
-                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
-  }
-  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
-
-  return if !$metastream;
-  $metastream->write_data ($datetime . " " . $message);
-}
-
-
-sub reconnect_database
-{
-  return if !$job_has_uuid;
-  return if ($dbh && $dbh->do ("select now()"));
-  for (1..16)
-  {
-    $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
-    if ($dbh) {
-      $dbh->{InactiveDestroy} = 1;
-      return;
-    }
-    warn ($DBI::errstr);
-    sleep $_;
-  }
-  croak ($DBI::errstr) if !$dbh;
-}
-
-
-sub dbh_do
-{
-  return 1 if !$job_has_uuid;
-  my $ret = $dbh->do (@_);
-  return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
-  reconnect_database();
-  return $dbh->do (@_);
-}
-
-
-sub croak
-{
-  my ($package, $file, $line) = caller;
-  my $message = "@_ at $file line $line\n";
-  Log (undef, $message);
-  freeze() if @jobstep_todo;
-  collate_output() if @jobstep_todo;
-  cleanup();
-  save_meta() if $metastream;
-  die;
-}
-
-
-sub cleanup
-{
-  return if !$job_has_uuid;
-  $Job->reload;
-  $Job->{'running'} = 0;
-  $Job->{'success'} = 0;
-  $Job->{'finished_at'} = time;
-  $Job->save;
-}
-
-
-sub save_meta
-{
-  my $justcheckpoint = shift; # false if this will be the last meta saved
-  my $m = $metastream;
-  $m = $m->copy if $justcheckpoint;
-  $m->write_finish;
-  my $loglocator = $m->as_key;
-  undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
-  Log (undef, "meta key is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->save;
-}
-
-
-sub freeze_if_want_freeze
-{
-  if ($main::please_freeze)
-  {
-    release_allocation();
-    if (@_)
-    {
-      # kill some srun procs before freeze+stop
-      map { $proc{$_} = {} } @_;
-      while (%proc)
-      {
-        killem (keys %proc);
-        select (undef, undef, undef, 0.1);
-        my $died;
-        while (($died = waitpid (-1, WNOHANG)) > 0)
-        {
-          delete $proc{$died};
-        }
-      }
-    }
-    freeze();
-    collate_output();
-    cleanup();
-    save_meta();
-    exit 0;
-  }
-}
-
-
-sub freeze
-{
-  Log (undef, "Freeze not implemented");
-  return;
-
-  my $whc; # todo
-  Log (undef, "freeze");
-
-  my $freezer = new Warehouse::Stream (whc => $whc);
-  $freezer->clear;
-  $freezer->name (".");
-  $freezer->write_start ("state.txt");
-
-  $freezer->write_data (join ("\n",
-                              "job $Job->{uuid}",
-                              map
-                              {
-                                $_ . "=" . freezequote($Job->{$_})
-                              } grep { $_ ne "id" } keys %$Job) . "\n\n");
-
-  foreach my $Jobstep (@jobstep)
-  {
-    my $str = join ("\n",
-                    map
-                    {
-                      $_ . "=" . freezequote ($Jobstep->{$_})
-                    } grep {
-                      $_ !~ /^stderr|slotindex|node_fail/
-                    } keys %$Jobstep);
-    $freezer->write_data ($str."\n\n");
-  }
-  if (@jobstep_tomerge)
-  {
-    $freezer->write_data
-        ("merge $jobstep_tomerge_level "
-         . freezequote (join ("\n",
-                              map { freezequote ($_) } @jobstep_tomerge))
-         . "\n\n");
-  }
-
-  $freezer->write_finish;
-  my $frozentokey = $freezer->as_key;
-  undef $freezer;
-  Log (undef, "frozento key is $frozentokey");
-  dbh_do ("update mrjob set frozentokey=? where id=?", undef,
where id=?", undef, - $frozentokey, $job_id); - my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3); - Log (undef, "frozento+K key is $kfrozentokey"); - return $frozentokey; -} - - -sub thaw -{ - croak ("Thaw not implemented"); - - my $whc; - my $key = shift; - Log (undef, "thaw from $key"); - - @jobstep = (); - @jobstep_done = (); - @jobstep_todo = (); - @jobstep_tomerge = (); - $jobstep_tomerge_level = 0; - my $frozenjob = {}; - - my $stream = new Warehouse::Stream ( whc => $whc, - hash => [split (",", $key)] ); - $stream->rewind; - while (my $dataref = $stream->read_until (undef, "\n\n")) - { - if ($$dataref =~ /^job /) - { - foreach (split ("\n", $$dataref)) - { - my ($k, $v) = split ("=", $_, 2); - $frozenjob->{$k} = freezeunquote ($v); - } - next; - } - - if ($$dataref =~ /^merge (\d+) (.*)/) - { - $jobstep_tomerge_level = $1; - @jobstep_tomerge - = map { freezeunquote ($_) } split ("\n", freezeunquote($2)); - next; - } - - my $Jobstep = { }; - foreach (split ("\n", $$dataref)) - { - my ($k, $v) = split ("=", $_, 2); - $Jobstep->{$k} = freezeunquote ($v) if $k; - } - $Jobstep->{attempts} = 0; - push @jobstep, $Jobstep; - - if ($Jobstep->{exitcode} eq "0") - { - push @jobstep_done, $#jobstep; - } - else - { - push @jobstep_todo, $#jobstep; - } - } - - foreach (qw (script script_version script_parameters)) - { - $Job->{$_} = $frozenjob->{$_}; - } - $Job->save; -} - - -sub freezequote -{ - my $s = shift; - $s =~ s/\\/\\\\/g; - $s =~ s/\n/\\n/g; - return $s; -} - - -sub freezeunquote -{ - my $s = shift; - $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge; - return $s; -} - - -sub srun -{ - my $srunargs = shift; - my $execargs = shift; - my $opts = shift || {}; - my $stdin = shift; - my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs; - print STDERR (join (" ", - map { / / ? "'$_'" : $_ } - (@$args)), - "\n") - if $ENV{CRUNCH_DEBUG}; - - if (defined $stdin) { - my $child = open STDIN, "-|"; - defined $child or die "no fork: $!"; - if ($child == 0) { - print $stdin or die $!; - close STDOUT or die $!; - exit 0; - } - } - - return system (@$args) if $opts->{fork}; - - exec @$args; - warn "ENV size is ".length(join(" ",%ENV)); - die "exec failed: $!: @$args"; -} - - -sub ban_node_by_slot { - # Don't start any new jobsteps on this node for 60 seconds - my $slotid = shift; - $slot[$slotid]->{node}->{hold_until} = 60 + scalar time; - Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds"); -} - -__DATA__ -#!/usr/bin/perl - -# checkout-and-build - -use Fcntl ':flock'; - -my $destdir = $ENV{"CRUNCH_SRC"}; -my $commit = $ENV{"CRUNCH_SRC_COMMIT"}; -my $repo = $ENV{"CRUNCH_SRC_URL"}; - -open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; -flock L, LOCK_EX; -if (readlink ("$destdir.commit") eq $commit) { - exit 0; -} - -open STDOUT, ">", "$destdir.log"; -open STDERR, ">&STDOUT"; - -if (-d "$destdir/.git") { - chdir $destdir or die "chdir $destdir: $!"; - if (0 != system (qw(git remote set-url origin), $repo)) { - # awful... 
-    shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
-  }
-}
-elsif ($repo && $commit)
-{
-  shell_or_die('git', 'clone', $repo, $destdir);
-  chdir $destdir or die "chdir $destdir: $!";
-  shell_or_die(qw(git config clean.requireForce false));
-}
-else {
-  die "$destdir does not exist, and no repo/commit specified -- giving up";
-}
-
-if ($commit) {
-  unlink "$destdir.commit";
-  shell_or_die (qw(git stash));
-  shell_or_die (qw(git clean -d -x));
-  shell_or_die (qw(git fetch origin));
-  shell_or_die (qw(git checkout), $commit);
-}
-
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
-mkdir $install_dir;
-if (!-e "./install.sh" && -e "./tests/autotests.sh") {
-  # Old version
-  shell_or_die ("./tests/autotests.sh", $install_dir);
-} elsif (-e "./install.sh") {
-  shell_or_die ("./install.sh", $install_dir);
-}
-
-if ($commit) {
-  unlink "$destdir.commit.new";
-  symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
-  rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
-}
-
-close L;
-
-exit 0;
-
-sub shell_or_die
-{
-  if ($ENV{"DEBUG"}) {
-    print STDERR "@_\n";
-  }
-  system (@_) == 0
-      or die "@_ failed: $! exit 0x".sprintf("%x",$?);
-}
diff --git a/services/crunch/crunch-job b/services/crunch/crunch-job
new file mode 120000
index 0000000000..ff0e7022bf
--- /dev/null
+++ b/services/crunch/crunch-job
@@ -0,0 +1 @@
+../../sdk/cli/bin/arv-crunch-job
\ No newline at end of file
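For reference, the deleted script's SYNOPSIS documents two invocation modes; a minimal sketch based on that section follows. The UUID, script name, and tree path below are placeholders, and the job JSON keys shown (script, script_version, script_parameters) are the ones the script checks for when --resume-stash is not given.

 # Run a job record already stored in Arvados (typically invoked by the scheduler):
 crunch-job --uuid x-y-z

 # Run a job described on the command line; script_version may point at a local tree:
 crunch-job --job '{"script":"scriptname","script_version":"/path/to/tree","script_parameters":{}}'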