2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
setup. This can speed up development and debugging when running jobs locally.
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
stderr streams. The log is saved in Keep at each checkpoint and when the job
finishes.
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
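
For example, a supervisor process that knows crunch-job's pid could drive
these from Perl (a sketch; $crunchjob_pid is whatever variable holds that
pid):

 kill 'ALRM', $crunchjob_pid;  # save a checkpoint and keep running
 kill 'HUP', $crunchjob_pid;   # re-check node allocation and Job attributes
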
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path );
88 use constant EX_TEMPFAIL => 75;
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
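# To summarize the layout established above (assuming the default
# TMPDIR=/tmp and a non-root, non-crunch user with uid 1000):
#   CRUNCH_TMP     = /tmp/crunch-job-1000
#   JOB_WORK       = /tmp/crunch-job-1000/work
#   CRUNCH_INSTALL = /tmp/crunch-job-1000/opt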
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
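# Illustrative note: a bare identifier such as "zzzzz-8i9sb-0123456789abcde"
# matches the pattern above and is treated as the UUID of an existing job to
# fetch from the API; anything else (e.g. the JSON form shown in the usage
# notes) is run as a "local" job defined entirely by $jobspec.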
$SIG{'USR1'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 1;
};
$SIG{'USR2'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 0;
};
143 my $arv = Arvados->new('apiVersion' => 'v1');
146 my $User = $arv->{'users'}->{'current'}->execute;
154 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155 if (!$force_unlock) {
156 # Claim this job, and make sure nobody else does
158 # lock() sets is_locked_by_uuid and changes state to Running.
159 $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
162 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
169 $Job = JSON::decode_json($jobspec);
173 map { croak ("No $_ specified") unless $Job->{$_} }
174 qw(script script_version script_parameters);
177 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
178 $Job->{'started_at'} = gmtime;
180 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
184 $job_id = $Job->{'uuid'};
186 my $keep_logfile = $job_id . '.log.txt';
187 $local_logfile = File::Temp->new();
189 $Job->{'runtime_constraints'} ||= {};
190 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
191 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194 Log (undef, "check slurm allocation");
197 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
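# A minimal sketch of the idea raised in the comment above (not used by this
# script; the sub name is ours): expand the SLURM_TASKS_PER_NODE shorthand
# into one CPU count per node, e.g. "4(x3),2,4(x2)" -> (4, 4, 4, 2, 4, 4).
sub expand_slurm_tasks_per_node
{
  my ($spec) = @_;
  my @counts;
  foreach my $chunk (split (",", $spec))
  {
    if ($chunk =~ /^(\d+)(?:\(x(\d+)\))?$/)
    {
      push @counts, ($1) x (defined ($2) ? $2 : 1);
    }
  }
  return @counts;
}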
201 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
202 push @sinfo, "$localcpus localhost";
204 if (exists $ENV{SLURM_NODELIST})
206 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
210 my ($ncpus, $slurm_nodelist) = split;
211 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220 foreach (split (",", $ranges))
233 push @nodelist, map {
235 $n =~ s/\[[-,\d]+\]/$_/;
242 push @nodelist, $nodelist;
245 foreach my $nodename (@nodelist)
247 Log (undef, "node $nodename - $ncpus slots");
    my $node = { name => $nodename,
                 ncpus => $ncpus,
                 losing_streak => 0,
                 hold_until => 0 };
    foreach my $cpu (1..$ncpus)
    {
      push @slot, { node => $node,
                    cpu => $cpu };
    }
258 push @node, @nodelist;
263 # Ensure that we get one jobstep running on each allocated node before
264 # we start overloading nodes with concurrent steps
266 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269 $Job->update_attributes(
    'tasks_summary' => { 'failed' => 0,
                         'todo' => 1,
                         'running' => 0,
                         'done' => 0 });
275 Log (undef, "start");
276 $SIG{'INT'} = sub { $main::please_freeze = 1; };
277 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
278 $SIG{'TERM'} = \&croak;
279 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
280 $SIG{'ALRM'} = sub { $main::please_info = 1; };
281 $SIG{'CONT'} = sub { $main::please_continue = 1; };
282 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
284 $main::please_freeze = 0;
285 $main::please_info = 0;
286 $main::please_continue = 0;
287 $main::please_refresh = 0;
288 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
290 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
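# e.g. a knobs value of "NOCACHE_MYSTEP=1\nREFERENCE=hg19" (names are
# illustrative) exports NOCACHE_MYSTEP=1 into the environment here; knob
# lines that do not start with NOCACHE are only logged further down, not
# exported.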
291 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
292 $ENV{"JOB_UUID"} = $job_id;
296 my @jobstep_todo = ();
297 my @jobstep_done = ();
298 my @jobstep_tomerge = ();
299 my $jobstep_tomerge_level = 0;
301 my $squeue_kill_checked;
302 my $output_in_keep = 0;
303 my $latest_refresh = scalar time;
307 if (defined $Job->{thawedfromkey})
309 thaw ($Job->{thawedfromkey});
313 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
  'job_uuid' => $Job->{'uuid'},
});
319 push @jobstep, { 'level' => 0,
                 'arvados_task' => $first_task,
               };
323 push @jobstep_todo, 0;
329 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
336 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
338 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
341 if (!defined $no_clear_tmp) {
342 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
343 system($clear_tmp_cmd) == 0
344 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
346 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
347 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
349 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
350 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
351 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
353 or croak ("setup.py in $src_path failed: exit ".($?>>8));
361 $build_script = <DATA>;
363 Log (undef, "Install revision ".$Job->{script_version});
364 my $nodelist = join(",", @node);
366 if (!defined $no_clear_tmp) {
367 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
369 my $cleanpid = fork();
372 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
373 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
378 last if $cleanpid == waitpid (-1, WNOHANG);
379 freeze_if_want_freeze ($cleanpid);
380 select (undef, undef, undef, 0.1);
382 Log (undef, "Clean-work-dir exited $?");
385 # Install requested code version
388 my @srunargs = ("srun",
389 "--nodelist=$nodelist",
390 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
392 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
393 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
397 my $treeish = $Job->{'script_version'};
399 # If we're running under crunch-dispatch, it will have pulled the
400 # appropriate source tree into its own repository, and given us that
401 # repo's path as $git_dir. If we're running a "local" job, and a
402 # script_version was specified, it's up to the user to provide the
403 # full path to a local repository in Job->{repository}.
405 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
406 # git-archive --remote where appropriate.
408 # TODO: Accept a locally-hosted Arvados repository by name or
409 # UUID. Use arvados.v1.repositories.list or .get to figure out the
410 # appropriate fetch-url.
411 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
413 $ENV{"CRUNCH_SRC_URL"} = $repo;
415 if (-d "$repo/.git") {
  # We were given a working directory, but we are only interested in
  # the .git directory inside it.
418 $repo = "$repo/.git";
421 # If this looks like a subversion r#, look for it in git-svn commit messages
423 if ($treeish =~ m{^\d{1,4}$}) {
424 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
426 Log(undef, "git Subversion search exited $?");
427 if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
429 Log(undef, "Using commit $commit for Subversion revision $treeish");
433 # If that didn't work, try asking git to look it up as a tree-ish.
435 if (!defined $commit) {
436 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
438 Log(undef, "git rev-list exited $? with result '$found'");
439 if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
441 Log(undef, "Using commit $commit for tree-ish $treeish");
442 if ($commit ne $treeish) {
443 # Make sure we record the real commit id in the database,
444 # frozentokey, logs, etc. -- instead of an abbreviation or a
445 # branch name which can become ambiguous or point to a
446 # different commit in the future.
447 $Job->{'script_version'} = $commit;
449 $Job->update_attributes('script_version' => $commit) or
450 croak("Error while updating job");
455 if (defined $commit) {
456 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
457 @execargs = ("sh", "-c",
458 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
459 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
460 croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
463 croak ("could not figure out commit id for $treeish");
466 # Note: this section is almost certainly unnecessary if we're
467 # running tasks in docker containers.
468 my $installpid = fork();
469 if ($installpid == 0)
  {
    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
    exit (1);
  }
  while (1)
  {
    last if $installpid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($installpid);
    select (undef, undef, undef, 0.1);
  }
480 Log (undef, "Install exited $?");
485 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
486 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
489 # If this job requires a Docker image, install that.
490 my $docker_bin = "/usr/bin/docker.io";
491 my ($docker_locator, $docker_stream, $docker_hash);
492 if ($docker_locator = $Job->{docker_image_locator}) {
493 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
496 croak("No Docker image hash found from locator $docker_locator");
498 $docker_stream =~ s/^\.//;
499 my $docker_install_script = qq{
500 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
504 my $docker_pid = fork();
505 if ($docker_pid == 0)
507 srun (["srun", "--nodelist=" . join(',', @node)],
508 ["/bin/sh", "-ec", $docker_install_script]);
513 last if $docker_pid == waitpid (-1, WNOHANG);
514 freeze_if_want_freeze ($docker_pid);
515 select (undef, undef, undef, 0.1);
519 croak("Installing Docker image from $docker_locator returned exit code $?");
523 foreach (qw (script script_version script_parameters runtime_constraints))
{
  Log (undef,
       "$_ " .
       (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
}
529 foreach (split (/\n/, $Job->{knobs}))
{
  Log (undef, "knob " . $_);
}
536 $main::success = undef;
542 my $thisround_succeeded = 0;
543 my $thisround_failed = 0;
544 my $thisround_failed_multiple = 0;
546 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
547 or $a <=> $b } @jobstep_todo;
548 my $level = $jobstep[$jobstep_todo[0]]->{level};
549 Log (undef, "start level $level");
554 my @freeslot = (0..$#slot);
557 my $progress_is_dirty = 1;
558 my $progress_stats_updated = 0;
560 update_progress_stats();
565 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
567 my $id = $jobstep_todo[$todo_ptr];
568 my $Jobstep = $jobstep[$id];
569 if ($Jobstep->{level} != $level)
574 pipe $reader{$id}, "writer" or croak ($!);
575 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
576 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
578 my $childslot = $freeslot[0];
579 my $childnode = $slot[$childslot]->{node};
580 my $childslotname = join (".",
581 $slot[$childslot]->{node}->{name},
582 $slot[$childslot]->{cpu});
583 my $childpid = fork();
    if ($childpid == 0)
    {
      $SIG{'INT'} = 'DEFAULT';
587 $SIG{'QUIT'} = 'DEFAULT';
588 $SIG{'TERM'} = 'DEFAULT';
      foreach (values (%reader))
      {
        close ($_);
      }
594 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
595 open(STDOUT,">&writer");
596 open(STDERR,">&writer");
601 delete $ENV{"GNUPGHOME"};
602 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
603 $ENV{"TASK_QSEQUENCE"} = $id;
604 $ENV{"TASK_SEQUENCE"} = $level;
605 $ENV{"JOB_SCRIPT"} = $Job->{script};
606 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
607 $param =~ tr/a-z/A-Z/;
608 $ENV{"JOB_PARAMETER_$param"} = $value;
610 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
611 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
612 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
613 $ENV{"HOME"} = $ENV{"TASK_WORK"};
614 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
615 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
616 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
617 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
623 "--nodelist=".$childnode->{name},
624 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
625 "--job-name=$job_id.$id.$$",
627 my $build_script_to_send = "";
629 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
630 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
631 ."&& cd $ENV{CRUNCH_TMP} ";
634 $build_script_to_send = $build_script;
638 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
641 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
642 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
643 # Dynamically configure the container to use the host system as its
644 # DNS server. Get the host's global addresses from the ip command,
645 # and turn them into docker --dns options using gawk.
647 q{$(ip -o address show scope global |
648 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
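      # e.g. an `ip -o address show scope global` line such as
      # "2: eth0    inet 10.0.2.15/24 brd 10.0.2.255 ..." (illustrative)
      # carries the address/prefix in field 4, so the gawk match() above
      # turns it into "--dns 10.0.2.15".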
649 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
650 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
651 $command .= "--env=\QHOME=/home/crunch\E ";
652 while (my ($env_key, $env_val) = each %ENV)
654 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
655 if ($env_key eq "TASK_WORK") {
656 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
658 elsif ($env_key eq "TASK_KEEPMOUNT") {
659 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
662 $command .= "--env=\Q$env_key=$env_val\E ";
666 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
667 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
668 $command .= "\Q$docker_hash\E ";
669 $command .= "stdbuf --output=0 --error=0 ";
670 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
673 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
674 $command .= "stdbuf --output=0 --error=0 ";
675 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
678 my @execargs = ('bash', '-c', $command);
679 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
680 # exec() failed, we assume nothing happened.
681 Log(undef, "srun() failed on build script");
685 if (!defined $childpid)
    shift @freeslot;
    $proc{$childpid} = { jobstep => $id,
                         time => time,
                         slot => $childslot,
                         jobstepname => "$job_id.$id.$childpid",
                       };
697 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
698 $slot[$childslot]->{pid} = $childpid;
700 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
701 Log ($id, "child $childpid started on $childslotname");
702 $Jobstep->{starttime} = time;
703 $Jobstep->{node} = $childnode->{name};
704 $Jobstep->{slotindex} = $childslot;
705 delete $Jobstep->{stderr};
706 delete $Jobstep->{finishtime};
708 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
709 $Jobstep->{'arvados_task'}->save;
711 splice @jobstep_todo, $todo_ptr, 1;
714 $progress_is_dirty = 1;
718 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
720 last THISROUND if $main::please_freeze;
721 if ($main::please_info)
723 $main::please_info = 0;
727 update_progress_stats();
734 check_refresh_wanted();
736 update_progress_stats();
737 select (undef, undef, undef, 0.1);
739 elsif (time - $progress_stats_updated >= 30)
741 update_progress_stats();
743 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
744 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
746 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
747 .($thisround_failed+$thisround_succeeded)
748 .") -- giving up on this round";
749 Log (undef, $message);
753 # move slots from freeslot to holdslot (or back to freeslot) if necessary
754 for (my $i=$#freeslot; $i>=0; $i--) {
755 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
756 push @holdslot, (splice @freeslot, $i, 1);
759 for (my $i=$#holdslot; $i>=0; $i--) {
760 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
761 push @freeslot, (splice @holdslot, $i, 1);
765 # give up if no nodes are succeeding
766 if (!grep { $_->{node}->{losing_streak} == 0 &&
767 $_->{node}->{hold_count} < 4 } @slot) {
768 my $message = "Every node has failed -- giving up on this round";
769 Log (undef, $message);
776 push @freeslot, splice @holdslot;
777 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
780 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
783 if ($main::please_continue) {
784 $main::please_continue = 0;
787 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
791 check_refresh_wanted();
793 update_progress_stats();
794 select (undef, undef, undef, 0.1);
795 killem (keys %proc) if $main::please_freeze;
799 update_progress_stats();
800 freeze_if_want_freeze();
803 if (!defined $main::success)
806 $thisround_succeeded == 0 &&
807 ($thisround_failed == 0 || $thisround_failed > 4))
809 my $message = "stop because $thisround_failed tasks failed and none succeeded";
810 Log (undef, $message);
819 goto ONELEVEL if !defined $main::success;
822 release_allocation();
824 my $collated_output = &collate_output();
826 if (!$collated_output) {
827 Log(undef, "output undef");
831 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
832 or die "failed to get collated manifest: $!";
833 my $orig_manifest_text = '';
834 while (my $manifest_line = <$orig_manifest>) {
835 $orig_manifest_text .= $manifest_line;
837 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
838 'manifest_text' => $orig_manifest_text,
840 Log(undef, "output uuid " . $output->{uuid});
841 Log(undef, "output hash " . $output->{portable_data_hash});
842 $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
845 Log (undef, "Failed to register output manifest: $@");
849 Log (undef, "finish");
854 if ($collated_output && $main::success) {
855 $Job->update_attributes('state' => 'Complete')
857 $Job->update_attributes('state' => 'Failed')
exit ($Job->{'state'} ne 'Complete' ? 1 : 0);
865 sub update_progress_stats
867 $progress_stats_updated = time;
868 return if !$progress_is_dirty;
869 my ($todo, $done, $running) = (scalar @jobstep_todo,
870 scalar @jobstep_done,
871 scalar @slot - scalar @freeslot - scalar @holdslot);
872 $Job->{'tasks_summary'} ||= {};
873 $Job->{'tasks_summary'}->{'todo'} = $todo;
874 $Job->{'tasks_summary'}->{'done'} = $done;
875 $Job->{'tasks_summary'}->{'running'} = $running;
877 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
879 Log (undef, "status: $done done, $running running, $todo todo");
880 $progress_is_dirty = 0;
887 my $pid = waitpid (-1, WNOHANG);
888 return 0 if $pid <= 0;
890 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
                  . "."
                  . $slot[$proc{$pid}->{slot}]->{cpu});
893 my $jobstepid = $proc{$pid}->{jobstep};
894 my $elapsed = time - $proc{$pid}->{time};
895 my $Jobstep = $jobstep[$jobstepid];
897 my $childstatus = $?;
898 my $exitvalue = $childstatus >> 8;
  my $exitinfo = sprintf("exit %d signal %d%s",
                         $exitvalue,
                         $childstatus & 127,
                         ($childstatus & 128 ? ' core dump' : ''));
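  # Worked example (illustrative): a raw wait status of 0x0200 decodes to
  # exit value 2 ($childstatus >> 8), no terminating signal
  # ($childstatus & 127 == 0), and no core dump ($childstatus & 128 == 0).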
903 $Jobstep->{'arvados_task'}->reload;
904 my $task_success = $Jobstep->{'arvados_task'}->{success};
906 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
908 if (!defined $task_success) {
909 # task did not indicate one way or the other --> fail
910 $Jobstep->{'arvados_task'}->{success} = 0;
911 $Jobstep->{'arvados_task'}->save;
918 $temporary_fail ||= $Jobstep->{node_fail};
919 $temporary_fail ||= ($exitvalue == 111);
922 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
924 # Check for signs of a failed or misconfigured node
925 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
926 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
927 # Don't count this against jobstep failure thresholds if this
928 # node is already suspected faulty and srun exited quickly
      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
          $elapsed < 5) {
        Log ($jobstepid, "blaming failure on suspect node " .
             $slot[$proc{$pid}->{slot}]->{node}->{name});
933 $temporary_fail ||= 1;
935 ban_node_by_slot($proc{$pid}->{slot});
938 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
939 ++$Jobstep->{'failures'},
                             $temporary_fail ? 'temporary' : 'permanent',
                             $elapsed));
943 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
944 # Give up on this task, and the whole job
946 $main::please_freeze = 1;
949 # Put this task back on the todo queue
950 push @jobstep_todo, $jobstepid;
952 $Job->{'tasks_summary'}->{'failed'}++;
956 ++$thisround_succeeded;
957 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
958 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
959 push @jobstep_done, $jobstepid;
960 Log ($jobstepid, "success in $elapsed seconds");
962 $Jobstep->{exitcode} = $childstatus;
963 $Jobstep->{finishtime} = time;
964 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
965 $Jobstep->{'arvados_task'}->save;
966 process_stderr ($jobstepid, $task_success);
967 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
969 close $reader{$jobstepid};
970 delete $reader{$jobstepid};
971 delete $slot[$proc{$pid}->{slot}]->{pid};
972 push @freeslot, $proc{$pid}->{slot};
  my $newtask_list = [];
  my $newtask_results;
  do {
    $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
      'where' => { 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} },
      'order' => 'qsequence',
      'offset' => scalar(@$newtask_list),
    );
    push(@$newtask_list, @{$newtask_results->{items}});
  } while (@{$newtask_results->{items}});
989 foreach my $arvados_task (@$newtask_list) {
    my $jobstep = { 'level' => $arvados_task->{'sequence'},
                    'arvados_task' => $arvados_task,
                  };
    push @jobstep, $jobstep;
996 push @jobstep_todo, $#jobstep;
1000 $progress_is_dirty = 1;
1004 sub check_refresh_wanted
1006 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1007 if (@stat && $stat[9] > $latest_refresh) {
1008 $latest_refresh = scalar time;
1009 if ($job_has_uuid) {
1010 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1011 for my $attr ('cancelled_at',
1012 'cancelled_by_user_uuid',
                    'cancelled_by_client_uuid',
                    'state') {
        $Job->{$attr} = $Job2->{$attr};
      }
1017 if ($Job->{'state'} ne "Running") {
1018 if ($Job->{'state'} eq "Cancelled") {
1019 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1021 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1024 $main::please_freeze = 1;
1032 # return if the kill list was checked <4 seconds ago
  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
  {
    return;
  }
1037 $squeue_kill_checked = time;
1039 # use killem() on procs whose killtime is reached
  foreach (keys %proc)
  {
    if (exists $proc{$_}->{killtime}
        && $proc{$_}->{killtime} <= time)
    {
      killem ($_);
    }
  }
1049 # return if the squeue was checked <60 seconds ago
  if (defined $squeue_checked && $squeue_checked > time - 60)
  {
    return;
  }
1054 $squeue_checked = time;
1058 # here is an opportunity to check for mysterious problems with local procs
1062 # get a list of steps still running
1063 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
  chomp @squeue;
  if ($squeue[-1] ne "ok")
  {
    return;
  }
1071 # which of my jobsteps are running, according to squeue?
  my %ok;
  foreach (@squeue)
  {
    if (/^(\d+)\.(\d+) (\S+)/)
    {
      if ($1 eq $ENV{SLURM_JOBID})
      {
        $ok{$3} = 1;
      }
    }
  }
1084 # which of my active child procs (>60s old) were not mentioned by squeue?
1085 foreach (keys %proc)
1087 if ($proc{$_}->{time} < time - 60
1088 && !exists $ok{$proc{$_}->{jobstepname}}
1089 && !exists $proc{$_}->{killtime})
1091 # kill this proc if it hasn't exited in 30 seconds
1092 $proc{$_}->{killtime} = time + 30;
1098 sub release_allocation
1102 Log (undef, "release job allocation");
1103 system "scancel $ENV{SLURM_JOBID}";
1111 foreach my $job (keys %reader)
1114 while (0 < sysread ($reader{$job}, $buf, 8192))
1116 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1117 $jobstep[$job]->{stderr} .= $buf;
1118 preprocess_stderr ($job);
1119 if (length ($jobstep[$job]->{stderr}) > 16384)
1121 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1130 sub preprocess_stderr
1134 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1136 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1137 Log ($job, "stderr $line");
1138 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1140 $main::please_freeze = 1;
1142 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1143 $jobstep[$job]->{node_fail} = 1;
1144 ban_node_by_slot($jobstep[$job]->{slotindex});
1153 my $task_success = shift;
1154 preprocess_stderr ($job);
  map {
    Log ($job, "stderr $_");
1158 } split ("\n", $jobstep[$job]->{stderr});
  my $hash = shift;
  my ($keep, $child_out, $output_block);
1166 my $cmd = "arv-get \Q$hash\E";
1167 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1171 my $bytes = sysread($keep, $buf, 1024 * 1024);
1172 if (!defined $bytes) {
1173 die "reading from arv-get: $!";
1174 } elsif ($bytes == 0) {
1175 # sysread returns 0 at the end of the pipe.
1178 # some bytes were read into buf.
1179 $output_block .= $buf;
1183 return $output_block;
1188 Log (undef, "collate");
1190 my ($child_out, $child_in);
1191 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
                  '--retries', put_retry_count());
  my $joboutput;
  for (@jobstep)
  {
1196 next if (!exists $_->{'arvados_task'}->{'output'} ||
1197 !$_->{'arvados_task'}->{'success'});
1198 my $output = $_->{'arvados_task'}->{output};
1199 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1201 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1202 print $child_in $output;
1204 elsif (@jobstep == 1)
1206 $joboutput = $output;
1209 elsif (defined (my $outblock = fetch_block ($output)))
1211 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1212 print $child_in $outblock;
1216 Log (undef, "XXX fetch_block($output) failed XXX");
1222 if (!defined $joboutput) {
1223 my $s = IO::Select->new($child_out);
1224 if ($s->can_read(120)) {
1225 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1228 Log (undef, "timed out reading from 'arv-put'");
1241 my $sig = 2; # SIGINT first
1242 if (exists $proc{$_}->{"sent_$sig"} &&
1243 time - $proc{$_}->{"sent_$sig"} > 4)
1245 $sig = 15; # SIGTERM if SIGINT doesn't work
1247 if (exists $proc{$_}->{"sent_$sig"} &&
1248 time - $proc{$_}->{"sent_$sig"} > 4)
1250 $sig = 9; # SIGKILL if SIGTERM doesn't work
1252 if (!exists $proc{$_}->{"sent_$sig"})
    {
      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
      kill $sig, $_;
      select (undef, undef, undef, 0.1);
      kill $sig, $_;     # srun wants two SIGINT to really interrupt
      $proc{$_}->{"sent_$sig"} = time;
      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
    }
1272 vec($bits,fileno($_),1) = 1;
1278 sub Log # ($jobstep_id, $logmessage)
1280 if ($_[1] =~ /\n/) {
    for my $line (split (/\n/, $_[1])) {
      Log ($_[0], $line);
    }
    return;
  }
1286 my $fh = select STDERR; $|=1; select $fh;
1287 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
  $message .= "\n";
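  # e.g. an embedded tab (ord 9) becomes "\011": everything outside the
  # printable ASCII range space..'~' (octal 040..176) is octal-escaped
  # before the message is written to the log.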
1291 if ($local_logfile || -t STDERR) {
1292 my @gmtime = gmtime;
1293 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1294 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1296 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1298 if ($local_logfile) {
1299 print $local_logfile $datetime . " " . $message;
1306 my ($package, $file, $line) = caller;
1307 my $message = "@_ at $file line $line\n";
1308 Log (undef, $message);
1309 freeze() if @jobstep_todo;
1310 collate_output() if @jobstep_todo;
1312 save_meta() if $local_logfile;
1319 return if !$job_has_uuid;
1320 if ($Job->{'state'} eq 'Cancelled') {
1321 $Job->update_attributes('finished_at' => scalar gmtime);
  } else {
    $Job->update_attributes('state' => 'Failed');
  }
1330 my $justcheckpoint = shift; # false if this will be the last meta saved
1331 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1333 $local_logfile->flush;
1334 my $retry_count = put_retry_count();
1335 my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1336 "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1337 my $loglocator = `$cmd`;
1338 die "system $cmd failed: $?" if $?;
1341 $local_logfile = undef; # the temp file is automatically deleted
1342 Log (undef, "log manifest is $loglocator");
1343 $Job->{'log'} = $loglocator;
1344 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1348 sub freeze_if_want_freeze
1350 if ($main::please_freeze)
1352 release_allocation();
1355 # kill some srun procs before freeze+stop
1356 map { $proc{$_} = {} } @_;
1359 killem (keys %proc);
1360 select (undef, undef, undef, 0.1);
1362 while (($died = waitpid (-1, WNOHANG)) > 0)
1364 delete $proc{$died};
1379 Log (undef, "Freeze not implemented");
1386 croak ("Thaw not implemented");
1402 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1409 my $srunargs = shift;
1410 my $execargs = shift;
  my $opts = shift || {};
  my $stdin = shift;
1413 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1414 print STDERR (join (" ",
1415 map { / / ? "'$_'" : $_ }
1418 if $ENV{CRUNCH_DEBUG};
1420 if (defined $stdin) {
1421 my $child = open STDIN, "-|";
1422 defined $child or die "no fork: $!";
    if ($child == 0) {
      print $stdin or die $!;
      close STDOUT or die $!;
      exit 0;
    }
  }
1430 return system (@$args) if $opts->{fork};
1433 warn "ENV size is ".length(join(" ",%ENV));
1434 die "exec failed: $!: @$args";
1438 sub ban_node_by_slot {
1439 # Don't start any new jobsteps on this node for 60 seconds
  my $slotid = shift;
  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1442 $slot[$slotid]->{node}->{hold_count}++;
1443 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1448 my ($lockfile, $error_message) = @_;
1449 open L, ">", $lockfile or croak("$lockfile: $!");
1450 if (!flock L, LOCK_EX|LOCK_NB) {
1451 croak("Can't lock $lockfile: $error_message\n");
1455 sub find_docker_image {
1456 # Given a Keep locator, check to see if it contains a Docker image.
1457 # If so, return its stream name and Docker hash.
1458 # If not, return undef for both values.
1459 my $locator = shift;
1460 my ($streamname, $filename);
1461 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1462 foreach my $line (split(/\n/, $image->{manifest_text})) {
1463 my @tokens = split(/\s+/, $line);
1465 $streamname = shift(@tokens);
1466 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1467 if (defined($filename)) {
1468 return (undef, undef); # More than one file in the Collection.
1470 $filename = (split(/:/, $filedata, 3))[2];
1475 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1476 return ($streamname, $1);
  }
  return (undef, undef);
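  # Illustrative example: a one-file collection whose manifest line has the
  # form ". <block-locator> 0:<size>:<64-hex-digits>.tar" yields streamname
  # "." and the 64-hex-digit Docker image hash.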
1482 sub put_retry_count {
1483 # Calculate a --retries argument for arv-put that will have it try
1484 # approximately as long as this Job has been running.
1485 my $stoptime = shift || time;
1486 my $starttime = $jobstep[0]->{starttime};
1487 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
  my $retries = 0;
  while ($timediff >= 2) {
    $retries++;
    $timediff /= 2;
  }
  return ($retries > 3) ? $retries : 3;
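  # Worked example: for a job that has been running for one hour
  # ($timediff = 3600), the loop above halves 3600 eleven times before it
  # drops below 2, so arv-put is invoked with --retries 11
  # (2**11 = 2048 <= 3600 < 4096).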
1499 # checkout-and-build
1502 use File::Path qw( make_path );
1504 my $destdir = $ENV{"CRUNCH_SRC"};
1505 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1506 my $repo = $ENV{"CRUNCH_SRC_URL"};
1507 my $task_work = $ENV{"TASK_WORK"};
1509 for my $dir ($destdir, $task_work) {
    if ($dir) {
      make_path $dir;
      -e $dir or die "Failed to create temporary directory ($dir): $!";
    }
  }
1516 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1518 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1521 die "Cannot exec `@ARGV`: $!";
1527 unlink "$destdir.commit";
1528 open STDOUT, ">", "$destdir.log";
1529 open STDERR, ">&STDOUT";
1532 my @git_archive_data = <DATA>;
1533 if (@git_archive_data) {
1534 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1535 print TARX @git_archive_data;
1537 die "'tar -C $destdir -xf -' exited $?: $!";
1542 chomp ($pwd = `pwd`);
1543 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1546 for my $src_path ("$destdir/arvados/sdk/python") {
1548 shell_or_die ("virtualenv", $install_dir);
1549 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1553 if (-e "$destdir/crunch_scripts/install") {
1554 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1555 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1557 shell_or_die ("./tests/autotests.sh", $install_dir);
1558 } elsif (-e "./install.sh") {
1559 shell_or_die ("./install.sh", $install_dir);
1563 unlink "$destdir.commit.new";
1564 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1565 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1572 die "Cannot exec `@ARGV`: $!";
1579 if ($ENV{"DEBUG"}) {
1580 print STDERR "@_\n";
  }
  system (@_) == 0
    or die "@_ failed: $! exit 0x".sprintf("%x",$?);