Test your function:
<pre>
-arvados=/path/to/arvados
script_name=foo
repo_path=/path/to/repo
-read -rd $'\000' newjob <<EOF; $arvados/services/crunch/crunch-job --job "$newjob"
+read -rd $'\000' newjob <<EOF; arv-crunch-job --job "$newjob"
{
"script":"$script_name",
"script_version":"$repo_path",
EOF
</pre>
-The @--job@ argument to @crunch-job@ is the same as the @--job@ argument to @arv job create@, except:
+The @--job@ argument to @arv-crunch-job@ is the same as the @--job@ argument to @arv job create@, except:
<notextile><!--
* @script_version@ can use "/" to indicate the working copy of a repository instead of a git commit or tag.
s.email = 'gem-dev@clinicalfuture.com'
#s.bindir = '.'
s.licenses = ['Apache License, Version 2.0']
- s.files = ["bin/arv","bin/arv-run-pipeline-instance"]
+ s.files = ["bin/arv","bin/arv-run-pipeline-instance","bin/arv-crunch-job"]
s.executables << "arv"
s.executables << "arv-run-pipeline-instance"
+ s.executables << "arv-crunch-job"
s.add_dependency('google-api-client', '>= 0.6.3')
s.add_dependency('activesupport', '>= 3.2.13')
s.add_dependency('json', '>= 1.7.7')
--- /dev/null
+#!/usr/bin/perl
+# -*- mode: perl; perl-indent-level: 2; -*-
+
+=head1 NAME
+
+crunch-job: Execute job steps, save snapshots as requested, collate output.
+
+=head1 SYNOPSIS
+
+Obtain job details from Arvados, run tasks on compute nodes (typically
+invoked by scheduler on controller):
+
+ crunch-job --job x-y-z
+
+Obtain job details from command line, run tasks on local machine
+(typically invoked by application or developer on VM):
+
+ crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+
+=head1 OPTIONS
+
+=over
+
+=item --force-unlock
+
+If the job is already locked, steal the lock and run it anyway.
+
+=item --git-dir
+
+Path to .git directory where the specified commit is found.
+
+=item --job-api-token
+
+Arvados API authorization token to use during the course of the job.
+
+=back
+
+=head1 RUNNING JOBS LOCALLY
+
+crunch-job's log messages appear on stderr along with the job tasks'
+stderr streams. The log is saved in Keep at each checkpoint and when
+the job finishes.
+
+If the job succeeds, the job's output locator is printed on stdout.
+
+While the job is running, the following signals are accepted:
+
+=over
+
+=item control-C, SIGINT, SIGQUIT
+
+Save a checkpoint, terminate any job tasks that are running, and stop.
+
+=item SIGALRM
+
+Save a checkpoint and continue.
+
+=item SIGHUP
+
+Refresh node allocation (i.e., check whether any nodes have been added
+or unallocated). Currently this is a no-op.
+
+=back
+
+=cut
+
+
+use strict;
+use POSIX ':sys_wait_h';
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Arvados;
+use Getopt::Long;
+use Warehouse;
+use Warehouse::Stream;
+use IPC::System::Simple qw(capturex);
+
+$ENV{"TMPDIR"} ||= "/tmp";
+$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
+if ($ENV{"USER"} ne "crunch" && $< != 0) {
+ # use a tmp dir unique for my uid
+ $ENV{"CRUNCH_TMP"} .= "-$<";
+}
+$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
+$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
+mkdir ($ENV{"JOB_WORK"});
+
+my $force_unlock;
+my $git_dir;
+my $jobspec;
+my $job_api_token;
+my $resume_stash;
+GetOptions('force-unlock' => \$force_unlock,
+ 'git-dir=s' => \$git_dir,
+ 'job=s' => \$jobspec,
+ 'job-api-token=s' => \$job_api_token,
+ 'resume-stash=s' => \$resume_stash,
+ );
+
+if (defined $job_api_token) {
+ $ENV{ARVADOS_API_TOKEN} = $job_api_token;
+}
+
+my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
+my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
+my $local_job = !$job_has_uuid;
+
+
+$SIG{'HUP'} = sub
+{
+ 1;
+};
+$SIG{'USR1'} = sub
+{
+ $main::ENV{CRUNCH_DEBUG} = 1;
+};
+$SIG{'USR2'} = sub
+{
+ $main::ENV{CRUNCH_DEBUG} = 0;
+};
+
+
+
+my $arv = Arvados->new;
+my $metastream = Warehouse::Stream->new(whc => new Warehouse);
+$metastream->clear;
+$metastream->write_start('log.txt');
+
+my $User = $arv->{'users'}->{'current'}->execute;
+
+my $Job = {};
+my $job_id;
+my $dbh;
+my $sth;
+if ($job_has_uuid)
+{
+ $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ if (!$force_unlock) {
+ if ($Job->{'is_locked_by_uuid'}) {
+ croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
+ }
+ if ($Job->{'success'} ne undef) {
+ croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
+ }
+ if ($Job->{'running'}) {
+ croak("Job 'running' flag is already set");
+ }
+ if ($Job->{'started_at'}) {
+ croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
+ }
+ }
+}
+else
+{
+ $Job = JSON::decode_json($jobspec);
+
+ if (!$resume_stash)
+ {
+ map { croak ("No $_ specified") unless $Job->{$_} }
+ qw(script script_version script_parameters);
+ }
+
+ $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
+ $Job->{'started_at'} = gmtime;
+
+ $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
+
+ $job_has_uuid = 1;
+}
+$job_id = $Job->{'uuid'};
+
+
+
+$Job->{'resource_limits'} ||= {};
+$Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
+my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
+
+
+Log (undef, "check slurm allocation");
+my @slot;
+my @node;
+# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
+my @sinfo;
+if (!$have_slurm)
+{
+ my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
+ push @sinfo, "$localcpus localhost";
+}
+if (exists $ENV{SLURM_NODELIST})
+{
+ push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
+}
+foreach (@sinfo)
+{
+ my ($ncpus, $slurm_nodelist) = split;
+ $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
+
+ my @nodelist;
+ while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
+ {
+ my $nodelist = $1;
+ if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
+ {
+ my $ranges = $1;
+ foreach (split (",", $ranges))
+ {
+ my ($a, $b);
+ if (/(\d+)-(\d+)/)
+ {
+ $a = $1;
+ $b = $2;
+ }
+ else
+ {
+ $a = $_;
+ $b = $_;
+ }
+ push @nodelist, map {
+ my $n = $nodelist;
+ $n =~ s/\[[-,\d]+\]/$_/;
+ $n;
+ } ($a..$b);
+ }
+ }
+ else
+ {
+ push @nodelist, $nodelist;
+ }
+ }
+ foreach my $nodename (@nodelist)
+ {
+ Log (undef, "node $nodename - $ncpus slots");
+ my $node = { name => $nodename,
+ ncpus => $ncpus,
+ losing_streak => 0,
+ hold_until => 0 };
+ foreach my $cpu (1..$ncpus)
+ {
+ push @slot, { node => $node,
+ cpu => $cpu };
+ }
+ }
+ push @node, @nodelist;
+}
+
+
+
+# Ensure that we get one jobstep running on each allocated node before
+# we start overloading nodes with concurrent steps
+
+@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
+
+
+
+my $jobmanager_id;
+if ($job_has_uuid)
+{
+ # Claim this job, and make sure nobody else does
+
+ $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
+ $Job->{'started_at'} = gmtime;
+ $Job->{'running'} = 1;
+ $Job->{'success'} = undef;
+ $Job->{'tasks_summary'} = { 'failed' => 0,
+ 'todo' => 1,
+ 'running' => 0,
+ 'done' => 0 };
+ if ($job_has_uuid) {
+ unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
+ croak("Error while updating / locking job");
+ }
+ }
+}
+
+
+Log (undef, "start");
+$SIG{'INT'} = sub { $main::please_freeze = 1; };
+$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
+$SIG{'TERM'} = \&croak;
+$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
+$SIG{'ALRM'} = sub { $main::please_info = 1; };
+$SIG{'CONT'} = sub { $main::please_continue = 1; };
+$main::please_freeze = 0;
+$main::please_info = 0;
+$main::please_continue = 0;
+my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
+
+grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
+$ENV{"CRUNCH_JOB_UUID"} = $job_id;
+$ENV{"JOB_UUID"} = $job_id;
+
+
+my @jobstep;
+my @jobstep_todo = ();
+my @jobstep_done = ();
+my @jobstep_tomerge = ();
+my $jobstep_tomerge_level = 0;
+my $squeue_checked;
+my $squeue_kill_checked;
+my $output_in_keep = 0;
+
+
+
+if (defined $Job->{thawedfromkey})
+{
+ thaw ($Job->{thawedfromkey});
+}
+else
+{
+ my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+ 'job_uuid' => $Job->{'uuid'},
+ 'sequence' => 0,
+ 'qsequence' => 0,
+ 'parameters' => {},
+ });
+ push @jobstep, { 'level' => 0,
+ 'attempts' => 0,
+ 'arvados_task' => $first_task,
+ };
+ push @jobstep_todo, 0;
+}
+
+
+my $build_script;
+
+
+$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
+
+my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
+if ($skip_install)
+{
+ $ENV{"CRUNCH_SRC"} = $Job->{script_version};
+}
+else
+{
+ do {
+ local $/ = undef;
+ $build_script = <DATA>;
+ };
+ Log (undef, "Install revision ".$Job->{script_version});
+ my $nodelist = join(",", @node);
+
+ # Clean out crunch_tmp/work and crunch_tmp/opt
+
+ my $cleanpid = fork();
+ if ($cleanpid == 0)
+ {
+ srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+ ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
+ exit (1);
+ }
+ while (1)
+ {
+ last if $cleanpid == waitpid (-1, WNOHANG);
+ freeze_if_want_freeze ($cleanpid);
+ select (undef, undef, undef, 0.1);
+ }
+ Log (undef, "Clean-work-dir exited $?");
+
+ # Install requested code version
+
+ my @execargs;
+ my @srunargs = ("srun",
+ "--nodelist=$nodelist",
+ "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+
+ $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
+ $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
+ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
+
+ my $commit;
+ my $git_archive;
+ my $treeish = $Job->{'script_version'};
+ my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
+ # Todo: let script_version specify repository instead of expecting
+ # parent process to figure it out.
+ $ENV{"CRUNCH_SRC_URL"} = $repo;
+
+ # Create/update our clone of the remote git repo
+
+ if (!-d $ENV{"CRUNCH_SRC"}) {
+ system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
+ or croak ("git clone $repo failed: exit ".($?>>8));
+ system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
+ }
+ `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
+
+ # If this looks like a subversion r#, look for it in git-svn commit messages
+
+ if ($treeish =~ m{^\d{1,4}$}) {
+ my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
+ chomp $gitlog;
+ if ($gitlog =~ /^[a-f0-9]{40}$/) {
+ $commit = $gitlog;
+ Log (undef, "Using commit $commit for script_version $treeish");
+ }
+ }
+
+ # If that didn't work, try asking git to look it up as a tree-ish.
+
+ if (!defined $commit) {
+
+ my $cooked_treeish = $treeish;
+ if ($treeish !~ m{^[0-9a-f]{5,}$}) {
+ # Looks like a git branch name -- make sure git knows it's
+ # relative to the remote repo
+ $cooked_treeish = "origin/$treeish";
+ }
+
+ my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
+ chomp $found;
+ if ($found =~ /^[0-9a-f]{40}$/s) {
+ $commit = $found;
+ if ($commit ne $treeish) {
+ # Make sure we record the real commit id in the database,
+ # frozentokey, logs, etc. -- instead of an abbreviation or a
+ # branch name which can become ambiguous or point to a
+ # different commit in the future.
+ $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
+ Log (undef, "Using commit $commit for tree-ish $treeish");
+ if ($commit ne $treeish) {
+ $Job->{'script_version'} = $commit;
+ !$job_has_uuid or $Job->save() or croak("Error while updating job");
+ }
+ }
+ }
+ }
+
+ if (defined $commit) {
+ $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
+ @execargs = ("sh", "-c",
+ "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+ $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
+ }
+ else {
+ croak ("could not figure out commit id for $treeish");
+ }
+
+ my $installpid = fork();
+ if ($installpid == 0)
+ {
+ srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
+ exit (1);
+ }
+ while (1)
+ {
+ last if $installpid == waitpid (-1, WNOHANG);
+ freeze_if_want_freeze ($installpid);
+ select (undef, undef, undef, 0.1);
+ }
+ Log (undef, "Install exited $?");
+}
+
+
+
+foreach (qw (script script_version script_parameters resource_limits))
+{
+ Log (undef,
+ "$_ " .
+ (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
+}
+foreach (split (/\n/, $Job->{knobs}))
+{
+ Log (undef, "knob " . $_);
+}
+
+
+
+my $success;
+
+
+
+ONELEVEL:
+
+my $thisround_succeeded = 0;
+my $thisround_failed = 0;
+my $thisround_failed_multiple = 0;
+
+@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
+ or $a <=> $b } @jobstep_todo;
+my $level = $jobstep[$jobstep_todo[0]]->{level};
+Log (undef, "start level $level");
+
+
+
+my %proc;
+my @freeslot = (0..$#slot);
+my @holdslot;
+my %reader;
+my $progress_is_dirty = 1;
+my $progress_stats_updated = 0;
+
+update_progress_stats();
+
+
+
+THISROUND:
+for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
+{
+ $main::please_continue = 0;
+
+ my $id = $jobstep_todo[$todo_ptr];
+ my $Jobstep = $jobstep[$id];
+ if ($Jobstep->{level} != $level)
+ {
+ next;
+ }
+ if ($Jobstep->{attempts} > 9)
+ {
+ Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
+ $success = 0;
+ last THISROUND;
+ }
+
+ pipe $reader{$id}, "writer" or croak ($!);
+ my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
+ fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+
+ my $childslot = $freeslot[0];
+ my $childnode = $slot[$childslot]->{node};
+ my $childslotname = join (".",
+ $slot[$childslot]->{node}->{name},
+ $slot[$childslot]->{cpu});
+ my $childpid = fork();
+ if ($childpid == 0)
+ {
+ $SIG{'INT'} = 'DEFAULT';
+ $SIG{'QUIT'} = 'DEFAULT';
+ $SIG{'TERM'} = 'DEFAULT';
+
+ foreach (values (%reader))
+ {
+ close($_);
+ }
+ fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
+ open(STDOUT,">&writer");
+ open(STDERR,">&writer");
+
+ undef $dbh;
+ undef $sth;
+
+ delete $ENV{"GNUPGHOME"};
+ $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
+ $ENV{"TASK_QSEQUENCE"} = $id;
+ $ENV{"TASK_SEQUENCE"} = $level;
+ $ENV{"JOB_SCRIPT"} = $Job->{script};
+ while (my ($param, $value) = each %{$Job->{script_parameters}}) {
+ $param =~ tr/a-z/A-Z/;
+ $ENV{"JOB_PARAMETER_$param"} = $value;
+ }
+ $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
+ $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
+ $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
+ $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
+ $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+
+ $ENV{"GZIP"} = "-n";
+
+ my @srunargs = (
+ "srun",
+ "--nodelist=".$childnode->{name},
+ qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
+ "--job-name=$job_id.$id.$$",
+ );
+ my @execargs = qw(sh);
+ my $build_script_to_send = "";
+ my $command =
+ "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
+ ."&& cd $ENV{CRUNCH_TMP} ";
+ if ($build_script)
+ {
+ $build_script_to_send = $build_script;
+ $command .=
+ "&& perl -";
+ }
+ $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
+ $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
+ $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
+ $command .=
+ "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+ my @execargs = ('bash', '-c', $command);
+ srun (\@srunargs, \@execargs, undef, $build_script_to_send);
+ exit (1);
+ }
+ close("writer");
+ if (!defined $childpid)
+ {
+ close $reader{$id};
+ delete $reader{$id};
+ next;
+ }
+ shift @freeslot;
+ $proc{$childpid} = { jobstep => $id,
+ time => time,
+ slot => $childslot,
+ jobstepname => "$job_id.$id.$childpid",
+ };
+ croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
+ $slot[$childslot]->{pid} = $childpid;
+
+ Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
+ Log ($id, "child $childpid started on $childslotname");
+ $Jobstep->{attempts} ++;
+ $Jobstep->{starttime} = time;
+ $Jobstep->{node} = $childnode->{name};
+ $Jobstep->{slotindex} = $childslot;
+ delete $Jobstep->{stderr};
+ delete $Jobstep->{finishtime};
+
+ splice @jobstep_todo, $todo_ptr, 1;
+ --$todo_ptr;
+
+ $progress_is_dirty = 1;
+
+ while (!@freeslot
+ ||
+ (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+ {
+ last THISROUND if $main::please_freeze;
+ if ($main::please_info)
+ {
+ $main::please_info = 0;
+ freeze();
+ collate_output();
+ save_meta(1);
+ update_progress_stats();
+ }
+ my $gotsome
+ = readfrompipes ()
+ + reapchildren ();
+ if (!$gotsome)
+ {
+ check_squeue();
+ update_progress_stats();
+ select (undef, undef, undef, 0.1);
+ }
+ elsif (time - $progress_stats_updated >= 30)
+ {
+ update_progress_stats();
+ }
+ if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
+ ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
+ {
+ my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
+ .($thisround_failed+$thisround_succeeded)
+ .") -- giving up on this round";
+ Log (undef, $message);
+ last THISROUND;
+ }
+
+ # move slots from freeslot to holdslot (or back to freeslot) if necessary
+ for (my $i=$#freeslot; $i>=0; $i--) {
+ if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
+ push @holdslot, (splice @freeslot, $i, 1);
+ }
+ }
+ for (my $i=$#holdslot; $i>=0; $i--) {
+ if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
+ push @freeslot, (splice @holdslot, $i, 1);
+ }
+ }
+
+ # give up if no nodes are succeeding
+ if (!grep { $_->{node}->{losing_streak} == 0 &&
+ $_->{node}->{hold_count} < 4 } @slot) {
+ my $message = "Every node has failed -- giving up on this round";
+ Log (undef, $message);
+ last THISROUND;
+ }
+ }
+}
+
+
+push @freeslot, splice @holdslot;
+map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
+
+
+Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
+while (%proc)
+{
+ goto THISROUND if $main::please_continue;
+ $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
+ readfrompipes ();
+ if (!reapchildren())
+ {
+ check_squeue();
+ update_progress_stats();
+ select (undef, undef, undef, 0.1);
+ killem (keys %proc) if $main::please_freeze;
+ }
+}
+
+update_progress_stats();
+freeze_if_want_freeze();
+
+
+if (!defined $success)
+{
+ if (@jobstep_todo &&
+ $thisround_succeeded == 0 &&
+ ($thisround_failed == 0 || $thisround_failed > 4))
+ {
+ my $message = "stop because $thisround_failed tasks failed and none succeeded";
+ Log (undef, $message);
+ $success = 0;
+ }
+ if (!@jobstep_todo)
+ {
+ $success = 1;
+ }
+}
+
+goto ONELEVEL if !defined $success;
+
+
+release_allocation();
+freeze();
+$Job->reload;
+$Job->{'output'} = &collate_output();
+$Job->{'running'} = 0;
+$Job->{'success'} = $Job->{'output'} && $success;
+$Job->{'finished_at'} = gmtime;
+$Job->save if $job_has_uuid;
+
+if ($Job->{'output'})
+{
+ eval {
+ my $manifest_text = capturex("whget", $Job->{'output'});
+ $arv->{'collections'}->{'create'}->execute('collection' => {
+ 'uuid' => $Job->{'output'},
+ 'manifest_text' => $manifest_text,
+ });
+ };
+ if ($@) {
+ Log (undef, "Failed to register output manifest: $@");
+ }
+}
+
+Log (undef, "finish");
+
+save_meta();
+exit 0;
+
+
+
+sub update_progress_stats
+{
+ $progress_stats_updated = time;
+ return if !$progress_is_dirty;
+ my ($todo, $done, $running) = (scalar @jobstep_todo,
+ scalar @jobstep_done,
+ scalar @slot - scalar @freeslot - scalar @holdslot);
+ $Job->{'tasks_summary'} ||= {};
+ $Job->{'tasks_summary'}->{'todo'} = $todo;
+ $Job->{'tasks_summary'}->{'done'} = $done;
+ $Job->{'tasks_summary'}->{'running'} = $running;
+ $Job->save if $job_has_uuid;
+ Log (undef, "status: $done done, $running running, $todo todo");
+ $progress_is_dirty = 0;
+}
+
+
+
+sub reapchildren
+{
+ my $pid = waitpid (-1, WNOHANG);
+ return 0 if $pid <= 0;
+
+ my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+ . "."
+ . $slot[$proc{$pid}->{slot}]->{cpu});
+ my $jobstepid = $proc{$pid}->{jobstep};
+ my $elapsed = time - $proc{$pid}->{time};
+ my $Jobstep = $jobstep[$jobstepid];
+
+ my $exitcode = $?;
+ my $exitinfo = "exit $exitcode";
+ $Jobstep->{'arvados_task'}->reload;
+ my $success = $Jobstep->{'arvados_task'}->{success};
+
+ Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
+
+ if (!defined $success) {
+ # task did not indicate one way or the other --> fail
+ $Jobstep->{'arvados_task'}->{success} = 0;
+ $Jobstep->{'arvados_task'}->save;
+ $success = 0;
+ }
+
+ if (!$success)
+ {
+ my $no_incr_attempts;
+ $no_incr_attempts = 1 if $Jobstep->{node_fail};
+
+ ++$thisround_failed;
+ ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
+
+ # Check for signs of a failed or misconfigured node
+ if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+ 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+ # Don't count this against jobstep failure thresholds if this
+ # node is already suspected faulty and srun exited quickly
+ if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+ $elapsed < 5 &&
+ $Jobstep->{attempts} > 1) {
+ Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
+ $no_incr_attempts = 1;
+ --$Jobstep->{attempts};
+ }
+ ban_node_by_slot($proc{$pid}->{slot});
+ }
+
+ push @jobstep_todo, $jobstepid;
+ Log ($jobstepid, "failure in $elapsed seconds");
+
+ --$Jobstep->{attempts} if $no_incr_attempts;
+ $Job->{'tasks_summary'}->{'failed'}++;
+ }
+ else
+ {
+ ++$thisround_succeeded;
+ $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+ push @jobstep_done, $jobstepid;
+ Log ($jobstepid, "success in $elapsed seconds");
+ }
+ $Jobstep->{exitcode} = $exitcode;
+ $Jobstep->{finishtime} = time;
+ process_stderr ($jobstepid, $success);
+ Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
+
+ close $reader{$jobstepid};
+ delete $reader{$jobstepid};
+ delete $slot[$proc{$pid}->{slot}]->{pid};
+ push @freeslot, $proc{$pid}->{slot};
+ delete $proc{$pid};
+
+ # Load new tasks
+ my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
+ 'where' => {
+ 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+ },
+ 'order' => 'qsequence'
+ );
+ foreach my $arvados_task (@{$newtask_list->{'items'}}) {
+ my $jobstep = {
+ 'level' => $arvados_task->{'sequence'},
+ 'attempts' => 0,
+ 'arvados_task' => $arvados_task
+ };
+ push @jobstep, $jobstep;
+ push @jobstep_todo, $#jobstep;
+ }
+
+ $progress_is_dirty = 1;
+ 1;
+}
+
+
+sub check_squeue
+{
+ # return if the kill list was checked <4 seconds ago
+ if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
+ {
+ return;
+ }
+ $squeue_kill_checked = time;
+
+ # use killem() on procs whose killtime is reached
+ for (keys %proc)
+ {
+ if (exists $proc{$_}->{killtime}
+ && $proc{$_}->{killtime} <= time)
+ {
+ killem ($_);
+ }
+ }
+
+ # return if the squeue was checked <60 seconds ago
+ if (defined $squeue_checked && $squeue_checked > time - 60)
+ {
+ return;
+ }
+ $squeue_checked = time;
+
+ if (!$have_slurm)
+ {
+ # here is an opportunity to check for mysterious problems with local procs
+ return;
+ }
+
+ # get a list of steps still running
+ my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
+ chop @squeue;
+ if ($squeue[-1] ne "ok")
+ {
+ return;
+ }
+ pop @squeue;
+
+ # which of my jobsteps are running, according to squeue?
+ my %ok;
+ foreach (@squeue)
+ {
+ if (/^(\d+)\.(\d+) (\S+)/)
+ {
+ if ($1 eq $ENV{SLURM_JOBID})
+ {
+ $ok{$3} = 1;
+ }
+ }
+ }
+
+ # which of my active child procs (>60s old) were not mentioned by squeue?
+ foreach (keys %proc)
+ {
+ if ($proc{$_}->{time} < time - 60
+ && !exists $ok{$proc{$_}->{jobstepname}}
+ && !exists $proc{$_}->{killtime})
+ {
+ # kill this proc if it hasn't exited in 30 seconds
+ $proc{$_}->{killtime} = time + 30;
+ }
+ }
+}
+
+
+sub release_allocation
+{
+ if ($have_slurm)
+ {
+ Log (undef, "release job allocation");
+ system "scancel $ENV{SLURM_JOBID}";
+ }
+}
+
+
+sub readfrompipes
+{
+ my $gotsome = 0;
+ foreach my $job (keys %reader)
+ {
+ my $buf;
+ while (0 < sysread ($reader{$job}, $buf, 8192))
+ {
+ print STDERR $buf if $ENV{CRUNCH_DEBUG};
+ $jobstep[$job]->{stderr} .= $buf;
+ preprocess_stderr ($job);
+ if (length ($jobstep[$job]->{stderr}) > 16384)
+ {
+ substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+ }
+ $gotsome = 1;
+ }
+ }
+ return $gotsome;
+}
+
+
+sub preprocess_stderr
+{
+ my $job = shift;
+
+ while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+ my $line = $1;
+ substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
+ Log ($job, "stderr $line");
+ if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
+ # whoa.
+ $main::please_freeze = 1;
+ }
+ elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
+ $jobstep[$job]->{node_fail} = 1;
+ ban_node_by_slot($jobstep[$job]->{slotindex});
+ }
+ }
+}
+
+
+sub process_stderr
+{
+ my $job = shift;
+ my $success = shift;
+ preprocess_stderr ($job);
+
+ map {
+ Log ($job, "stderr $_");
+ } split ("\n", $jobstep[$job]->{stderr});
+}
+
+
+sub collate_output
+{
+ my $whc = Warehouse->new;
+ Log (undef, "collate");
+ $whc->write_start (1);
+ my $joboutput;
+ for (@jobstep)
+ {
+ next if (!exists $_->{'arvados_task'}->{output} ||
+ !$_->{'arvados_task'}->{'success'} ||
+ $_->{'exitcode'} != 0);
+ my $output = $_->{'arvados_task'}->{output};
+ if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
+ {
+ $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
+ $whc->write_data ($output);
+ }
+ elsif (@jobstep == 1)
+ {
+ $joboutput = $output;
+ $whc->write_finish;
+ }
+ elsif (defined (my $outblock = $whc->fetch_block ($output)))
+ {
+ $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
+ $whc->write_data ($outblock);
+ }
+ else
+ {
+ my $errstr = $whc->errstr;
+ $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
+ $success = 0;
+ }
+ }
+ $joboutput = $whc->write_finish if !defined $joboutput;
+ if ($joboutput)
+ {
+ Log (undef, "output $joboutput");
+ $Job->{'output'} = $joboutput;
+ $Job->save if $job_has_uuid;
+ }
+ else
+ {
+ Log (undef, "output undef");
+ }
+ return $joboutput;
+}
+
+
+sub killem
+{
+ foreach (@_)
+ {
+ my $sig = 2; # SIGINT first
+ if (exists $proc{$_}->{"sent_$sig"} &&
+ time - $proc{$_}->{"sent_$sig"} > 4)
+ {
+ $sig = 15; # SIGTERM if SIGINT doesn't work
+ }
+ if (exists $proc{$_}->{"sent_$sig"} &&
+ time - $proc{$_}->{"sent_$sig"} > 4)
+ {
+ $sig = 9; # SIGKILL if SIGTERM doesn't work
+ }
+ if (!exists $proc{$_}->{"sent_$sig"})
+ {
+ Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+ kill $sig, $_;
+ select (undef, undef, undef, 0.1);
+ if ($sig == 2)
+ {
+ kill $sig, $_; # srun wants two SIGINT to really interrupt
+ }
+ $proc{$_}->{"sent_$sig"} = time;
+ $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
+ }
+ }
+}
+
+
+sub fhbits
+{
+ my($bits);
+ for (@_) {
+ vec($bits,fileno($_),1) = 1;
+ }
+ $bits;
+}
+
+
+sub Log # ($jobstep_id, $logmessage)
+{
+ if ($_[1] =~ /\n/) {
+ for my $line (split (/\n/, $_[1])) {
+ Log ($_[0], $line);
+ }
+ return;
+ }
+ my $fh = select STDERR; $|=1; select $fh;
+ my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+ $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
+ $message .= "\n";
+ my $datetime;
+ if ($metastream || -t STDERR) {
+ my @gmtime = gmtime;
+ $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
+ $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
+ }
+ print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
+
+ return if !$metastream;
+ $metastream->write_data ($datetime . " " . $message);
+}
+
+
+sub croak
+{
+ my ($package, $file, $line) = caller;
+ my $message = "@_ at $file line $line\n";
+ Log (undef, $message);
+ freeze() if @jobstep_todo;
+ collate_output() if @jobstep_todo;
+ cleanup();
+ save_meta() if $metastream;
+ die;
+}
+
+
+sub cleanup
+{
+ return if !$job_has_uuid;
+ $Job->reload;
+ $Job->{'running'} = 0;
+ $Job->{'success'} = 0;
+ $Job->{'finished_at'} = gmtime;
+ $Job->save;
+}
+
+
+sub save_meta
+{
+ my $justcheckpoint = shift; # false if this will be the last meta saved
+ my $m = $metastream;
+ $m = $m->copy if $justcheckpoint;
+ $m->write_finish;
+ my $loglocator = $m->as_key;
+ undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
+ Log (undef, "meta key is $loglocator");
+ $Job->{'log'} = $loglocator;
+ $Job->save if $job_has_uuid;
+}
+
+
+sub freeze_if_want_freeze
+{
+ if ($main::please_freeze)
+ {
+ release_allocation();
+ if (@_)
+ {
+ # kill some srun procs before freeze+stop
+ map { $proc{$_} = {} } @_;
+ while (%proc)
+ {
+ killem (keys %proc);
+ select (undef, undef, undef, 0.1);
+ my $died;
+ while (($died = waitpid (-1, WNOHANG)) > 0)
+ {
+ delete $proc{$died};
+ }
+ }
+ }
+ freeze();
+ collate_output();
+ cleanup();
+ save_meta();
+ exit 0;
+ }
+}
+
+
+sub freeze
+{
+ Log (undef, "Freeze not implemented");
+ return;
+}
+
+
+sub thaw
+{
+ croak ("Thaw not implemented");
+
+ my $whc;
+ my $key = shift;
+ Log (undef, "thaw from $key");
+
+ @jobstep = ();
+ @jobstep_done = ();
+ @jobstep_todo = ();
+ @jobstep_tomerge = ();
+ $jobstep_tomerge_level = 0;
+ my $frozenjob = {};
+
+ my $stream = new Warehouse::Stream ( whc => $whc,
+ hash => [split (",", $key)] );
+ $stream->rewind;
+ while (my $dataref = $stream->read_until (undef, "\n\n"))
+ {
+ if ($$dataref =~ /^job /)
+ {
+ foreach (split ("\n", $$dataref))
+ {
+ my ($k, $v) = split ("=", $_, 2);
+ $frozenjob->{$k} = freezeunquote ($v);
+ }
+ next;
+ }
+
+ if ($$dataref =~ /^merge (\d+) (.*)/)
+ {
+ $jobstep_tomerge_level = $1;
+ @jobstep_tomerge
+ = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
+ next;
+ }
+
+ my $Jobstep = { };
+ foreach (split ("\n", $$dataref))
+ {
+ my ($k, $v) = split ("=", $_, 2);
+ $Jobstep->{$k} = freezeunquote ($v) if $k;
+ }
+ $Jobstep->{attempts} = 0;
+ push @jobstep, $Jobstep;
+
+ if ($Jobstep->{exitcode} eq "0")
+ {
+ push @jobstep_done, $#jobstep;
+ }
+ else
+ {
+ push @jobstep_todo, $#jobstep;
+ }
+ }
+
+ foreach (qw (script script_version script_parameters))
+ {
+ $Job->{$_} = $frozenjob->{$_};
+ }
+ $Job->save if $job_has_uuid;
+}
+
+
+sub freezequote
+{
+ my $s = shift;
+ $s =~ s/\\/\\\\/g;
+ $s =~ s/\n/\\n/g;
+ return $s;
+}
+
+
+sub freezeunquote
+{
+ my $s = shift;
+ $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
+ return $s;
+}
+
+
+sub srun
+{
+ my $srunargs = shift;
+ my $execargs = shift;
+ my $opts = shift || {};
+ my $stdin = shift;
+ my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
+ print STDERR (join (" ",
+ map { / / ? "'$_'" : $_ }
+ (@$args)),
+ "\n")
+ if $ENV{CRUNCH_DEBUG};
+
+ if (defined $stdin) {
+ my $child = open STDIN, "-|";
+ defined $child or die "no fork: $!";
+ if ($child == 0) {
+ print $stdin or die $!;
+ close STDOUT or die $!;
+ exit 0;
+ }
+ }
+
+ return system (@$args) if $opts->{fork};
+
+ exec @$args;
+ warn "ENV size is ".length(join(" ",%ENV));
+ die "exec failed: $!: @$args";
+}
+
+
+sub ban_node_by_slot {
+ # Don't start any new jobsteps on this node for 60 seconds
+ my $slotid = shift;
+ $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
+ $slot[$slotid]->{node}->{hold_count}++;
+ Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
+}
+
+__DATA__
+#!/usr/bin/perl
+
+# checkout-and-build
+
+use Fcntl ':flock';
+
+my $destdir = $ENV{"CRUNCH_SRC"};
+my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+my $repo = $ENV{"CRUNCH_SRC_URL"};
+
+open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
+flock L, LOCK_EX;
+if (readlink ("$destdir.commit") eq $commit) {
+ exit 0;
+}
+
+unlink "$destdir.commit";
+open STDOUT, ">", "$destdir.log";
+open STDERR, ">&STDOUT";
+
+mkdir $destdir;
+open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
+print TARX <DATA>;
+if(!close(TARX)) {
+ die "'tar -C $destdir -xf -' exited $?: $!";
+}
+
+my $pwd;
+chomp ($pwd = `pwd`);
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
+mkdir $install_dir;
+if (-e "$destdir/crunch_scripts/install") {
+ shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
+} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
+ # Old version
+ shell_or_die ("./tests/autotests.sh", $install_dir);
+} elsif (-e "./install.sh") {
+ shell_or_die ("./install.sh", $install_dir);
+}
+
+if ($commit) {
+ unlink "$destdir.commit.new";
+ symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
+ rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+}
+
+close L;
+
+exit 0;
+
+sub shell_or_die
+{
+ if ($ENV{"DEBUG"}) {
+ print STDERR "@_\n";
+ }
+ system (@_) == 0
+ or die "@_ failed: $! exit 0x".sprintf("%x",$?);
+}
+
+__DATA__
+++ /dev/null
-#!/usr/bin/perl
-# -*- mode: perl; perl-indent-level: 2; -*-
-
-=head1 NAME
-
-crunch-job: Execute job steps, save snapshots as requested, collate output.
-
-=head1 SYNOPSIS
-
-Obtain job details from Arvados, run tasks on compute nodes (typically
-invoked by scheduler on controller):
-
- crunch-job --job x-y-z
-
-Obtain job details from command line, run tasks on local machine
-(typically invoked by application or developer on VM):
-
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
-
-=head1 OPTIONS
-
-=over
-
-=item --force-unlock
-
-If the job is already locked, steal the lock and run it anyway.
-
-=item --git-dir
-
-Path to .git directory where the specified commit is found.
-
-=item --job-api-token
-
-Arvados API authorization token to use during the course of the job.
-
-=back
-
-=head1 RUNNING JOBS LOCALLY
-
-crunch-job's log messages appear on stderr along with the job tasks'
-stderr streams. The log is saved in Keep at each checkpoint and when
-the job finishes.
-
-If the job succeeds, the job's output locator is printed on stdout.
-
-While the job is running, the following signals are accepted:
-
-=over
-
-=item control-C, SIGINT, SIGQUIT
-
-Save a checkpoint, terminate any job tasks that are running, and stop.
-
-=item SIGALRM
-
-Save a checkpoint and continue.
-
-=item SIGHUP
-
-Refresh node allocation (i.e., check whether any nodes have been added
-or unallocated). Currently this is a no-op.
-
-=back
-
-=cut
-
-
-use strict;
-use POSIX ':sys_wait_h';
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use Arvados;
-use Getopt::Long;
-use Warehouse;
-use Warehouse::Stream;
-use IPC::System::Simple qw(capturex);
-
-$ENV{"TMPDIR"} ||= "/tmp";
-$ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
-if ($ENV{"USER"} ne "crunch" && $< != 0) {
- # use a tmp dir unique for my uid
- $ENV{"CRUNCH_TMP"} .= "-$<";
-}
-$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
-$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
-mkdir ($ENV{"JOB_WORK"});
-
-my $force_unlock;
-my $git_dir;
-my $jobspec;
-my $job_api_token;
-my $resume_stash;
-GetOptions('force-unlock' => \$force_unlock,
- 'git-dir=s' => \$git_dir,
- 'job=s' => \$jobspec,
- 'job-api-token=s' => \$job_api_token,
- 'resume-stash=s' => \$resume_stash,
- );
-
-if (defined $job_api_token) {
- $ENV{ARVADOS_API_TOKEN} = $job_api_token;
-}
-
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
-my $local_job = !$job_has_uuid;
-
-
-$SIG{'HUP'} = sub
-{
- 1;
-};
-$SIG{'USR1'} = sub
-{
- $main::ENV{CRUNCH_DEBUG} = 1;
-};
-$SIG{'USR2'} = sub
-{
- $main::ENV{CRUNCH_DEBUG} = 0;
-};
-
-
-
-my $arv = Arvados->new;
-my $metastream = Warehouse::Stream->new(whc => new Warehouse);
-$metastream->clear;
-$metastream->write_start('log.txt');
-
-my $User = $arv->{'users'}->{'current'}->execute;
-
-my $Job = {};
-my $job_id;
-my $dbh;
-my $sth;
-if ($job_has_uuid)
-{
- $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
- if (!$force_unlock) {
- if ($Job->{'is_locked_by_uuid'}) {
- croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
- }
- if ($Job->{'success'} ne undef) {
- croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
- }
- if ($Job->{'running'}) {
- croak("Job 'running' flag is already set");
- }
- if ($Job->{'started_at'}) {
- croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
- }
- }
-}
-else
-{
- $Job = JSON::decode_json($jobspec);
-
- if (!$resume_stash)
- {
- map { croak ("No $_ specified") unless $Job->{$_} }
- qw(script script_version script_parameters);
- }
-
- $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
- $Job->{'started_at'} = gmtime;
-
- $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
-
- $job_has_uuid = 1;
-}
-$job_id = $Job->{'uuid'};
-
-
-
-$Job->{'resource_limits'} ||= {};
-$Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
-my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
-
-
-Log (undef, "check slurm allocation");
-my @slot;
-my @node;
-# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
-my @sinfo;
-if (!$have_slurm)
-{
- my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
- push @sinfo, "$localcpus localhost";
-}
-if (exists $ENV{SLURM_NODELIST})
-{
- push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
-}
-foreach (@sinfo)
-{
- my ($ncpus, $slurm_nodelist) = split;
- $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
-
- my @nodelist;
- while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
- {
- my $nodelist = $1;
- if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
- {
- my $ranges = $1;
- foreach (split (",", $ranges))
- {
- my ($a, $b);
- if (/(\d+)-(\d+)/)
- {
- $a = $1;
- $b = $2;
- }
- else
- {
- $a = $_;
- $b = $_;
- }
- push @nodelist, map {
- my $n = $nodelist;
- $n =~ s/\[[-,\d]+\]/$_/;
- $n;
- } ($a..$b);
- }
- }
- else
- {
- push @nodelist, $nodelist;
- }
- }
- foreach my $nodename (@nodelist)
- {
- Log (undef, "node $nodename - $ncpus slots");
- my $node = { name => $nodename,
- ncpus => $ncpus,
- losing_streak => 0,
- hold_until => 0 };
- foreach my $cpu (1..$ncpus)
- {
- push @slot, { node => $node,
- cpu => $cpu };
- }
- }
- push @node, @nodelist;
-}
-
-
-
-# Ensure that we get one jobstep running on each allocated node before
-# we start overloading nodes with concurrent steps
-
-@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
-
-
-
-my $jobmanager_id;
-if ($job_has_uuid)
-{
- # Claim this job, and make sure nobody else does
-
- $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
- $Job->{'started_at'} = gmtime;
- $Job->{'running'} = 1;
- $Job->{'success'} = undef;
- $Job->{'tasks_summary'} = { 'failed' => 0,
- 'todo' => 1,
- 'running' => 0,
- 'done' => 0 };
- if ($job_has_uuid) {
- unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
- croak("Error while updating / locking job");
- }
- }
-}
-
-
-Log (undef, "start");
-$SIG{'INT'} = sub { $main::please_freeze = 1; };
-$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
-$SIG{'TERM'} = \&croak;
-$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
-$SIG{'ALRM'} = sub { $main::please_info = 1; };
-$SIG{'CONT'} = sub { $main::please_continue = 1; };
-$main::please_freeze = 0;
-$main::please_info = 0;
-$main::please_continue = 0;
-my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
-
-grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
-$ENV{"CRUNCH_JOB_UUID"} = $job_id;
-$ENV{"JOB_UUID"} = $job_id;
-
-
-my @jobstep;
-my @jobstep_todo = ();
-my @jobstep_done = ();
-my @jobstep_tomerge = ();
-my $jobstep_tomerge_level = 0;
-my $squeue_checked;
-my $squeue_kill_checked;
-my $output_in_keep = 0;
-
-
-
-if (defined $Job->{thawedfromkey})
-{
- thaw ($Job->{thawedfromkey});
-}
-else
-{
- my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
- 'job_uuid' => $Job->{'uuid'},
- 'sequence' => 0,
- 'qsequence' => 0,
- 'parameters' => {},
- });
- push @jobstep, { 'level' => 0,
- 'attempts' => 0,
- 'arvados_task' => $first_task,
- };
- push @jobstep_todo, 0;
-}
-
-
-my $build_script;
-
-
-$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-
-my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
-if ($skip_install)
-{
- $ENV{"CRUNCH_SRC"} = $Job->{script_version};
-}
-else
-{
- do {
- local $/ = undef;
- $build_script = <DATA>;
- };
- Log (undef, "Install revision ".$Job->{script_version});
- my $nodelist = join(",", @node);
-
- # Clean out crunch_tmp/work and crunch_tmp/opt
-
- my $cleanpid = fork();
- if ($cleanpid == 0)
- {
- srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
- exit (1);
- }
- while (1)
- {
- last if $cleanpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($cleanpid);
- select (undef, undef, undef, 0.1);
- }
- Log (undef, "Clean-work-dir exited $?");
-
- # Install requested code version
-
- my @execargs;
- my @srunargs = ("srun",
- "--nodelist=$nodelist",
- "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
-
- $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
- $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
- $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
-
- my $commit;
- my $git_archive;
- my $treeish = $Job->{'script_version'};
- my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
- # Todo: let script_version specify repository instead of expecting
- # parent process to figure it out.
- $ENV{"CRUNCH_SRC_URL"} = $repo;
-
- # Create/update our clone of the remote git repo
-
- if (!-d $ENV{"CRUNCH_SRC"}) {
- system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
- or croak ("git clone $repo failed: exit ".($?>>8));
- system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
- }
- `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
-
- # If this looks like a subversion r#, look for it in git-svn commit messages
-
- if ($treeish =~ m{^\d{1,4}$}) {
- my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
- chomp $gitlog;
- if ($gitlog =~ /^[a-f0-9]{40}$/) {
- $commit = $gitlog;
- Log (undef, "Using commit $commit for script_version $treeish");
- }
- }
-
- # If that didn't work, try asking git to look it up as a tree-ish.
-
- if (!defined $commit) {
-
- my $cooked_treeish = $treeish;
- if ($treeish !~ m{^[0-9a-f]{5,}$}) {
- # Looks like a git branch name -- make sure git knows it's
- # relative to the remote repo
- $cooked_treeish = "origin/$treeish";
- }
-
- my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
- chomp $found;
- if ($found =~ /^[0-9a-f]{40}$/s) {
- $commit = $found;
- if ($commit ne $treeish) {
- # Make sure we record the real commit id in the database,
- # frozentokey, logs, etc. -- instead of an abbreviation or a
- # branch name which can become ambiguous or point to a
- # different commit in the future.
- $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
- Log (undef, "Using commit $commit for tree-ish $treeish");
- if ($commit ne $treeish) {
- $Job->{'script_version'} = $commit;
- !$job_has_uuid or $Job->save() or croak("Error while updating job");
- }
- }
- }
- }
-
- if (defined $commit) {
- $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
- @execargs = ("sh", "-c",
- "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
- $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
- }
- else {
- croak ("could not figure out commit id for $treeish");
- }
-
- my $installpid = fork();
- if ($installpid == 0)
- {
- srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
- exit (1);
- }
- while (1)
- {
- last if $installpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($installpid);
- select (undef, undef, undef, 0.1);
- }
- Log (undef, "Install exited $?");
-}
-
-
-
-foreach (qw (script script_version script_parameters resource_limits))
-{
- Log (undef,
- "$_ " .
- (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
-}
-foreach (split (/\n/, $Job->{knobs}))
-{
- Log (undef, "knob " . $_);
-}
-
-
-
-my $success;
-
-
-
-ONELEVEL:
-
-my $thisround_succeeded = 0;
-my $thisround_failed = 0;
-my $thisround_failed_multiple = 0;
-
-@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
- or $a <=> $b } @jobstep_todo;
-my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
-
-
-
-my %proc;
-my @freeslot = (0..$#slot);
-my @holdslot;
-my %reader;
-my $progress_is_dirty = 1;
-my $progress_stats_updated = 0;
-
-update_progress_stats();
-
-
-
-THISROUND:
-for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
-{
- $main::please_continue = 0;
-
- my $id = $jobstep_todo[$todo_ptr];
- my $Jobstep = $jobstep[$id];
- if ($Jobstep->{level} != $level)
- {
- next;
- }
- if ($Jobstep->{attempts} > 9)
- {
- Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
- $success = 0;
- last THISROUND;
- }
-
- pipe $reader{$id}, "writer" or croak ($!);
- my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
- fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
-
- my $childslot = $freeslot[0];
- my $childnode = $slot[$childslot]->{node};
- my $childslotname = join (".",
- $slot[$childslot]->{node}->{name},
- $slot[$childslot]->{cpu});
- my $childpid = fork();
- if ($childpid == 0)
- {
- $SIG{'INT'} = 'DEFAULT';
- $SIG{'QUIT'} = 'DEFAULT';
- $SIG{'TERM'} = 'DEFAULT';
-
- foreach (values (%reader))
- {
- close($_);
- }
- fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
- open(STDOUT,">&writer");
- open(STDERR,">&writer");
-
- undef $dbh;
- undef $sth;
-
- delete $ENV{"GNUPGHOME"};
- $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
- $ENV{"TASK_QSEQUENCE"} = $id;
- $ENV{"TASK_SEQUENCE"} = $level;
- $ENV{"JOB_SCRIPT"} = $Job->{script};
- while (my ($param, $value) = each %{$Job->{script_parameters}}) {
- $param =~ tr/a-z/A-Z/;
- $ENV{"JOB_PARAMETER_$param"} = $value;
- }
- $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
- $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
- $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
- $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
- $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
-
- $ENV{"GZIP"} = "-n";
-
- my @srunargs = (
- "srun",
- "--nodelist=".$childnode->{name},
- qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
- "--job-name=$job_id.$id.$$",
- );
- my @execargs = qw(sh);
- my $build_script_to_send = "";
- my $command =
- "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
- ."&& cd $ENV{CRUNCH_TMP} ";
- if ($build_script)
- {
- $build_script_to_send = $build_script;
- $command .=
- "&& perl -";
- }
- $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
- $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
- $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
- $command .=
- "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
- my @execargs = ('bash', '-c', $command);
- srun (\@srunargs, \@execargs, undef, $build_script_to_send);
- exit (1);
- }
- close("writer");
- if (!defined $childpid)
- {
- close $reader{$id};
- delete $reader{$id};
- next;
- }
- shift @freeslot;
- $proc{$childpid} = { jobstep => $id,
- time => time,
- slot => $childslot,
- jobstepname => "$job_id.$id.$childpid",
- };
- croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
- $slot[$childslot]->{pid} = $childpid;
-
- Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
- Log ($id, "child $childpid started on $childslotname");
- $Jobstep->{attempts} ++;
- $Jobstep->{starttime} = time;
- $Jobstep->{node} = $childnode->{name};
- $Jobstep->{slotindex} = $childslot;
- delete $Jobstep->{stderr};
- delete $Jobstep->{finishtime};
-
- splice @jobstep_todo, $todo_ptr, 1;
- --$todo_ptr;
-
- $progress_is_dirty = 1;
-
- while (!@freeslot
- ||
- (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
- {
- last THISROUND if $main::please_freeze;
- if ($main::please_info)
- {
- $main::please_info = 0;
- freeze();
- collate_output();
- save_meta(1);
- update_progress_stats();
- }
- my $gotsome
- = readfrompipes ()
- + reapchildren ();
- if (!$gotsome)
- {
- check_squeue();
- update_progress_stats();
- select (undef, undef, undef, 0.1);
- }
- elsif (time - $progress_stats_updated >= 30)
- {
- update_progress_stats();
- }
- if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
- ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
- {
- my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
- .($thisround_failed+$thisround_succeeded)
- .") -- giving up on this round";
- Log (undef, $message);
- last THISROUND;
- }
-
- # move slots from freeslot to holdslot (or back to freeslot) if necessary
- for (my $i=$#freeslot; $i>=0; $i--) {
- if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
- push @holdslot, (splice @freeslot, $i, 1);
- }
- }
- for (my $i=$#holdslot; $i>=0; $i--) {
- if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
- push @freeslot, (splice @holdslot, $i, 1);
- }
- }
-
- # give up if no nodes are succeeding
- if (!grep { $_->{node}->{losing_streak} == 0 &&
- $_->{node}->{hold_count} < 4 } @slot) {
- my $message = "Every node has failed -- giving up on this round";
- Log (undef, $message);
- last THISROUND;
- }
- }
-}
-
-
-push @freeslot, splice @holdslot;
-map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
-
-
-Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
-while (%proc)
-{
- goto THISROUND if $main::please_continue;
- $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
- readfrompipes ();
- if (!reapchildren())
- {
- check_squeue();
- update_progress_stats();
- select (undef, undef, undef, 0.1);
- killem (keys %proc) if $main::please_freeze;
- }
-}
-
-update_progress_stats();
-freeze_if_want_freeze();
-
-
-if (!defined $success)
-{
- if (@jobstep_todo &&
- $thisround_succeeded == 0 &&
- ($thisround_failed == 0 || $thisround_failed > 4))
- {
- my $message = "stop because $thisround_failed tasks failed and none succeeded";
- Log (undef, $message);
- $success = 0;
- }
- if (!@jobstep_todo)
- {
- $success = 1;
- }
-}
-
-goto ONELEVEL if !defined $success;
-
-
-release_allocation();
-freeze();
-$Job->reload;
-$Job->{'output'} = &collate_output();
-$Job->{'running'} = 0;
-$Job->{'success'} = $Job->{'output'} && $success;
-$Job->{'finished_at'} = gmtime;
-$Job->save if $job_has_uuid;
-
-if ($Job->{'output'})
-{
- eval {
- my $manifest_text = capturex("whget", $Job->{'output'});
- $arv->{'collections'}->{'create'}->execute('collection' => {
- 'uuid' => $Job->{'output'},
- 'manifest_text' => $manifest_text,
- });
- };
- if ($@) {
- Log (undef, "Failed to register output manifest: $@");
- }
-}
-
-Log (undef, "finish");
-
-save_meta();
-exit 0;
-
-
-
-sub update_progress_stats
-{
- $progress_stats_updated = time;
- return if !$progress_is_dirty;
- my ($todo, $done, $running) = (scalar @jobstep_todo,
- scalar @jobstep_done,
- scalar @slot - scalar @freeslot - scalar @holdslot);
- $Job->{'tasks_summary'} ||= {};
- $Job->{'tasks_summary'}->{'todo'} = $todo;
- $Job->{'tasks_summary'}->{'done'} = $done;
- $Job->{'tasks_summary'}->{'running'} = $running;
- $Job->save if $job_has_uuid;
- Log (undef, "status: $done done, $running running, $todo todo");
- $progress_is_dirty = 0;
-}
-
-
-
-sub reapchildren
-{
- my $pid = waitpid (-1, WNOHANG);
- return 0 if $pid <= 0;
-
- my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
- . "."
- . $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepid = $proc{$pid}->{jobstep};
- my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepid];
-
- my $exitcode = $?;
- my $exitinfo = "exit $exitcode";
- $Jobstep->{'arvados_task'}->reload;
- my $success = $Jobstep->{'arvados_task'}->{success};
-
- Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
-
- if (!defined $success) {
- # task did not indicate one way or the other --> fail
- $Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
- $success = 0;
- }
-
- if (!$success)
- {
- my $no_incr_attempts;
- $no_incr_attempts = 1 if $Jobstep->{node_fail};
-
- ++$thisround_failed;
- ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
-
- # Check for signs of a failed or misconfigured node
- if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
- 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
- # Don't count this against jobstep failure thresholds if this
- # node is already suspected faulty and srun exited quickly
- if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
- $elapsed < 5 &&
- $Jobstep->{attempts} > 1) {
- Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
- $no_incr_attempts = 1;
- --$Jobstep->{attempts};
- }
- ban_node_by_slot($proc{$pid}->{slot});
- }
-
- push @jobstep_todo, $jobstepid;
- Log ($jobstepid, "failure in $elapsed seconds");
-
- --$Jobstep->{attempts} if $no_incr_attempts;
- $Job->{'tasks_summary'}->{'failed'}++;
- }
- else
- {
- ++$thisround_succeeded;
- $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
- push @jobstep_done, $jobstepid;
- Log ($jobstepid, "success in $elapsed seconds");
- }
- $Jobstep->{exitcode} = $exitcode;
- $Jobstep->{finishtime} = time;
- process_stderr ($jobstepid, $success);
- Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
-
- close $reader{$jobstepid};
- delete $reader{$jobstepid};
- delete $slot[$proc{$pid}->{slot}]->{pid};
- push @freeslot, $proc{$pid}->{slot};
- delete $proc{$pid};
-
- # Load new tasks
- my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence'
- );
- foreach my $arvados_task (@{$newtask_list->{'items'}}) {
- my $jobstep = {
- 'level' => $arvados_task->{'sequence'},
- 'attempts' => 0,
- 'arvados_task' => $arvados_task
- };
- push @jobstep, $jobstep;
- push @jobstep_todo, $#jobstep;
- }
-
- $progress_is_dirty = 1;
- 1;
-}
-
-
-sub check_squeue
-{
- # return if the kill list was checked <4 seconds ago
- if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
- {
- return;
- }
- $squeue_kill_checked = time;
-
- # use killem() on procs whose killtime is reached
- for (keys %proc)
- {
- if (exists $proc{$_}->{killtime}
- && $proc{$_}->{killtime} <= time)
- {
- killem ($_);
- }
- }
-
- # return if the squeue was checked <60 seconds ago
- if (defined $squeue_checked && $squeue_checked > time - 60)
- {
- return;
- }
- $squeue_checked = time;
-
- if (!$have_slurm)
- {
- # here is an opportunity to check for mysterious problems with local procs
- return;
- }
-
- # get a list of steps still running
- my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
- chop @squeue;
- if ($squeue[-1] ne "ok")
- {
- return;
- }
- pop @squeue;
-
- # which of my jobsteps are running, according to squeue?
- my %ok;
- foreach (@squeue)
- {
- if (/^(\d+)\.(\d+) (\S+)/)
- {
- if ($1 eq $ENV{SLURM_JOBID})
- {
- $ok{$3} = 1;
- }
- }
- }
-
- # which of my active child procs (>60s old) were not mentioned by squeue?
- foreach (keys %proc)
- {
- if ($proc{$_}->{time} < time - 60
- && !exists $ok{$proc{$_}->{jobstepname}}
- && !exists $proc{$_}->{killtime})
- {
- # kill this proc if it hasn't exited in 30 seconds
- $proc{$_}->{killtime} = time + 30;
- }
- }
-}
-
-
-sub release_allocation
-{
- if ($have_slurm)
- {
- Log (undef, "release job allocation");
- system "scancel $ENV{SLURM_JOBID}";
- }
-}
-
-
-sub readfrompipes
-{
- my $gotsome = 0;
- foreach my $job (keys %reader)
- {
- my $buf;
- while (0 < sysread ($reader{$job}, $buf, 8192))
- {
- print STDERR $buf if $ENV{CRUNCH_DEBUG};
- $jobstep[$job]->{stderr} .= $buf;
- preprocess_stderr ($job);
- if (length ($jobstep[$job]->{stderr}) > 16384)
- {
- substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
- }
- $gotsome = 1;
- }
- }
- return $gotsome;
-}
-
-
-sub preprocess_stderr
-{
- my $job = shift;
-
- while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
- my $line = $1;
- substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
- Log ($job, "stderr $line");
- if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
- # whoa.
- $main::please_freeze = 1;
- }
- elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
- $jobstep[$job]->{node_fail} = 1;
- ban_node_by_slot($jobstep[$job]->{slotindex});
- }
- }
-}
-
-
-sub process_stderr
-{
- my $job = shift;
- my $success = shift;
- preprocess_stderr ($job);
-
- map {
- Log ($job, "stderr $_");
- } split ("\n", $jobstep[$job]->{stderr});
-}
-
-
-sub collate_output
-{
- my $whc = Warehouse->new;
- Log (undef, "collate");
- $whc->write_start (1);
- my $joboutput;
- for (@jobstep)
- {
- next if (!exists $_->{'arvados_task'}->{output} ||
- !$_->{'arvados_task'}->{'success'} ||
- $_->{'exitcode'} != 0);
- my $output = $_->{'arvados_task'}->{output};
- if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
- {
- $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
- $whc->write_data ($output);
- }
- elsif (@jobstep == 1)
- {
- $joboutput = $output;
- $whc->write_finish;
- }
- elsif (defined (my $outblock = $whc->fetch_block ($output)))
- {
- $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
- $whc->write_data ($outblock);
- }
- else
- {
- my $errstr = $whc->errstr;
- $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
- $success = 0;
- }
- }
- $joboutput = $whc->write_finish if !defined $joboutput;
- if ($joboutput)
- {
- Log (undef, "output $joboutput");
- $Job->{'output'} = $joboutput;
- $Job->save if $job_has_uuid;
- }
- else
- {
- Log (undef, "output undef");
- }
- return $joboutput;
-}
-
-
-sub killem
-{
- foreach (@_)
- {
- my $sig = 2; # SIGINT first
- if (exists $proc{$_}->{"sent_$sig"} &&
- time - $proc{$_}->{"sent_$sig"} > 4)
- {
- $sig = 15; # SIGTERM if SIGINT doesn't work
- }
- if (exists $proc{$_}->{"sent_$sig"} &&
- time - $proc{$_}->{"sent_$sig"} > 4)
- {
- $sig = 9; # SIGKILL if SIGTERM doesn't work
- }
- if (!exists $proc{$_}->{"sent_$sig"})
- {
- Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
- kill $sig, $_;
- select (undef, undef, undef, 0.1);
- if ($sig == 2)
- {
- kill $sig, $_; # srun wants two SIGINT to really interrupt
- }
- $proc{$_}->{"sent_$sig"} = time;
- $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
- }
- }
-}
-
-
-sub fhbits
-{
- my($bits);
- for (@_) {
- vec($bits,fileno($_),1) = 1;
- }
- $bits;
-}
-
-
-sub Log # ($jobstep_id, $logmessage)
-{
- if ($_[1] =~ /\n/) {
- for my $line (split (/\n/, $_[1])) {
- Log ($_[0], $line);
- }
- return;
- }
- my $fh = select STDERR; $|=1; select $fh;
- my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
- $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
- $message .= "\n";
- my $datetime;
- if ($metastream || -t STDERR) {
- my @gmtime = gmtime;
- $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
- $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
- }
- print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
-
- return if !$metastream;
- $metastream->write_data ($datetime . " " . $message);
-}
-
-
-sub croak
-{
- my ($package, $file, $line) = caller;
- my $message = "@_ at $file line $line\n";
- Log (undef, $message);
- freeze() if @jobstep_todo;
- collate_output() if @jobstep_todo;
- cleanup();
- save_meta() if $metastream;
- die;
-}
-
-
-sub cleanup
-{
- return if !$job_has_uuid;
- $Job->reload;
- $Job->{'running'} = 0;
- $Job->{'success'} = 0;
- $Job->{'finished_at'} = gmtime;
- $Job->save;
-}
-
-
-sub save_meta
-{
- my $justcheckpoint = shift; # false if this will be the last meta saved
- my $m = $metastream;
- $m = $m->copy if $justcheckpoint;
- $m->write_finish;
- my $loglocator = $m->as_key;
- undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
- Log (undef, "meta key is $loglocator");
- $Job->{'log'} = $loglocator;
- $Job->save if $job_has_uuid;
-}
-
-
-sub freeze_if_want_freeze
-{
- if ($main::please_freeze)
- {
- release_allocation();
- if (@_)
- {
- # kill some srun procs before freeze+stop
- map { $proc{$_} = {} } @_;
- while (%proc)
- {
- killem (keys %proc);
- select (undef, undef, undef, 0.1);
- my $died;
- while (($died = waitpid (-1, WNOHANG)) > 0)
- {
- delete $proc{$died};
- }
- }
- }
- freeze();
- collate_output();
- cleanup();
- save_meta();
- exit 0;
- }
-}
-
-
-sub freeze
-{
- Log (undef, "Freeze not implemented");
- return;
-}
-
-
-sub thaw
-{
- croak ("Thaw not implemented");
-
- my $whc;
- my $key = shift;
- Log (undef, "thaw from $key");
-
- @jobstep = ();
- @jobstep_done = ();
- @jobstep_todo = ();
- @jobstep_tomerge = ();
- $jobstep_tomerge_level = 0;
- my $frozenjob = {};
-
- my $stream = new Warehouse::Stream ( whc => $whc,
- hash => [split (",", $key)] );
- $stream->rewind;
- while (my $dataref = $stream->read_until (undef, "\n\n"))
- {
- if ($$dataref =~ /^job /)
- {
- foreach (split ("\n", $$dataref))
- {
- my ($k, $v) = split ("=", $_, 2);
- $frozenjob->{$k} = freezeunquote ($v);
- }
- next;
- }
-
- if ($$dataref =~ /^merge (\d+) (.*)/)
- {
- $jobstep_tomerge_level = $1;
- @jobstep_tomerge
- = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
- next;
- }
-
- my $Jobstep = { };
- foreach (split ("\n", $$dataref))
- {
- my ($k, $v) = split ("=", $_, 2);
- $Jobstep->{$k} = freezeunquote ($v) if $k;
- }
- $Jobstep->{attempts} = 0;
- push @jobstep, $Jobstep;
-
- if ($Jobstep->{exitcode} eq "0")
- {
- push @jobstep_done, $#jobstep;
- }
- else
- {
- push @jobstep_todo, $#jobstep;
- }
- }
-
- foreach (qw (script script_version script_parameters))
- {
- $Job->{$_} = $frozenjob->{$_};
- }
- $Job->save if $job_has_uuid;
-}
-
-
-sub freezequote
-{
- my $s = shift;
- $s =~ s/\\/\\\\/g;
- $s =~ s/\n/\\n/g;
- return $s;
-}
-
-
-sub freezeunquote
-{
- my $s = shift;
- $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
- return $s;
-}
-
-
-sub srun
-{
- my $srunargs = shift;
- my $execargs = shift;
- my $opts = shift || {};
- my $stdin = shift;
- my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
- print STDERR (join (" ",
- map { / / ? "'$_'" : $_ }
- (@$args)),
- "\n")
- if $ENV{CRUNCH_DEBUG};
-
- if (defined $stdin) {
- my $child = open STDIN, "-|";
- defined $child or die "no fork: $!";
- if ($child == 0) {
- print $stdin or die $!;
- close STDOUT or die $!;
- exit 0;
- }
- }
-
- return system (@$args) if $opts->{fork};
-
- exec @$args;
- warn "ENV size is ".length(join(" ",%ENV));
- die "exec failed: $!: @$args";
-}
-
-
-sub ban_node_by_slot {
- # Don't start any new jobsteps on this node for 60 seconds
- my $slotid = shift;
- $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
- $slot[$slotid]->{node}->{hold_count}++;
- Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
-}
-
-__DATA__
-#!/usr/bin/perl
-
-# checkout-and-build
-
-use Fcntl ':flock';
-
-my $destdir = $ENV{"CRUNCH_SRC"};
-my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
-my $repo = $ENV{"CRUNCH_SRC_URL"};
-
-open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
-flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit) {
- exit 0;
-}
-
-unlink "$destdir.commit";
-open STDOUT, ">", "$destdir.log";
-open STDERR, ">&STDOUT";
-
-mkdir $destdir;
-open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-print TARX <DATA>;
-if(!close(TARX)) {
- die "'tar -C $destdir -xf -' exited $?: $!";
-}
-
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
-mkdir $install_dir;
-if (-e "$destdir/crunch_scripts/install") {
- shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
-} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
- # Old version
- shell_or_die ("./tests/autotests.sh", $install_dir);
-} elsif (-e "./install.sh") {
- shell_or_die ("./install.sh", $install_dir);
-}
-
-if ($commit) {
- unlink "$destdir.commit.new";
- symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
- rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
-}
-
-close L;
-
-exit 0;
-
-sub shell_or_die
-{
- if ($ENV{"DEBUG"}) {
- print STDERR "@_\n";
- }
- system (@_) == 0
- or die "@_ failed: $! exit 0x".sprintf("%x",$?);
-}
-
-__DATA__
--- /dev/null
+../../sdk/cli/bin/arv-crunch-job
\ No newline at end of file