2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or deallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path );
88 use constant EX_TEMPFAIL => 75;
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
134 $main::ENV{CRUNCH_DEBUG} = 1;
138 $main::ENV{CRUNCH_DEBUG} = 0;
143 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154 if (!$force_unlock) {
155 # If some other crunch-job process has grabbed this job (or we see
156 # other evidence that the job is already underway) we exit
157 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158 # mark the job as failed.
159 if ($Job->{'is_locked_by_uuid'}) {
160 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
163 if ($Job->{'state'} ne 'Queued') {
164 Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
167 if (defined $Job->{'success'}) { # non-null test via defined(); "ne undef" compared against '' and always warned
168 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
171 if ($Job->{'running'}) {
172 Log(undef, "Job 'running' flag is already set");
175 if ($Job->{'started_at'}) {
176 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
183 $Job = JSON::decode_json($jobspec);
187 map { croak ("No $_ specified") unless $Job->{$_} }
188 qw(script script_version script_parameters);
191 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
192 $Job->{'started_at'} = gmtime;
194 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
198 $job_id = $Job->{'uuid'};
200 my $keep_logfile = $job_id . '.log.txt';
201 start_output_log($keep_logfile);
203 $Job->{'runtime_constraints'} ||= {};
204 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
205 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
208 Log (undef, "check slurm allocation");
211 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
215 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
216 push @sinfo, "$localcpus localhost";
218 if (exists $ENV{SLURM_NODELIST})
220 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
224 my ($ncpus, $slurm_nodelist) = split;
225 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
228 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
231 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
234 foreach (split (",", $ranges))
247 push @nodelist, map {
249 $n =~ s/\[[-,\d]+\]/$_/;
256 push @nodelist, $nodelist;
259 foreach my $nodename (@nodelist)
261 Log (undef, "node $nodename - $ncpus slots");
262 my $node = { name => $nodename,
266 foreach my $cpu (1..$ncpus)
268 push @slot, { node => $node,
272 push @node, @nodelist;
277 # Ensure that we get one jobstep running on each allocated node before
278 # we start overloading nodes with concurrent steps
280 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
287 # Claim this job, and make sure nobody else does (the lock holder must be exactly our user UUID)
288 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
289 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) { # UUIDs are strings: numeric == treats most of them as 0 and always matched
290 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
293 $Job->update_attributes('state' => 'Running',
294 'tasks_summary' => { 'failed' => 0,
301 Log (undef, "start");
302 $SIG{'INT'} = sub { $main::please_freeze = 1; };
303 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
304 $SIG{'TERM'} = \&croak;
305 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
306 $SIG{'ALRM'} = sub { $main::please_info = 1; };
307 $SIG{'CONT'} = sub { $main::please_continue = 1; };
308 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
310 $main::please_freeze = 0;
311 $main::please_info = 0;
312 $main::please_continue = 0;
313 $main::please_refresh = 0;
314 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
316 foreach (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/; } # export NOCACHE* knobs into the environment (loop, not void-context grep)
317 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
318 $ENV{"JOB_UUID"} = $job_id;
322 my @jobstep_todo = ();
323 my @jobstep_done = ();
324 my @jobstep_tomerge = ();
325 my $jobstep_tomerge_level = 0;
327 my $squeue_kill_checked;
328 my $output_in_keep = 0;
329 my $latest_refresh = scalar time;
333 if (defined $Job->{thawedfromkey})
335 thaw ($Job->{thawedfromkey});
339 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
340 'job_uuid' => $Job->{'uuid'},
345 push @jobstep, { 'level' => 0,
347 'arvados_task' => $first_task,
349 push @jobstep_todo, 0;
355 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
362 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
364 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
367 if (!defined $no_clear_tmp) {
368 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
369 system($clear_tmp_cmd) == 0
370 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
372 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
373 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
375 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
376 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
377 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
379 or croak ("setup.py in $src_path failed: exit ".($?>>8));
387 $build_script = <DATA>;
389 Log (undef, "Install revision ".$Job->{script_version});
390 my $nodelist = join(",", @node);
392 if (!defined $no_clear_tmp) {
393 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
395 my $cleanpid = fork();
398 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
399 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
404 last if $cleanpid == waitpid (-1, WNOHANG);
405 freeze_if_want_freeze ($cleanpid);
406 select (undef, undef, undef, 0.1);
408 Log (undef, "Clean-work-dir exited $?");
411 # Install requested code version
414 my @srunargs = ("srun",
415 "--nodelist=$nodelist",
416 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
418 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
419 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
423 my $treeish = $Job->{'script_version'};
425 # If we're running under crunch-dispatch, it will have pulled the
426 # appropriate source tree into its own repository, and given us that
427 # repo's path as $git_dir. If we're running a "local" job, and a
428 # script_version was specified, it's up to the user to provide the
429 # full path to a local repository in Job->{repository}.
431 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
432 # git-archive --remote where appropriate.
434 # TODO: Accept a locally-hosted Arvados repository by name or
435 # UUID. Use arvados.v1.repositories.list or .get to figure out the
436 # appropriate fetch-url.
437 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
439 $ENV{"CRUNCH_SRC_URL"} = $repo;
441 if (-d "$repo/.git") {
442 # We were given a working directory, but we are only interested in
444 $repo = "$repo/.git";
447 # If this looks like a subversion r#, look for it in git-svn commit messages
449 if ($treeish =~ m{^\d{1,4}$}) {
450 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
452 Log(undef, "git Subversion search exited $?");
453 if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
455 Log(undef, "Using commit $commit for Subversion revision $treeish");
459 # If that didn't work, try asking git to look it up as a tree-ish.
461 if (!defined $commit) {
462 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
464 Log(undef, "git rev-list exited $? with result '$found'");
465 if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
467 Log(undef, "Using commit $commit for tree-ish $treeish");
468 if ($commit ne $treeish) {
469 # Make sure we record the real commit id in the database,
470 # frozentokey, logs, etc. -- instead of an abbreviation or a
471 # branch name which can become ambiguous or point to a
472 # different commit in the future.
473 $Job->{'script_version'} = $commit;
475 $Job->update_attributes('script_version' => $commit) or
476 croak("Error while updating job");
481 if (defined $commit) {
482 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
483 @execargs = ("sh", "-c",
484 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
485 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
486 croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
489 croak ("could not figure out commit id for $treeish");
492 # Note: this section is almost certainly unnecessary if we're
493 # running tasks in docker containers.
494 my $installpid = fork();
495 if ($installpid == 0)
497 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
502 last if $installpid == waitpid (-1, WNOHANG);
503 freeze_if_want_freeze ($installpid);
504 select (undef, undef, undef, 0.1);
506 Log (undef, "Install exited $?");
511 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
512 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
515 # If this job requires a Docker image, install that.
516 my $docker_bin = "/usr/bin/docker.io";
517 my ($docker_locator, $docker_stream, $docker_hash);
518 if ($docker_locator = $Job->{docker_image_locator}) {
519 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
522 croak("No Docker image hash found from locator $docker_locator");
524 $docker_stream =~ s/^\.//;
525 my $docker_install_script = qq{
526 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
527 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
530 my $docker_pid = fork();
531 if ($docker_pid == 0)
533 srun (["srun", "--nodelist=" . join(',', @node)],
534 ["/bin/sh", "-ec", $docker_install_script]);
539 last if $docker_pid == waitpid (-1, WNOHANG);
540 freeze_if_want_freeze ($docker_pid);
541 select (undef, undef, undef, 0.1);
545 croak("Installing Docker image from $docker_locator returned exit code $?");
549 foreach (qw (script script_version script_parameters runtime_constraints))
553 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
555 foreach (split (/\n/, $Job->{knobs}))
557 Log (undef, "knob " . $_);
562 $main::success = undef;
568 my $thisround_succeeded = 0;
569 my $thisround_failed = 0;
570 my $thisround_failed_multiple = 0;
572 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
573 or $a <=> $b } @jobstep_todo;
574 my $level = $jobstep[$jobstep_todo[0]]->{level};
575 Log (undef, "start level $level");
580 my @freeslot = (0..$#slot);
583 my $progress_is_dirty = 1;
584 my $progress_stats_updated = 0;
586 update_progress_stats();
591 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
593 my $id = $jobstep_todo[$todo_ptr];
594 my $Jobstep = $jobstep[$id];
595 if ($Jobstep->{level} != $level)
600 pipe $reader{$id}, "writer" or croak ($!);
601 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
602 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
604 my $childslot = $freeslot[0];
605 my $childnode = $slot[$childslot]->{node};
606 my $childslotname = join (".",
607 $slot[$childslot]->{node}->{name},
608 $slot[$childslot]->{cpu});
609 my $childpid = fork();
612 $SIG{'INT'} = 'DEFAULT';
613 $SIG{'QUIT'} = 'DEFAULT';
614 $SIG{'TERM'} = 'DEFAULT';
616 foreach (values (%reader))
620 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
621 open(STDOUT,">&writer");
622 open(STDERR,">&writer");
627 delete $ENV{"GNUPGHOME"};
628 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
629 $ENV{"TASK_QSEQUENCE"} = $id;
630 $ENV{"TASK_SEQUENCE"} = $level;
631 $ENV{"JOB_SCRIPT"} = $Job->{script};
632 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
633 $param =~ tr/a-z/A-Z/;
634 $ENV{"JOB_PARAMETER_$param"} = $value;
636 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
637 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
638 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
639 $ENV{"HOME"} = $ENV{"TASK_WORK"};
640 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
641 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
642 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
643 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
649 "--nodelist=".$childnode->{name},
650 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
651 "--job-name=$job_id.$id.$$",
653 my $build_script_to_send = "";
655 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
656 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
657 ."&& cd $ENV{CRUNCH_TMP} ";
660 $build_script_to_send = $build_script;
664 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
667 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
668 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
669 # Dynamically configure the container to use the host system as its
670 # DNS server. Get the host's global addresses from the ip command,
671 # and turn them into docker --dns options using gawk.
673 q{$(ip -o address show scope global |
674 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
675 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
676 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
677 $command .= "--env=\QHOME=/home/crunch\E ";
678 while (my ($env_key, $env_val) = each %ENV)
680 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
681 if ($env_key eq "TASK_WORK") {
682 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
684 elsif ($env_key eq "TASK_KEEPMOUNT") {
685 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
688 $command .= "--env=\Q$env_key=$env_val\E ";
692 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
693 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
694 $command .= "\Q$docker_hash\E ";
695 $command .= "stdbuf --output=0 --error=0 ";
696 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
699 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
700 $command .= "stdbuf --output=0 --error=0 ";
701 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
704 my @execargs = ('bash', '-c', $command);
705 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
706 # exec() failed, we assume nothing happened.
707 Log(undef, "srun() failed on build script");
711 if (!defined $childpid)
718 $proc{$childpid} = { jobstep => $id,
721 jobstepname => "$job_id.$id.$childpid",
723 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
724 $slot[$childslot]->{pid} = $childpid;
726 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
727 Log ($id, "child $childpid started on $childslotname");
728 $Jobstep->{starttime} = time;
729 $Jobstep->{node} = $childnode->{name};
730 $Jobstep->{slotindex} = $childslot;
731 delete $Jobstep->{stderr};
732 delete $Jobstep->{finishtime};
734 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
735 $Jobstep->{'arvados_task'}->save;
737 splice @jobstep_todo, $todo_ptr, 1;
740 $progress_is_dirty = 1;
744 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
746 last THISROUND if $main::please_freeze;
747 if ($main::please_info)
749 $main::please_info = 0;
753 update_progress_stats();
760 check_refresh_wanted();
762 update_progress_stats();
763 select (undef, undef, undef, 0.1);
765 elsif (time - $progress_stats_updated >= 30)
767 update_progress_stats();
769 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
770 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
772 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
773 .($thisround_failed+$thisround_succeeded)
774 .") -- giving up on this round";
775 Log (undef, $message);
779 # move slots from freeslot to holdslot (or back to freeslot) if necessary
780 for (my $i=$#freeslot; $i>=0; $i--) {
781 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
782 push @holdslot, (splice @freeslot, $i, 1);
785 for (my $i=$#holdslot; $i>=0; $i--) {
786 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
787 push @freeslot, (splice @holdslot, $i, 1);
791 # give up if no nodes are succeeding
792 if (!grep { $_->{node}->{losing_streak} == 0 &&
793 $_->{node}->{hold_count} < 4 } @slot) {
794 my $message = "Every node has failed -- giving up on this round";
795 Log (undef, $message);
802 push @freeslot, splice @holdslot;
803 $slot[$_]->{node}->{losing_streak} = 0 foreach @freeslot; # reset the failure streak of every node that has a free slot
806 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
809 if ($main::please_continue) {
810 $main::please_continue = 0;
813 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
817 check_refresh_wanted();
819 update_progress_stats();
820 select (undef, undef, undef, 0.1);
821 killem (keys %proc) if $main::please_freeze;
825 update_progress_stats();
826 freeze_if_want_freeze();
829 if (!defined $main::success)
832 $thisround_succeeded == 0 &&
833 ($thisround_failed == 0 || $thisround_failed > 4))
835 my $message = "stop because $thisround_failed tasks failed and none succeeded";
836 Log (undef, $message);
845 goto ONELEVEL if !defined $main::success;
848 release_allocation();
850 my $collated_output = &collate_output();
852 if (!$collated_output) {
853 Log(undef, "output undef");
857 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
858 or die "failed to get collated manifest: $!";
859 my $orig_manifest_text = '';
860 while (my $manifest_line = <$orig_manifest>) {
861 $orig_manifest_text .= $manifest_line;
863 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
864 'manifest_text' => $orig_manifest_text,
866 Log(undef, "output uuid " . $output->{uuid});
867 Log(undef, "output hash " . $output->{portable_data_hash});
868 $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
871 Log (undef, "Failed to register output manifest: $@");
875 Log (undef, "finish");
880 if ($collated_output && $main::success) {
881 $Job->update_attributes('state' => 'Complete')
883 $Job->update_attributes('state' => 'Failed')
887 exit ($Job->{'state'} ne 'Complete' ? 1 : 0); # string compare: numeric != made every state equal to 'Complete' (0 == 0), so we always exited 0
891 sub update_progress_stats
893 $progress_stats_updated = time;
894 return if !$progress_is_dirty;
895 my ($todo, $done, $running) = (scalar @jobstep_todo,
896 scalar @jobstep_done,
897 scalar @slot - scalar @freeslot - scalar @holdslot);
898 $Job->{'tasks_summary'} ||= {};
899 $Job->{'tasks_summary'}->{'todo'} = $todo;
900 $Job->{'tasks_summary'}->{'done'} = $done;
901 $Job->{'tasks_summary'}->{'running'} = $running;
903 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
905 Log (undef, "status: $done done, $running running, $todo todo");
906 $progress_is_dirty = 0;
913 my $pid = waitpid (-1, WNOHANG);
914 return 0 if $pid <= 0;
916 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
918 . $slot[$proc{$pid}->{slot}]->{cpu});
919 my $jobstepid = $proc{$pid}->{jobstep};
920 my $elapsed = time - $proc{$pid}->{time};
921 my $Jobstep = $jobstep[$jobstepid];
923 my $childstatus = $?;
924 my $exitvalue = $childstatus >> 8;
925 my $exitinfo = sprintf("exit %d signal %d%s",
928 ($childstatus & 128 ? ' core dump' : ''));
929 $Jobstep->{'arvados_task'}->reload;
930 my $task_success = $Jobstep->{'arvados_task'}->{success};
932 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
934 if (!defined $task_success) {
935 # task did not indicate one way or the other --> fail
936 $Jobstep->{'arvados_task'}->{success} = 0;
937 $Jobstep->{'arvados_task'}->save;
944 $temporary_fail ||= $Jobstep->{node_fail};
945 $temporary_fail ||= ($exitvalue == 111);
948 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
950 # Check for signs of a failed or misconfigured node
951 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
952 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
953 # Don't count this against jobstep failure thresholds if this
954 # node is already suspected faulty and srun exited quickly
955 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
957 Log ($jobstepid, "blaming failure on suspect node " .
958 $slot[$proc{$pid}->{slot}]->{node}->{name});
959 $temporary_fail ||= 1;
961 ban_node_by_slot($proc{$pid}->{slot});
964 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
965 ++$Jobstep->{'failures'},
966 $temporary_fail ? 'temporary ' : 'permanent',
969 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
970 # Give up on this task, and the whole job
972 $main::please_freeze = 1;
975 # Put this task back on the todo queue
976 push @jobstep_todo, $jobstepid;
978 $Job->{'tasks_summary'}->{'failed'}++;
982 ++$thisround_succeeded;
983 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
984 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
985 push @jobstep_done, $jobstepid;
986 Log ($jobstepid, "success in $elapsed seconds");
988 $Jobstep->{exitcode} = $childstatus;
989 $Jobstep->{finishtime} = time;
990 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
991 $Jobstep->{'arvados_task'}->save;
992 process_stderr ($jobstepid, $task_success);
993 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
995 close $reader{$jobstepid};
996 delete $reader{$jobstepid};
997 delete $slot[$proc{$pid}->{slot}]->{pid};
998 push @freeslot, $proc{$pid}->{slot};
1001 if ($task_success) {
1003 my $newtask_list = [];
1004 my $newtask_results;
1006 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1008 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1010 'order' => 'qsequence',
1011 'offset' => scalar(@$newtask_list),
1013 push(@$newtask_list, @{$newtask_results->{items}});
1014 } while (@{$newtask_results->{items}});
1015 foreach my $arvados_task (@$newtask_list) {
1017 'level' => $arvados_task->{'sequence'},
1019 'arvados_task' => $arvados_task
1021 push @jobstep, $jobstep;
1022 push @jobstep_todo, $#jobstep;
1026 $progress_is_dirty = 1;
1030 sub check_refresh_wanted
1032 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1033 if (@stat && $stat[9] > $latest_refresh) {
1034 $latest_refresh = scalar time;
1035 if ($job_has_uuid) {
1036 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1037 for my $attr ('cancelled_at',
1038 'cancelled_by_user_uuid',
1039 'cancelled_by_client_uuid',
1041 $Job->{$attr} = $Job2->{$attr};
1043 if ($Job->{'state'} ne "Running") {
1044 if ($Job->{'state'} eq "Cancelled") {
1045 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1047 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1050 $main::please_freeze = 1;
1058 # return if the kill list was checked <4 seconds ago
1059 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1063 $squeue_kill_checked = time;
1065 # use killem() on procs whose killtime is reached
1068 if (exists $proc{$_}->{killtime}
1069 && $proc{$_}->{killtime} <= time)
1075 # return if the squeue was checked <60 seconds ago
1076 if (defined $squeue_checked && $squeue_checked > time - 60)
1080 $squeue_checked = time;
1084 # here is an opportunity to check for mysterious problems with local procs
1088 # get a list of steps still running
1089 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1091 if ($squeue[-1] ne "ok")
1097 # which of my jobsteps are running, according to squeue?
1101 if (/^(\d+)\.(\d+) (\S+)/)
1103 if ($1 eq $ENV{SLURM_JOBID})
1110 # which of my active child procs (>60s old) were not mentioned by squeue?
1111 foreach (keys %proc)
1113 if ($proc{$_}->{time} < time - 60
1114 && !exists $ok{$proc{$_}->{jobstepname}}
1115 && !exists $proc{$_}->{killtime})
1117 # kill this proc if it hasn't exited in 30 seconds
1118 $proc{$_}->{killtime} = time + 30;
1124 sub release_allocation
1128 Log (undef, "release job allocation");
1129 system "scancel $ENV{SLURM_JOBID}";
1137 foreach my $job (keys %reader)
1140 while (0 < sysread ($reader{$job}, $buf, 8192))
1142 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1143 $jobstep[$job]->{stderr} .= $buf;
1144 preprocess_stderr ($job);
1145 if (length ($jobstep[$job]->{stderr}) > 16384)
1147 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1156 sub preprocess_stderr
1160 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1162 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1163 Log ($job, "stderr $line");
1164 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1166 $main::please_freeze = 1;
1168 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1169 $jobstep[$job]->{node_fail} = 1;
1170 ban_node_by_slot($jobstep[$job]->{slotindex});
1179 my $task_success = shift;
1180 preprocess_stderr ($job);
1183 Log ($job, "stderr $_");
1184 } split ("\n", $jobstep[$job]->{stderr});
1190 my ($keep, $child_out, $output_block);
1192 my $cmd = "arv-get \Q$hash\E";
1193 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1197 my $bytes = sysread($keep, $buf, 1024 * 1024);
1198 if (!defined $bytes) {
1199 die "reading from arv-get: $!";
1200 } elsif ($bytes == 0) {
1201 # sysread returns 0 at the end of the pipe.
1204 # some bytes were read into buf.
1205 $output_block .= $buf;
1209 return $output_block;
1214 Log (undef, "collate");
1216 my ($child_out, $child_in);
1217 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1218 '--retries', put_retry_count());
1222 next if (!exists $_->{'arvados_task'}->{'output'} ||
1223 !$_->{'arvados_task'}->{'success'});
1224 my $output = $_->{'arvados_task'}->{output};
1225 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1227 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1228 print $child_in $output;
1230 elsif (@jobstep == 1)
1232 $joboutput = $output;
1235 elsif (defined (my $outblock = fetch_block ($output)))
1237 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1238 print $child_in $outblock;
1242 Log (undef, "XXX fetch_block($output) failed XXX");
1248 if (!defined $joboutput) {
1249 my $s = IO::Select->new($child_out);
1250 if ($s->can_read(120)) {
1251 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1254 Log (undef, "timed out reading from 'arv-put'");
1267 my $sig = 2; # SIGINT first
1268 if (exists $proc{$_}->{"sent_$sig"} &&
1269 time - $proc{$_}->{"sent_$sig"} > 4)
1271 $sig = 15; # SIGTERM if SIGINT doesn't work
1273 if (exists $proc{$_}->{"sent_$sig"} &&
1274 time - $proc{$_}->{"sent_$sig"} > 4)
1276 $sig = 9; # SIGKILL if SIGTERM doesn't work
1278 if (!exists $proc{$_}->{"sent_$sig"})
1280 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1282 select (undef, undef, undef, 0.1);
1285 kill $sig, $_; # srun wants two SIGINT to really interrupt
1287 $proc{$_}->{"sent_$sig"} = time;
1288 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1298 vec($bits,fileno($_),1) = 1;
1304 # Output logging to arv-put.
1306 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1307 # $log_pipe_pid is the pid of the arv-put subprocess.
1309 # The only functions that should access these variables directly are:
1311 # start_output_log($logfilename)
1312 # Starts an arv-put pipe, reading data on stdin and writing it to
1313 # a $logfilename file in an output collection.
1315 # write_output_log($txt)
1316 # Writes $txt to the output log collection.
1318 # finish_output_log()
1319 # Closes the arv-put pipe and returns the output that it produces.
1321 # output_log_is_active()
1322 # Returns a true value if there is currently a live arv-put
1323 # process, false otherwise.
1325 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1327 sub start_output_log($)
1329 my $logfilename = shift;
1330 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1331 'arv-put', '--portable-data-hash',
1333 '--filename', $logfilename,
1337 sub write_output_log($)
1340 print $log_pipe_in $txt;
1343 sub finish_output_log()
1345 return unless $log_pipe_pid;
1347 close($log_pipe_in);
1350 my $s = IO::Select->new($log_pipe_out);
1351 if ($s->can_read(120)) {
1352 sysread($log_pipe_out, $arv_put_output, 1024);
1353 chomp($arv_put_output);
1355 Log (undef, "timed out reading from 'arv-put'");
1358 waitpid($log_pipe_pid, 0);
1359 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1361 Log(undef, "finish_output_log: arv-put returned error $?") # Log() takes ($jobstep_id, $message); undef marks a job-level message
1364 return $arv_put_output;
1367 sub output_log_is_active() {
1368 return $log_pipe_pid
1371 sub Log # ($jobstep_id, $logmessage)
1373 if ($_[1] =~ /\n/) {
1374 for my $line (split (/\n/, $_[1])) {
1379 my $fh = select STDERR; $|=1; select $fh;
1380 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1381 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1384 if (output_log_is_active() || -t STDERR) {
1385 my @gmtime = gmtime;
1386 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1387 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1389 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1391 if (output_log_is_active()) {
1392 write_output_log($datetime . " " . $message);
1399 my ($package, $file, $line) = caller;
1400 my $message = "@_ at $file line $line\n";
1401 Log (undef, $message);
1402 freeze() if @jobstep_todo;
1403 collate_output() if @jobstep_todo;
1405 save_meta() if output_log_is_active();
1412 return if !$job_has_uuid;
1413 if ($Job->{'state'} eq 'Cancelled') {
1414 $Job->update_attributes('finished_at' => scalar gmtime);
1416 $Job->update_attributes('state' => 'Failed');
1423 my $justcheckpoint = shift; # false if this will be the last meta saved
1424 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1426 my $loglocator = finish_output_log();
1427 Log (undef, "log manifest is $loglocator");
1428 $Job->{'log'} = $loglocator;
1429 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1433 sub freeze_if_want_freeze
1435 if ($main::please_freeze)
1437 release_allocation();
1440 # kill some srun procs before freeze+stop
1441 map { $proc{$_} = {} } @_;
1444 killem (keys %proc);
1445 select (undef, undef, undef, 0.1);
1447 while (($died = waitpid (-1, WNOHANG)) > 0)
1449 delete $proc{$died};
1464 Log (undef, "Freeze not implemented");
1471 croak ("Thaw not implemented");
1487 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1494 my $srunargs = shift;
1495 my $execargs = shift;
1496 my $opts = shift || {};
1498 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1499 print STDERR (join (" ",
1500 map { / / ? "'$_'" : $_ }
1503 if $ENV{CRUNCH_DEBUG};
1505 if (defined $stdin) {
1506 my $child = open STDIN, "-|";
1507 defined $child or die "no fork: $!";
1509 print $stdin or die $!;
1510 close STDOUT or die $!;
1515 return system (@$args) if $opts->{fork};
1518 warn "ENV size is ".length(join(" ",%ENV));
1519 die "exec failed: $!: @$args";
sub ban_node_by_slot {
  # Put the node behind slot $slotid on hold: set its hold_until to 60
  # seconds from now and bump its hold_count. The intent is that no new
  # jobsteps start on that node until the hold expires.
  my ($slotid) = @_;
  my $hold_seconds = 60;
  $slot[$slotid]->{node}->{hold_until} = time() + $hold_seconds;
  my $node = $slot[$slotid]->{node};   # autovivified by the line above
  $node->{hold_count}++;
  Log (undef, "backing off node " . $node->{name} . " for 60 seconds");
}
1533 my ($lockfile, $error_message) = @_;
1534 open L, ">", $lockfile or croak("$lockfile: $!");
1535 if (!flock L, LOCK_EX|LOCK_NB) {
1536 croak("Can't lock $lockfile: $error_message\n");
1540 sub find_docker_image {
1541 # Given a Keep locator, check to see if it contains a Docker image.
1542 # If so, return its stream name and Docker hash.
1543 # If not, return undef for both values.
1544 my $locator = shift;
1545 my ($streamname, $filename);
1546 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1547 foreach my $line (split(/\n/, $image->{manifest_text})) {
1548 my @tokens = split(/\s+/, $line);
1550 $streamname = shift(@tokens);
1551 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1552 if (defined($filename)) {
1553 return (undef, undef); # More than one file in the Collection.
1555 $filename = (split(/:/, $filedata, 3))[2];
1560 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1561 return ($streamname, $1);
1563 return (undef, undef);
1567 sub put_retry_count {
1568 # Calculate a --retries argument for arv-put that will have it try
1569 # approximately as long as this Job has been running.
1570 my $stoptime = shift || time;
1571 my $starttime = $jobstep[0]->{starttime};
1572 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1574 while ($timediff >= 2) {
1578 return ($retries > 3) ? $retries : 3;
1584 # checkout-and-build
1587 use File::Path qw( make_path );
1589 my $destdir = $ENV{"CRUNCH_SRC"};
1590 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1591 my $repo = $ENV{"CRUNCH_SRC_URL"};
1592 my $task_work = $ENV{"TASK_WORK"};
1594 for my $dir ($destdir, $task_work) {
1597 -e $dir or die "Failed to create temporary directory ($dir): $!";
1601 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1603 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1606 die "Cannot exec `@ARGV`: $!";
1612 unlink "$destdir.commit";
1613 open STDOUT, ">", "$destdir.log";
1614 open STDERR, ">&STDOUT";
1617 my @git_archive_data = <DATA>;
1618 if (@git_archive_data) {
1619 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1620 print TARX @git_archive_data;
1622 die "'tar -C $destdir -xf -' exited $?: $!";
1627 chomp ($pwd = `pwd`);
1628 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1631 for my $src_path ("$destdir/arvados/sdk/python") {
1633 shell_or_die ("virtualenv", $install_dir);
1634 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1638 if (-e "$destdir/crunch_scripts/install") {
1639 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1640 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1642 shell_or_die ("./tests/autotests.sh", $install_dir);
1643 } elsif (-e "./install.sh") {
1644 shell_or_die ("./install.sh", $install_dir);
1648 unlink "$destdir.commit.new";
1649 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1650 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1657 die "Cannot exec `@ARGV`: $!";
1664 if ($ENV{"DEBUG"}) {
1665 print STDERR "@_\n";
1668 or die "@_ failed: $! exit 0x".sprintf("%x",$?);