2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path );
88 use constant EX_TEMPFAIL => 75;
# Scratch-directory setup. Every path is published through %ENV; later
# shell commands in this script refer to them as $JOB_WORK, $CRUNCH_TMP, etc.
# TMPDIR falls back to /tmp when it is unset or empty.
90 $ENV{"TMPDIR"} ||= "/tmp";
# Only derive CRUNCH_TMP when the caller has not already provided one.
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid: any user other than "crunch" or root gets a
# "-<uid>" suffix on the directory name.
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# JOB_WORK is $CRUNCH_TMP/work and CRUNCH_INSTALL is $CRUNCH_TMP/opt.
# CRUNCH_WORK is kept as a deprecated alias of JOB_WORK.
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): the result of mkdir is not checked here; presumably an
# already-existing directory is the expected common case -- confirm.
107 mkdir ($ENV{"JOB_WORK"});
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
151 if ($jobspec =~ /^[-a-z\d]+$/)
153 # $jobspec is an Arvados UUID, not a JSON job specification
154 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Unless --force-unlock was given, refuse to start a job that shows any sign
# of already being owned or underway. Each check logs the reason it tripped.
155 if (!$force_unlock) {
156 # If some other crunch-job process has grabbed this job (or we see
157 # other evidence that the job is already underway) we exit
158 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159 # mark the job as failed.
160 if ($Job->{'is_locked_by_uuid'}) {
161 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
164 if ($Job->{'state'} ne 'Queued') {
165 Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
# Test definedness directly: "ne undef" string-compares against the empty
# string (with an uninitialized-value warning) rather than checking for null.
168 if (defined $Job->{'success'}) {
169 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
172 if ($Job->{'running'}) {
173 Log(undef, "Job 'running' flag is already set");
176 if ($Job->{'started_at'}) {
177 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
184 $Job = JSON::decode_json($jobspec);
188 map { croak ("No $_ specified") unless $Job->{$_} }
189 qw(script script_version script_parameters);
192 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193 $Job->{'started_at'} = gmtime;
195 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
197 $job_id = $Job->{'uuid'};
199 my $keep_logfile = $job_id . '.log.txt';
200 $local_logfile = File::Temp->new();
202 $Job->{'runtime_constraints'} ||= {};
203 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
204 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
207 Log (undef, "check slurm allocation");
210 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
214 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
215 push @sinfo, "$localcpus localhost";
217 if (exists $ENV{SLURM_NODELIST})
219 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
223 my ($ncpus, $slurm_nodelist) = split;
224 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
227 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
230 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
233 foreach (split (",", $ranges))
246 push @nodelist, map {
248 $n =~ s/\[[-,\d]+\]/$_/;
255 push @nodelist, $nodelist;
258 foreach my $nodename (@nodelist)
260 Log (undef, "node $nodename - $ncpus slots");
261 my $node = { name => $nodename,
265 foreach my $cpu (1..$ncpus)
267 push @slot, { node => $node,
271 push @node, @nodelist;
276 # Ensure that we get one jobstep running on each allocated node before
277 # we start overloading nodes with concurrent steps
279 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
284 # Claim this job, and make sure nobody else does
# The ownership check must use string equality. With numeric "==" two
# different UUIDs compare equal whenever their numeric prefixes match
# (typically both numify to 0), so a lock held by someone else went unnoticed.
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
287 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
290 $Job->update_attributes('state' => 'Running',
291 'tasks_summary' => { 'failed' => 0,
297 Log (undef, "start");
# Signal handling. Apart from TERM, the handlers only set a flag in package
# main; the flags are examined later by the scheduling code.
#   INT, QUIT, TSTP -> please_freeze
#   ALRM            -> please_info
#   CONT            -> please_continue
#   HUP             -> please_refresh
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
# TERM is routed straight to croak rather than to a flag.
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
# Start with every request flag cleared.
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every "NOCACHE...=value" line of the job's knobs into %ENV.
# A plain loop replaces the original void-context grep, which built and
# discarded a result list only for its side effect.
312 for (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ }
# Expose the job UUID to tasks under both variable names.
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
329 if (defined $Job->{thawedfromkey})
331 thaw ($Job->{thawedfromkey});
335 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336 'job_uuid' => $Job->{'uuid'},
341 push @jobstep, { 'level' => 0,
343 'arvados_task' => $first_task,
345 push @jobstep_todo, 0;
351 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
358 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
360 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if (!defined $no_clear_tmp) {
364 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
365 system($clear_tmp_cmd) == 0
366 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
368 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
369 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
371 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
372 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
373 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
375 or croak ("setup.py in $src_path failed: exit ".($?>>8));
383 $build_script = <DATA>;
385 Log (undef, "Install revision ".$Job->{script_version});
386 my $nodelist = join(",", @node);
388 if (!defined $no_clear_tmp) {
389 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
391 my $cleanpid = fork();
394 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
395 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
400 last if $cleanpid == waitpid (-1, WNOHANG);
401 freeze_if_want_freeze ($cleanpid);
402 select (undef, undef, undef, 0.1);
404 Log (undef, "Clean-work-dir exited $?");
407 # Install requested code version
410 my @srunargs = ("srun",
411 "--nodelist=$nodelist",
412 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
414 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
415 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
419 my $treeish = $Job->{'script_version'};
421 # If we're running under crunch-dispatch, it will have pulled the
422 # appropriate source tree into its own repository, and given us that
423 # repo's path as $git_dir. If we're running a "local" job, and a
424 # script_version was specified, it's up to the user to provide the
425 # full path to a local repository in Job->{repository}.
427 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
428 # git-archive --remote where appropriate.
430 # TODO: Accept a locally-hosted Arvados repository by name or
431 # UUID. Use arvados.v1.repositories.list or .get to figure out the
432 # appropriate fetch-url.
433 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
435 $ENV{"CRUNCH_SRC_URL"} = $repo;
437 if (-d "$repo/.git") {
438 # We were given a working directory, but we are only interested in
440 $repo = "$repo/.git";
443 # If this looks like a subversion r#, look for it in git-svn commit messages
445 if ($treeish =~ m{^\d{1,4}$}) {
446 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
448 Log(undef, "git Subversion search exited $?");
449 if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
451 Log(undef, "Using commit $commit for Subversion revision $treeish");
455 # If that didn't work, try asking git to look it up as a tree-ish.
457 if (!defined $commit) {
458 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460 Log(undef, "git rev-list exited $? with result '$found'");
461 if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
463 Log(undef, "Using commit $commit for tree-ish $treeish");
464 if ($commit ne $treeish) {
465 # Make sure we record the real commit id in the database,
466 # frozentokey, logs, etc. -- instead of an abbreviation or a
467 # branch name which can become ambiguous or point to a
468 # different commit in the future.
469 $Job->{'script_version'} = $commit;
471 $Job->update_attributes('script_version' => $commit) or
472 croak("Error while updating job");
477 if (defined $commit) {
478 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
479 @execargs = ("sh", "-c",
480 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
481 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
482 croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
485 croak ("could not figure out commit id for $treeish");
488 # Note: this section is almost certainly unnecessary if we're
489 # running tasks in docker containers.
490 my $installpid = fork();
491 if ($installpid == 0)
493 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
498 last if $installpid == waitpid (-1, WNOHANG);
499 freeze_if_want_freeze ($installpid);
500 select (undef, undef, undef, 0.1);
502 Log (undef, "Install exited $?");
507 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
514 if ($docker_locator = $Job->{docker_image_locator}) {
515 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
518 croak("No Docker image hash found from locator $docker_locator");
520 $docker_stream =~ s/^\.//;
521 my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
526 my $docker_pid = fork();
527 if ($docker_pid == 0)
529 srun (["srun", "--nodelist=" . join(',', @node)],
530 ["/bin/sh", "-ec", $docker_install_script]);
535 last if $docker_pid == waitpid (-1, WNOHANG);
536 freeze_if_want_freeze ($docker_pid);
537 select (undef, undef, undef, 0.1);
541 croak("Installing Docker image from $docker_locator returned exit code $?");
545 foreach (qw (script script_version script_parameters runtime_constraints))
549 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
551 foreach (split (/\n/, $Job->{knobs}))
553 Log (undef, "knob " . $_);
558 $main::success = undef;
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569 or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
576 my @freeslot = (0..$#slot);
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
582 update_progress_stats();
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
589 my $id = $jobstep_todo[$todo_ptr];
590 my $Jobstep = $jobstep[$id];
591 if ($Jobstep->{level} != $level)
596 pipe $reader{$id}, "writer" or croak ($!);
597 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
600 my $childslot = $freeslot[0];
601 my $childnode = $slot[$childslot]->{node};
602 my $childslotname = join (".",
603 $slot[$childslot]->{node}->{name},
604 $slot[$childslot]->{cpu});
605 my $childpid = fork();
608 $SIG{'INT'} = 'DEFAULT';
609 $SIG{'QUIT'} = 'DEFAULT';
610 $SIG{'TERM'} = 'DEFAULT';
612 foreach (values (%reader))
616 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617 open(STDOUT,">&writer");
618 open(STDERR,">&writer");
623 delete $ENV{"GNUPGHOME"};
624 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625 $ENV{"TASK_QSEQUENCE"} = $id;
626 $ENV{"TASK_SEQUENCE"} = $level;
627 $ENV{"JOB_SCRIPT"} = $Job->{script};
628 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629 $param =~ tr/a-z/A-Z/;
630 $ENV{"JOB_PARAMETER_$param"} = $value;
632 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635 $ENV{"HOME"} = $ENV{"TASK_WORK"};
636 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
645 "--nodelist=".$childnode->{name},
646 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647 "--job-name=$job_id.$id.$$",
649 my $build_script_to_send = "";
651 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
653 ."&& cd $ENV{CRUNCH_TMP} ";
656 $build_script_to_send = $build_script;
660 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
663 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665 # Dynamically configure the container to use the host system as its
666 # DNS server. Get the host's global addresses from the ip command,
667 # and turn them into docker --dns options using gawk.
669 q{$(ip -o address show scope global |
670 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673 $command .= "--env=\QHOME=/home/crunch\E ";
674 while (my ($env_key, $env_val) = each %ENV)
676 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677 if ($env_key eq "TASK_WORK") {
678 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
680 elsif ($env_key eq "TASK_KEEPMOUNT") {
681 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
684 $command .= "--env=\Q$env_key=$env_val\E ";
688 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
689 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
690 $command .= "\Q$docker_hash\E ";
691 $command .= "stdbuf --output=0 --error=0 ";
692 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
695 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
696 $command .= "stdbuf --output=0 --error=0 ";
697 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
700 my @execargs = ('bash', '-c', $command);
701 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
702 # exec() failed, we assume nothing happened.
703 Log(undef, "srun() failed on build script");
707 if (!defined $childpid)
714 $proc{$childpid} = { jobstep => $id,
717 jobstepname => "$job_id.$id.$childpid",
719 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
720 $slot[$childslot]->{pid} = $childpid;
722 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
723 Log ($id, "child $childpid started on $childslotname");
724 $Jobstep->{starttime} = time;
725 $Jobstep->{node} = $childnode->{name};
726 $Jobstep->{slotindex} = $childslot;
727 delete $Jobstep->{stderr};
728 delete $Jobstep->{finishtime};
730 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
731 $Jobstep->{'arvados_task'}->save;
733 splice @jobstep_todo, $todo_ptr, 1;
736 $progress_is_dirty = 1;
740 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
742 last THISROUND if $main::please_freeze;
743 if ($main::please_info)
745 $main::please_info = 0;
749 update_progress_stats();
756 check_refresh_wanted();
758 update_progress_stats();
759 select (undef, undef, undef, 0.1);
761 elsif (time - $progress_stats_updated >= 30)
763 update_progress_stats();
765 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
766 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
768 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
769 .($thisround_failed+$thisround_succeeded)
770 .") -- giving up on this round";
771 Log (undef, $message);
775 # move slots from freeslot to holdslot (or back to freeslot) if necessary
776 for (my $i=$#freeslot; $i>=0; $i--) {
777 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
778 push @holdslot, (splice @freeslot, $i, 1);
781 for (my $i=$#holdslot; $i>=0; $i--) {
782 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
783 push @freeslot, (splice @holdslot, $i, 1);
787 # give up if no nodes are succeeding
788 if (!grep { $_->{node}->{losing_streak} == 0 &&
789 $_->{node}->{hold_count} < 4 } @slot) {
790 my $message = "Every node has failed -- giving up on this round";
791 Log (undef, $message);
798 push @freeslot, splice @holdslot;
799 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
802 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
805 if ($main::please_continue) {
806 $main::please_continue = 0;
809 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
813 check_refresh_wanted();
815 update_progress_stats();
816 select (undef, undef, undef, 0.1);
817 killem (keys %proc) if $main::please_freeze;
821 update_progress_stats();
822 freeze_if_want_freeze();
825 if (!defined $main::success)
828 $thisround_succeeded == 0 &&
829 ($thisround_failed == 0 || $thisround_failed > 4))
831 my $message = "stop because $thisround_failed tasks failed and none succeeded";
832 Log (undef, $message);
841 goto ONELEVEL if !defined $main::success;
844 release_allocation();
846 my $collated_output = &collate_output();
848 if (!$collated_output) {
849 Log(undef, "output undef");
853 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
854 or die "failed to get collated manifest: $!";
855 my $orig_manifest_text = '';
856 while (my $manifest_line = <$orig_manifest>) {
857 $orig_manifest_text .= $manifest_line;
859 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
860 'manifest_text' => $orig_manifest_text,
862 Log(undef, "output uuid " . $output->{uuid});
863 Log(undef, "output hash " . $output->{portable_data_hash});
864 $Job->update_attributes('output' => $output->{portable_data_hash});
867 Log (undef, "Failed to register output manifest: $@");
871 Log (undef, "finish");
876 if ($collated_output && $main::success) {
877 $final_state = 'Complete';
879 $final_state = 'Failed';
881 $Job->update_attributes('state' => $final_state);
883 exit (($final_state eq 'Complete') ? 0 : 1);
# update_progress_stats: push the current todo/done/running task counts to
# the Job record's tasks_summary and log a one-line status.
# Takes no arguments; works on the file-scoped scheduling state.
887 sub update_progress_stats
# The timestamp is refreshed even when nothing changed, so callers that
# compare against $progress_stats_updated see that a check took place.
889 $progress_stats_updated = time;
890 return if !$progress_is_dirty;
# "running" is derived: all slots minus those that are free or on hold.
891 my ($todo, $done, $running) = (scalar @jobstep_todo,
892 scalar @jobstep_done,
893 scalar @slot - scalar @freeslot - scalar @holdslot);
894 $Job->{'tasks_summary'} ||= {};
895 $Job->{'tasks_summary'}->{'todo'} = $todo;
896 $Job->{'tasks_summary'}->{'done'} = $done;
897 $Job->{'tasks_summary'}->{'running'} = $running;
898 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
899 Log (undef, "status: $done done, $running running, $todo todo");
# Clear the dirty flag so the next call is a no-op until state changes again.
900 $progress_is_dirty = 0;
907 my $pid = waitpid (-1, WNOHANG);
908 return 0 if $pid <= 0;
910 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
912 . $slot[$proc{$pid}->{slot}]->{cpu});
913 my $jobstepid = $proc{$pid}->{jobstep};
914 my $elapsed = time - $proc{$pid}->{time};
915 my $Jobstep = $jobstep[$jobstepid];
917 my $childstatus = $?;
918 my $exitvalue = $childstatus >> 8;
919 my $exitinfo = sprintf("exit %d signal %d%s",
922 ($childstatus & 128 ? ' core dump' : ''));
923 $Jobstep->{'arvados_task'}->reload;
924 my $task_success = $Jobstep->{'arvados_task'}->{success};
926 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
928 if (!defined $task_success) {
929 # task did not indicate one way or the other --> fail
930 $Jobstep->{'arvados_task'}->{success} = 0;
931 $Jobstep->{'arvados_task'}->save;
938 $temporary_fail ||= $Jobstep->{node_fail};
939 $temporary_fail ||= ($exitvalue == 111);
942 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
944 # Check for signs of a failed or misconfigured node
945 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
946 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
947 # Don't count this against jobstep failure thresholds if this
948 # node is already suspected faulty and srun exited quickly
949 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
951 Log ($jobstepid, "blaming failure on suspect node " .
952 $slot[$proc{$pid}->{slot}]->{node}->{name});
953 $temporary_fail ||= 1;
955 ban_node_by_slot($proc{$pid}->{slot});
958 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
959 ++$Jobstep->{'failures'},
960 $temporary_fail ? 'temporary ' : 'permanent',
963 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
964 # Give up on this task, and the whole job
966 $main::please_freeze = 1;
968 # Put this task back on the todo queue
969 push @jobstep_todo, $jobstepid;
970 $Job->{'tasks_summary'}->{'failed'}++;
974 ++$thisround_succeeded;
975 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
976 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
977 push @jobstep_done, $jobstepid;
978 Log ($jobstepid, "success in $elapsed seconds");
980 $Jobstep->{exitcode} = $childstatus;
981 $Jobstep->{finishtime} = time;
982 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
983 $Jobstep->{'arvados_task'}->save;
984 process_stderr ($jobstepid, $task_success);
985 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
987 close $reader{$jobstepid};
988 delete $reader{$jobstepid};
989 delete $slot[$proc{$pid}->{slot}]->{pid};
990 push @freeslot, $proc{$pid}->{slot};
995 my $newtask_list = [];
998 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1000 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1002 'order' => 'qsequence',
1003 'offset' => scalar(@$newtask_list),
1005 push(@$newtask_list, @{$newtask_results->{items}});
1006 } while (@{$newtask_results->{items}});
1007 foreach my $arvados_task (@$newtask_list) {
1009 'level' => $arvados_task->{'sequence'},
1011 'arvados_task' => $arvados_task
1013 push @jobstep, $jobstep;
1014 push @jobstep_todo, $#jobstep;
1018 $progress_is_dirty = 1;
# check_refresh_wanted: when the file named by $CRUNCH_REFRESH_TRIGGER has
# been modified since the last refresh, re-read the Job record, copy selected
# attributes into $Job, and request a freeze if the job is no longer Running.
1022 sub check_refresh_wanted
# stat() returns an empty list if the trigger file is missing; element 9 is
# the modification time.
1024 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1025 if (@stat && $stat[9] > $latest_refresh) {
1026 $latest_refresh = scalar time;
# NOTE(review): the lookup key is $jobspec, which is only a UUID when the job
# was given by UUID on the command line -- confirm behaviour for JSON jobspecs.
1027 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1028 for my $attr ('cancelled_at',
1029 'cancelled_by_user_uuid',
1030 'cancelled_by_client_uuid',
1032 $Job->{$attr} = $Job2->{$attr};
# Any state other than "Running" stops the job; a cancellation gets its own
# log message naming when and by whom.
1034 if ($Job->{'state'} ne "Running") {
1035 if ($Job->{'state'} eq "Cancelled") {
1036 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1038 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1041 $main::please_freeze = 1;
1048 # return if the kill list was checked <4 seconds ago
1049 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1053 $squeue_kill_checked = time;
1055 # use killem() on procs whose killtime is reached
1058 if (exists $proc{$_}->{killtime}
1059 && $proc{$_}->{killtime} <= time)
1065 # return if the squeue was checked <60 seconds ago
1066 if (defined $squeue_checked && $squeue_checked > time - 60)
1070 $squeue_checked = time;
1074 # here is an opportunity to check for mysterious problems with local procs
1078 # get a list of steps still running
1079 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1081 if ($squeue[-1] ne "ok")
1087 # which of my jobsteps are running, according to squeue?
1091 if (/^(\d+)\.(\d+) (\S+)/)
1093 if ($1 eq $ENV{SLURM_JOBID})
1100 # which of my active child procs (>60s old) were not mentioned by squeue?
1101 foreach (keys %proc)
1103 if ($proc{$_}->{time} < time - 60
1104 && !exists $ok{$proc{$_}->{jobstepname}}
1105 && !exists $proc{$_}->{killtime})
1107 # kill this proc if it hasn't exited in 30 seconds
1108 $proc{$_}->{killtime} = time + 30;
# release_allocation: log the release and cancel the SLURM job named by
# $SLURM_JOBID via scancel.
# NOTE(review): lines between the header and the Log call are not shown
# here; presumably this only runs when a SLURM allocation exists -- confirm.
1114 sub release_allocation
1118 Log (undef, "release job allocation");
# The command goes through the shell; scancel's exit status is not checked.
1119 system "scancel $ENV{SLURM_JOBID}";
1127 foreach my $job (keys %reader)
1130 while (0 < sysread ($reader{$job}, $buf, 8192))
1132 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1133 $jobstep[$job]->{stderr} .= $buf;
1134 preprocess_stderr ($job);
1135 if (length ($jobstep[$job]->{stderr}) > 16384)
1137 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: drain complete lines from the buffered stderr text of a
# jobstep ($jobstep[$job]->{stderr}), log each one, and react to specific
# srun error messages. Text after the last newline is left in the buffer.
1146 sub preprocess_stderr
# Each iteration handles the first newline-terminated line in the buffer.
# ($job and $line are assigned on lines not shown here; $line is presumably
# the text captured by this pattern -- confirm.)
1150 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the handled line plus its newline from the front of the buffer.
1152 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1153 Log ($job, "stderr $line");
# srun reports that the SLURM job expired or its allocation cannot be
# confirmed: ask the main loop to freeze.
1154 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1156 $main::please_freeze = 1;
# srun reports a node failure or an inability to create the job step: mark
# the step as a node failure and put the node it used on hold.
1158 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1159 $jobstep[$job]->{node_fail} = 1;
1160 ban_node_by_slot($jobstep[$job]->{slotindex});
1169 my $task_success = shift;
1170 preprocess_stderr ($job);
1173 Log ($job, "stderr $_");
1174 } split ("\n", $jobstep[$job]->{stderr});
1180 my ($keep, $child_out, $output_block);
1182 my $cmd = "arv-get \Q$hash\E";
1183 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1187 my $bytes = sysread($keep, $buf, 1024 * 1024);
1188 if (!defined $bytes) {
1189 die "reading from arv-get: $!";
1190 } elsif ($bytes == 0) {
1191 # sysread returns 0 at the end of the pipe.
1194 # some bytes were read into buf.
1195 $output_block .= $buf;
1199 return $output_block;
1204 Log (undef, "collate");
1206 my ($child_out, $child_in);
1207 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1208 '--retries', put_retry_count());
1212 next if (!exists $_->{'arvados_task'}->{'output'} ||
1213 !$_->{'arvados_task'}->{'success'});
1214 my $output = $_->{'arvados_task'}->{output};
1215 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1217 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1218 print $child_in $output;
1220 elsif (@jobstep == 1)
1222 $joboutput = $output;
1225 elsif (defined (my $outblock = fetch_block ($output)))
1227 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1228 print $child_in $outblock;
1232 Log (undef, "XXX fetch_block($output) failed XXX");
1238 if (!defined $joboutput) {
1239 my $s = IO::Select->new($child_out);
1240 if ($s->can_read(120)) {
1241 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1244 Log (undef, "timed out reading from 'arv-put'");
1257 my $sig = 2; # SIGINT first
1258 if (exists $proc{$_}->{"sent_$sig"} &&
1259 time - $proc{$_}->{"sent_$sig"} > 4)
1261 $sig = 15; # SIGTERM if SIGINT doesn't work
1263 if (exists $proc{$_}->{"sent_$sig"} &&
1264 time - $proc{$_}->{"sent_$sig"} > 4)
1266 $sig = 9; # SIGKILL if SIGTERM doesn't work
1268 if (!exists $proc{$_}->{"sent_$sig"})
1270 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1272 select (undef, undef, undef, 0.1);
1275 kill $sig, $_; # srun wants two SIGINT to really interrupt
1277 $proc{$_}->{"sent_$sig"} = time;
1278 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1288 vec($bits,fileno($_),1) = 1;
# Log: write one log line to STDERR and, when a local log file is open, to
# that file as well.
#   $_[0]  jobstep id, or undef for job-level messages
#   $_[1]  message text
1294 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split on newlines and handled per line.
1296 if ($_[1] =~ /\n/) {
1297 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1302 my $fh = select STDERR; $|=1; select $fh;
# Line layout: job id, this process's pid, jobstep id, message.
1303 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside the range space..tilde with a backslash
# followed by its three-digit octal code.
1304 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp is only built when it will be used: for the local log
# file, or for STDERR when STDERR is a terminal.
1307 if ($local_logfile || -t STDERR) {
1308 my @gmtime = gmtime;
1309 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1310 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp only when it is a terminal.
1312 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1314 if ($local_logfile) {
1315 print $local_logfile $datetime . " " . $message;
1322 my ($package, $file, $line) = caller;
1323 my $message = "@_ at $file line $line\n";
1324 Log (undef, $message);
1325 freeze() if @jobstep_todo;
1326 collate_output() if @jobstep_todo;
1328 save_meta() if $local_logfile;
1335 if ($Job->{'state'} eq 'Cancelled') {
1336 $Job->update_attributes('finished_at' => scalar gmtime);
1338 $Job->update_attributes('state' => 'Failed');
1345 my $justcheckpoint = shift; # false if this will be the last meta saved
1346 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1348 $local_logfile->flush;
1349 my $retry_count = put_retry_count();
1350 my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1351 "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1352 my $loglocator = `$cmd`;
1353 die "system $cmd failed: $?" if $?;
1356 $local_logfile = undef; # the temp file is automatically deleted
1357 Log (undef, "log manifest is $loglocator");
1358 $Job->{'log'} = $loglocator;
1359 $Job->update_attributes('log', $loglocator);
# freeze_if_want_freeze: when a freeze has been requested
# ($main::please_freeze), release the allocation and kill and reap child
# processes. Any arguments are pids of extra children to include.
1363 sub freeze_if_want_freeze
1365 if ($main::please_freeze)
1367 release_allocation();
1370 # kill some srun procs before freeze+stop
# Register the caller-supplied pids in %proc so killem() signals them too.
1371 map { $proc{$_} = {} } @_;
1374 killem (keys %proc);
# Pause 0.1 s, then reap whatever has exited without blocking.
1375 select (undef, undef, undef, 0.1);
1377 while (($died = waitpid (-1, WNOHANG)) > 0)
1379 delete $proc{$died};
1394 Log (undef, "Freeze not implemented");
1401 croak ("Thaw not implemented");
1417 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1424 my $srunargs = shift;
1425 my $execargs = shift;
1426 my $opts = shift || {};
1428 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1429 print STDERR (join (" ",
1430 map { / / ? "'$_'" : $_ }
1433 if $ENV{CRUNCH_DEBUG};
1435 if (defined $stdin) {
1436 my $child = open STDIN, "-|";
1437 defined $child or die "no fork: $!";
1439 print $stdin or die $!;
1440 close STDOUT or die $!;
1445 return system (@$args) if $opts->{fork};
1448 warn "ENV size is ".length(join(" ",%ENV));
1449 die "exec failed: $!: @$args";
# ban_node_by_slot: put the node that owns the given slot on hold.
# ($slotid is assigned on a line not shown here -- presumably from the first
# argument; confirm.)
1453 sub ban_node_by_slot {
1454 # Don't start any new jobsteps on this node for 60 seconds
1456 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
# hold_count records how many times this node has been put on hold.
1457 $slot[$slotid]->{node}->{hold_count}++;
1458 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1463 my ($lockfile, $error_message) = @_;
1464 open L, ">", $lockfile or croak("$lockfile: $!");
1465 if (!flock L, LOCK_EX|LOCK_NB) {
1466 croak("Can't lock $lockfile: $error_message\n");
1470 sub find_docker_image {
1471 # Given a Keep locator, check to see if it contains a Docker image.
1472 # If so, return its stream name and Docker hash.
1473 # If not, return undef for both values.
#
# The collection qualifies only if its manifest names exactly one file and
# that file is called <64 hex digits>.tar; the hex digits are the hash.
1474 my $locator = shift;
1475 my ($streamname, $filename);
1476 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1477 foreach my $line (split(/\n/, $image->{manifest_text})) {
1478 my @tokens = split(/\s+/, $line);
# The first token on a manifest line is the stream name.
1480 $streamname = shift(@tokens);
# File tokens have the form <number>:<number>:<name>; other tokens are skipped.
1481 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1482 if (defined($filename)) {
1483 return (undef, undef); # More than one file in the Collection.
# Split into at most three fields so the name keeps any colons it contains.
1485 $filename = (split(/:/, $filedata, 3))[2];
1490 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1491 return ($streamname, $1);
1493 return (undef, undef);
1497 sub put_retry_count {
1498 # Calculate a --retries argument for arv-put that will have it try
1499 # approximately as long as this Job has been running.
#
# Optional argument: the stop time in epoch seconds (defaults to now).
# Returns a retry count that is never less than 3.
1500 my $stoptime = shift || time;
# The job's start is taken to be the start time of the first jobstep; if
# that is not recorded yet, the elapsed time is treated as 1 second.
1501 my $starttime = $jobstep[0]->{starttime};
1502 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# NOTE(review): the loop body is not shown here; presumably it shrinks
# $timediff while counting $retries -- confirm.
1504 while ($timediff >= 2) {
1508 return ($retries > 3) ? $retries : 3;
1514 # checkout-and-build
1517 use File::Path qw( make_path );
1519 my $destdir = $ENV{"CRUNCH_SRC"};
1520 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1521 my $repo = $ENV{"CRUNCH_SRC_URL"};
1522 my $task_work = $ENV{"TASK_WORK"};
1524 for my $dir ($destdir, $task_work) {
1527 -e $dir or die "Failed to create temporary directory ($dir): $!";
1531 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1533 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1536 die "Cannot exec `@ARGV`: $!";
1542 unlink "$destdir.commit";
1543 open STDOUT, ">", "$destdir.log";
1544 open STDERR, ">&STDOUT";
1547 my @git_archive_data = <DATA>;
1548 if (@git_archive_data) {
1549 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1550 print TARX @git_archive_data;
1552 die "'tar -C $destdir -xf -' exited $?: $!";
1557 chomp ($pwd = `pwd`);
1558 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1561 for my $src_path ("$destdir/arvados/sdk/python") {
1563 shell_or_die ("virtualenv", $install_dir);
1564 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1568 if (-e "$destdir/crunch_scripts/install") {
1569 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1570 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1572 shell_or_die ("./tests/autotests.sh", $install_dir);
1573 } elsif (-e "./install.sh") {
1574 shell_or_die ("./install.sh", $install_dir);
1578 unlink "$destdir.commit.new";
1579 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1580 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1587 die "Cannot exec `@ARGV`: $!";
1594 if ($ENV{"DEBUG"}) {
1595 print STDERR "@_\n";
1598 or die "@_ failed: $! exit 0x".sprintf("%x",$?);