2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
85 use File::Path qw( make_path );
# Establish the scratch directory layout used by the rest of the script.
# TMPDIR defaults to /tmp; CRUNCH_TMP defaults to $TMPDIR/crunch-job unless
# the caller already exported it.
87 $ENV{"TMPDIR"} ||= "/tmp";
88 unless (defined $ENV{"CRUNCH_TMP"}) {
89 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid: any user other than "crunch"/root gets a uid suffix.
90 if ($ENV{"USER"} ne "crunch" && $< != 0) {
91 # use a tmp dir unique for my uid
92 $ENV{"CRUNCH_TMP"} .= "-$<";
96 # Create the tmp directory if it does not exist
97 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
98 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived locations: per-job work dir and install prefix, both under CRUNCH_TMP.
101 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
102 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
103 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): the mkdir result is not checked here (it fails harmlessly
# if the directory already exists, but other failures go unnoticed).
104 mkdir ($ENV{"JOB_WORK"});
# Parse command-line options into the corresponding script variables.
# Options ending in "=s" take a string argument; the others are flags.
112 GetOptions('force-unlock' => \$force_unlock,
113 'git-dir=s' => \$git_dir,
114 'job=s' => \$jobspec,
115 'job-api-token=s' => \$job_api_token,
116 'no-clear-tmp' => \$no_clear_tmp,
117 'resume-stash=s' => \$resume_stash,
# A token given on the command line overrides ARVADOS_API_TOKEN in the environment.
120 if (defined $job_api_token) {
121 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM is considered available only when both variables are present.
124 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A --job value made only of lowercase letters, digits and dashes is treated
# as a job UUID; anything else (e.g. a JSON document) is a "local" job.
125 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
126 my $local_job = !$job_has_uuid;
# The conditions selecting between these two assignments are not visible in
# this listing.
131 $main::ENV{CRUNCH_DEBUG} = 1;
135 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to the API server and identify the user the token belongs to.
140 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $User = $arv->{'users'}->{'current'}->execute;
# Fetch the existing job record by UUID.
151 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
152 if (!$force_unlock) {
153 # If some other crunch-job process has grabbed this job (or we see
154 # other evidence that the job is already underway) we exit 111 so
155 # crunch-dispatch (our parent process) doesn't mark the job as
157 if ($Job->{'is_locked_by_uuid'}) {
158 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'} . ", exiting 111");
# Test definedness directly: the previous "ne undef" compared against the
# empty string (with an "uninitialized" warning) instead of checking for null.
161 if (defined $Job->{'success'}) {
162 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165 if ($Job->{'running'}) {
166 Log(undef, "Job 'running' flag is already set");
169 if ($Job->{'started_at'}) {
170 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Otherwise the --job argument is a JSON job description: decode it, check
# the required attributes, and create a job record through the API.
177 $Job = JSON::decode_json($jobspec);
181 map { croak ("No $_ specified") unless $Job->{$_} }
182 qw(script script_version script_parameters);
185 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186 $Job->{'started_at'} = gmtime;
188 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
192 $job_id = $Job->{'uuid'};
# The log is buffered in a temp file and named after the job UUID.
194 my $keep_logfile = $job_id . '.log.txt';
195 $local_logfile = File::Temp->new();
# max_tasks_per_node defaults to 0, which disables the per-node cap applied
# where $max_ncpus is used.
197 $Job->{'runtime_constraints'} ||= {};
198 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
199 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build the @node and @slot tables describing where tasks may run.
# Each @sinfo entry has the form "<ncpus> <nodelist>".
202 Log (undef, "check slurm allocation");
205 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Count "processor" lines in /proc/cpuinfo, falling back to 1, and describe
# the local machine as a single node named localhost.
209 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
210 push @sinfo, "$localcpus localhost";
# Under SLURM, ask sinfo for the CPU count (%c) and names (%N) of the
# allocated nodes.
212 if (exists $ENV{SLURM_NODELIST})
214 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
218 my ($ncpus, $slurm_nodelist) = split;
# Apply the job's max_tasks_per_node cap when it is non-zero.
219 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated entry at a time off the node list; an entry may
# carry a bracketed range expression such as "[1-3,5]".
222 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228 foreach (split (",", $ranges))
# Substitute each expanded number for the bracket expression to get a node name.
241 push @nodelist, map {
243 $n =~ s/\[[-,\d]+\]/$_/;
# Entries without brackets are used as-is.
250 push @nodelist, $nodelist;
# One node record per node name, one slot record per CPU on that node.
253 foreach my $nodename (@nodelist)
255 Log (undef, "node $nodename - $ncpus slots");
256 my $node = { name => $nodename,
260 foreach my $cpu (1..$ncpus)
262 push @slot, { node => $node,
266 push @node, @nodelist;
271 # Ensure that we get one jobstep running on each allocated node before
272 # we start overloading nodes with concurrent steps
274 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
281 # Claim this job, and make sure nobody else does
# UUIDs are strings, so they must be compared with "eq". The numeric "=="
# used previously converts both sides to 0 and therefore always succeeds,
# which defeats the check that the lock really belongs to us.
282 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
283 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
284 Log(undef, "Error while updating / locking job, exiting 111");
# Record the start time and reset the task counters on the job record.
287 $Job->update_attributes('started_at' => scalar gmtime,
290 'tasks_summary' => { 'failed' => 0,
297 Log (undef, "start");
# Signal handlers only set flags; the main loops poll these flags.
# INT, QUIT and TSTP request a freeze, ALRM requests info, CONT requests a
# continue, HUP requests a refresh, and TERM calls croak directly.
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export every "NOCACHE...=value" line of the job's knobs as an environment
# variable (grep is used here only for its side effect).
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
# Task bookkeeping: the todo/done lists hold indexes into @jobstep.
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
# Timestamp compared against the refresh trigger file's mtime in
# check_refresh_wanted.
325 my $latest_refresh = scalar time;
# Resume from a frozen state if the job names one (thaw() currently croaks
# with "Thaw not implemented").
329 if (defined $Job->{thawedfromkey})
331 thaw ($Job->{thawedfromkey});
# Otherwise create the initial task for this job and queue it as jobstep 0
# at level 0.
335 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336 'job_uuid' => $Job->{'uuid'},
341 push @jobstep, { 'level' => 0,
343 'arvados_task' => $first_task,
345 push @jobstep_todo, 0;
# Take the lock file under CRUNCH_TMP; must_lock_now croaks if the lock
# cannot be obtained.
351 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
358 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# A local job whose script_version is an absolute path is run straight from
# that directory, skipping the git-based install.
360 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
# Unless --no-clear-tmp was given, remove previous work/opt/src directories.
# The command runs through the shell, which expands $JOB_WORK and $CRUNCH_TMP
# from the environment (the Perl string is single-quoted).
363 if (!defined $no_clear_tmp) {
364 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
365 system($clear_tmp_cmd) == 0
366 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
368 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
# Build and install the Python SDK from the source tree into a virtualenv
# under CRUNCH_TMP/opt.
369 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
371 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
372 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
# NOTE(review): $src_path is interpolated into a shell command unquoted, so
# a path containing spaces or shell metacharacters would break this.
373 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
375 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# Read the build script from the DATA section (the input record separator
# in effect here is not visible in this listing).
383 $build_script = <DATA>;
385 Log (undef, "Install revision ".$Job->{script_version});
386 my $nodelist = join(",", @node);
388 if (!defined $no_clear_tmp) {
389 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
# The cleanup runs in a forked process via srun on every allocated node:
# unmount anything mounted below $JOB_WORK, then remove the directories.
391 my $cleanpid = fork();
394 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
395 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# The parent polls every 0.1s until the cleanup child has been reaped,
# honouring freeze requests in the meantime.
400 last if $cleanpid == waitpid (-1, WNOHANG);
401 freeze_if_want_freeze ($cleanpid);
402 select (undef, undef, undef, 0.1);
404 Log (undef, "Clean-work-dir exited $?");
407 # Install requested code version
# srun arguments shared by the install step: all nodes, run from TMPDIR.
410 my @srunargs = ("srun",
411 "--nodelist=$nodelist",
412 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
414 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
415 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
419 my $treeish = $Job->{'script_version'};
421 # If we're running under crunch-dispatch, it will have pulled the
422 # appropriate source tree into its own repository, and given us that
423 # repo's path as $git_dir. If we're running a "local" job, and a
424 # script_version was specified, it's up to the user to provide the
425 # full path to a local repository in Job->{repository}.
427 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
428 # git-archive --remote where appropriate.
430 # TODO: Accept a locally-hosted Arvados repository by name or
431 # UUID. Use arvados.v1.repositories.list or .get to figure out the
432 # appropriate fetch-url.
# Precedence: --git-dir option, then CRUNCH_DEFAULT_GIT_DIR, then the job's
# own "repository" attribute.
433 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
435 $ENV{"CRUNCH_SRC_URL"} = $repo;
437 if (-d "$repo/.git") {
438 # We were given a working directory, but we are only interested in
440 $repo = "$repo/.git";
443 # If this looks like a subversion r#, look for it in git-svn commit messages
# Only 1-4 digit values are treated as possible svn revision numbers.
445 if ($treeish =~ m{^\d{1,4}$}) {
446 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
# Accept the result only if it is exactly one 40-hex-digit commit id.
448 if ($gitlog =~ /^[a-f0-9]{40}$/) {
450 Log (undef, "Using commit $commit for script_version $treeish");
454 # If that didn't work, try asking git to look it up as a tree-ish.
456 if (!defined $commit) {
457 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
459 if ($found =~ /^[0-9a-f]{40}$/s) {
461 if ($commit ne $treeish) {
462 # Make sure we record the real commit id in the database,
463 # frozentokey, logs, etc. -- instead of an abbreviation or a
464 # branch name which can become ambiguous or point to a
465 # different commit in the future.
466 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
467 Log (undef, "Using commit $commit for tree-ish $treeish");
# NOTE(review): this repeats the enclosing "$commit ne $treeish" test and
# looks redundant as shown — confirm against the lines missing from this
# listing.
468 if ($commit ne $treeish) {
469 $Job->{'script_version'} = $commit;
471 $Job->update_attributes('script_version' => $commit) or
472 croak("Error while updating job");
# With a resolved commit, prepare to pipe a Perl program (build script plus
# git archive data) to "perl -" on the nodes.
478 if (defined $commit) {
479 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
480 @execargs = ("sh", "-c",
481 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
482 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485 croak ("could not figure out commit id for $treeish");
# Run the install in a forked process; its stdin is the build script
# followed by the archive data.
488 my $installpid = fork();
489 if ($installpid == 0)
491 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# The parent polls every 0.1s until the install child has been reaped.
496 last if $installpid == waitpid (-1, WNOHANG);
497 freeze_if_want_freeze ($installpid);
498 select (undef, undef, undef, 0.1);
500 Log (undef, "Install exited $?");
505 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
506 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
509 # If this job requires a Docker image, install that.
510 my $docker_bin = "/usr/bin/docker.io";
511 my ($docker_locator, $docker_stream, $docker_hash);
# Note the assignment inside the condition: the branch runs only when the
# job has a non-empty docker_image_locator.
512 if ($docker_locator = $Job->{docker_image_locator}) {
513 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
516 croak("No Docker image hash found from locator $docker_locator");
# Drop the leading "." of the stream name so it can be appended to the
# locator as a path.
518 $docker_stream =~ s/^\.//;
# Shell snippet run on each node: load the image tarball with arv-get only
# if "docker images" does not already list the hash.
519 my $docker_install_script = qq{
520 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
521 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
524 my $docker_pid = fork();
525 if ($docker_pid == 0)
527 srun (["srun", "--nodelist=" . join(',', @node)],
528 ["/bin/sh", "-ec", $docker_install_script]);
# The parent polls every 0.1s until the docker install child has been reaped.
533 last if $docker_pid == waitpid (-1, WNOHANG);
534 freeze_if_want_freeze ($docker_pid);
535 select (undef, undef, undef, 0.1);
539 croak("Installing Docker image from $docker_locator returned exit code $?");
# Report the main job attributes; reference values (hashes/arrays) are
# rendered as JSON. The call consuming this expression is not visible in
# this listing.
543 foreach (qw (script script_version script_parameters runtime_constraints))
547 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
# Log each line of the job's knobs.
549 foreach (split (/\n/, $Job->{knobs}))
551 Log (undef, "knob " . $_);
# undef means "not decided yet"; the level loop repeats while it stays undef.
556 $main::success = undef;
# Per-round counters, consulted by the failure-rate checks further down.
562 my $thisround_succeeded = 0;
563 my $thisround_failed = 0;
564 my $thisround_failed_multiple = 0;
# Process the queue lowest level first, ties broken by jobstep index.
566 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
567 or $a <=> $b } @jobstep_todo;
568 my $level = $jobstep[$jobstep_todo[0]]->{level};
569 Log (undef, "start level $level");
# Every slot starts out free (entries are indexes into @slot).
574 my @freeslot = (0..$#slot);
577 my $progress_is_dirty = 1;
578 my $progress_stats_updated = 0;
580 update_progress_stats();
# Walk the todo list, starting one child process per task of this level.
585 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
587 my $id = $jobstep_todo[$todo_ptr];
588 my $Jobstep = $jobstep[$id];
589 if ($Jobstep->{level} != $level)
# Pipe carrying the child's stdout/stderr back to this process; the read
# end is made non-blocking.
594 pipe $reader{$id}, "writer" or croak ($!);
595 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
596 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Take the first free slot; the slot name is "<node>.<cpu>".
598 my $childslot = $freeslot[0];
599 my $childnode = $slot[$childslot]->{node};
600 my $childslotname = join (".",
601 $slot[$childslot]->{node}->{name},
602 $slot[$childslot]->{cpu});
603 my $childpid = fork();
# The following lines presumably run in the child only (the test on
# $childpid is not visible here): restore default signal handling and
# redirect stdout/stderr into the pipe.
606 $SIG{'INT'} = 'DEFAULT';
607 $SIG{'QUIT'} = 'DEFAULT';
608 $SIG{'TERM'} = 'DEFAULT';
610 foreach (values (%reader))
614 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
615 open(STDOUT,">&writer");
616 open(STDERR,">&writer");
# Environment seen by the task.
621 delete $ENV{"GNUPGHOME"};
622 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
623 $ENV{"TASK_QSEQUENCE"} = $id;
624 $ENV{"TASK_SEQUENCE"} = $level;
625 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter is exported as JOB_PARAMETER_<NAME>, with a-z
# upper-cased.
626 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
627 $param =~ tr/a-z/A-Z/;
628 $ENV{"JOB_PARAMETER_$param"} = $value;
630 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
631 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
# The task work dir is unique per jobstep id and pid of this process.
632 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
633 $ENV{"HOME"} = $ENV{"TASK_WORK"};
634 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
635 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
636 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
637 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun arguments: one task, one CPU, one node, pinned to the chosen node.
643 "--nodelist=".$childnode->{name},
644 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
645 "--job-name=$job_id.$id.$$",
647 my $build_script_to_send = "";
# Shell command prefix: recreate the task work dir and make the other
# directories.
649 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
650 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
651 ."&& cd $ENV{CRUNCH_TMP} ";
654 $build_script_to_send = $build_script;
# The task runs under arv-mount so Keep is mounted at TASK_KEEPMOUNT for
# its duration.
658 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker variant (the selecting condition is not visible): wrap
# "docker run" in crunchstat, which finds the container via the cid file.
661 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
662 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
663 # Dynamically configure the container to use the host system as its
664 # DNS server. Get the host's global addresses from the ip command,
665 # and turn them into docker --dns options using gawk.
667 q{$(ip -o address show scope global |
668 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
# Source tree and Keep mount are exposed read-only inside the container.
669 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
670 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
671 $command .= "--env=\QHOME=/home/crunch\E ";
# Forward ARVADOS_*, JOB_* and TASK_* variables, rewriting the two
# host-specific paths to their in-container equivalents.
672 while (my ($env_key, $env_val) = each %ENV)
674 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
675 if ($env_key eq "TASK_WORK") {
676 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
678 elsif ($env_key eq "TASK_KEEPMOUNT") {
679 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
682 $command .= "--env=\Q$env_key=$env_val\E ";
686 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
687 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
688 $command .= "\Q$docker_hash\E ";
689 $command .= "stdbuf --output=0 --error=0 ";
690 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-Docker variant: run the script directly from CRUNCH_SRC under
# crunchstat.
693 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
694 $command .= "stdbuf --output=0 --error=0 ";
695 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
698 my @execargs = ('bash', '-c', $command);
699 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
700 # exec() failed, we assume nothing happened.
701 Log(undef, "srun() failed on build script");
# Back in the parent: record the child in %proc and mark the slot as
# occupied.
705 if (!defined $childpid)
712 $proc{$childpid} = { jobstep => $id,
715 jobstepname => "$job_id.$id.$childpid",
717 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
718 $slot[$childslot]->{pid} = $childpid;
720 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
721 Log ($id, "child $childpid started on $childslotname");
722 $Jobstep->{starttime} = time;
723 $Jobstep->{node} = $childnode->{name};
724 $Jobstep->{slotindex} = $childslot;
# Clear leftovers from any earlier attempt at this jobstep.
725 delete $Jobstep->{stderr};
726 delete $Jobstep->{finishtime};
# Remove the started jobstep from the todo list.
728 splice @jobstep_todo, $todo_ptr, 1;
731 $progress_is_dirty = 1;
# Second half of a loop condition (the first half is not visible): some
# slot is busy and the todo pointer has reached the end of the list.
735 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
737 last THISROUND if $main::please_freeze;
738 if ($main::please_info)
740 $main::please_info = 0;
744 update_progress_stats();
751 check_refresh_wanted();
753 update_progress_stats();
754 select (undef, undef, undef, 0.1);
# Refresh the progress stats at least every 30 seconds.
756 elsif (time - $progress_stats_updated >= 30)
758 update_progress_stats();
# Abandon the round when repeated failures dominate: at least 8 with no
# success at all, or at least 16 and more than the number of successes.
760 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
761 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
763 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
764 .($thisround_failed+$thisround_succeeded)
765 .") -- giving up on this round";
766 Log (undef, $message);
770 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops iterate backwards so that splice does not disturb indexes
# still to be visited.
771 for (my $i=$#freeslot; $i>=0; $i--) {
772 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
773 push @holdslot, (splice @freeslot, $i, 1);
776 for (my $i=$#holdslot; $i>=0; $i--) {
777 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
778 push @freeslot, (splice @holdslot, $i, 1);
782 # give up if no nodes are succeeding
# A node counts as healthy if it has no current losing streak and has been
# put on hold fewer than 4 times.
783 if (!grep { $_->{node}->{losing_streak} == 0 &&
784 $_->{node}->{hold_count} < 4 } @slot) {
785 my $message = "Every node has failed -- giving up on this round";
786 Log (undef, $message);
# End of launching: return held slots to the free list and reset the losing
# streak of every free slot's node.
793 push @freeslot, splice @holdslot;
794 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
797 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
800 if ($main::please_continue) {
801 $main::please_continue = 0;
# On an info request: clear the flag, then freeze, collate and save a
# checkpoint.
804 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
808 check_refresh_wanted();
810 update_progress_stats();
811 select (undef, undef, undef, 0.1);
812 killem (keys %proc) if $main::please_freeze;
816 update_progress_stats();
817 freeze_if_want_freeze();
# Decide whether the job as a whole is finished.
820 if (!defined $main::success)
823 $thisround_succeeded == 0 &&
824 ($thisround_failed == 0 || $thisround_failed > 4))
826 my $message = "stop because $thisround_failed tasks failed and none succeeded";
827 Log (undef, $message);
# No verdict yet: run another level.
836 goto ONELEVEL if !defined $main::success;
839 release_allocation();
# Collate the task outputs and record the final job state. The job only
# counts as successful if collation produced something.
841 my $collated_output = &collate_output();
844 $Job->update_attributes('running' => 0,
845 'success' => $collated_output && $main::success,
846 'finished_at' => scalar gmtime)
849 if (!$collated_output) {
850 Log(undef, "output undef");
# Fetch the collated manifest text through arv-get (list-form open, no shell).
854 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
855 or die "failed to get collated manifest: $!";
856 # Read the original manifest, and strip permission hints from it,
857 # so we can put the result in a Collection.
858 my @stripped_manifest_lines = ();
859 my $orig_manifest_text = '';
860 while (my $manifest_line = <$orig_manifest>) {
861 $orig_manifest_text .= $manifest_line;
# A limit of -1 keeps trailing empty fields so join() reproduces the line.
862 my @words = split(/ /, $manifest_line, -1);
863 foreach my $ii (0..$#words) {
# Only block locators (32 hex digits followed by "+") can carry "+A" hints.
864 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
865 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
868 push(@stripped_manifest_lines, join(" ", @words));
870 my $stripped_manifest_text = join("", @stripped_manifest_lines);
# The collection uuid is the MD5 of the stripped manifest; the text sent to
# the API is the original manifest.
871 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
872 'uuid' => md5_hex($stripped_manifest_text),
873 'manifest_text' => $orig_manifest_text,
875 Log(undef, "output " . $output->{uuid});
876 $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
877 if ($Job->{'output_is_persistent'}) {
878 $arv->{'links'}->{'create'}->execute('link' => {
879 'tail_kind' => 'arvados#user',
880 'tail_uuid' => $User->{'uuid'},
881 'head_kind' => 'arvados#collection',
# Link to the collection just created. $Job->{'output'} is only assigned
# above when $job_has_uuid is true, so it cannot be relied on here.
882 'head_uuid' => $output->{uuid},
883 'link_class' => 'resources',
889 Log (undef, "Failed to register output manifest: $@");
893 Log (undef, "finish");
# Recompute the todo/done/running task counts, store them in the job's
# tasks_summary and log a status line. Takes no arguments.
900 sub update_progress_stats
# The timestamp is refreshed even when nothing changed, so callers' "30
# seconds since last update" checks are reset either way.
902 $progress_stats_updated = time;
903 return if !$progress_is_dirty;
# "running" is every slot that is neither free nor on hold.
904 my ($todo, $done, $running) = (scalar @jobstep_todo,
905 scalar @jobstep_done,
906 scalar @slot - scalar @freeslot - scalar @holdslot);
907 $Job->{'tasks_summary'} ||= {};
908 $Job->{'tasks_summary'}->{'todo'} = $todo;
909 $Job->{'tasks_summary'}->{'done'} = $done;
910 $Job->{'tasks_summary'}->{'running'} = $running;
912 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
914 Log (undef, "status: $done done, $running running, $todo todo");
915 $progress_is_dirty = 0;
# Body of the child-reaping routine (its "sub" line is not visible in this
# listing). Reaps at most one exited child without blocking, classifies the
# result as success or failure, frees the slot, and queues any tasks the
# finished task created. Returns 0 when no child was reaped.
922 my $pid = waitpid (-1, WNOHANG);
923 return 0 if $pid <= 0;
925 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
927 . $slot[$proc{$pid}->{slot}]->{cpu});
928 my $jobstepid = $proc{$pid}->{jobstep};
929 my $elapsed = time - $proc{$pid}->{time};
930 my $Jobstep = $jobstep[$jobstepid];
# $? is saved immediately; the high byte is the exit value and bit 128 the
# core-dump flag.
932 my $childstatus = $?;
933 my $exitvalue = $childstatus >> 8;
934 my $exitinfo = sprintf("exit %d signal %d%s",
937 ($childstatus & 128 ? ' core dump' : ''));
# The task's own record decides success, so re-read it from the API.
938 $Jobstep->{'arvados_task'}->reload;
939 my $task_success = $Jobstep->{'arvados_task'}->{success};
941 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
943 if (!defined $task_success) {
944 # task did not indicate one way or the other --> fail
945 $Jobstep->{'arvados_task'}->{success} = 0;
946 $Jobstep->{'arvados_task'}->save;
# A failure is "temporary" when the node failed or the task exited 111.
953 $temporary_fail ||= $Jobstep->{node_fail};
954 $temporary_fail ||= ($exitvalue == 111);
957 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
959 # Check for signs of a failed or misconfigured node
960 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
961 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
962 # Don't count this against jobstep failure thresholds if this
963 # node is already suspected faulty and srun exited quickly
964 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
966 Log ($jobstepid, "blaming failure on suspect node " .
967 $slot[$proc{$pid}->{slot}]->{node}->{name});
968 $temporary_fail ||= 1;
970 ban_node_by_slot($proc{$pid}->{slot});
973 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
974 ++$Jobstep->{'failures'},
975 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failures, or a third failure of the same jobstep, stop the job.
978 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
979 # Give up on this task, and the whole job
981 $main::please_freeze = 1;
984 # Put this task back on the todo queue
985 push @jobstep_todo, $jobstepid;
987 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: the node is no longer suspect.
991 ++$thisround_succeeded;
992 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
993 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
994 push @jobstep_done, $jobstepid;
995 Log ($jobstepid, "success in $elapsed seconds");
997 $Jobstep->{exitcode} = $childstatus;
998 $Jobstep->{finishtime} = time;
999 process_stderr ($jobstepid, $task_success);
1000 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and hand the slot back to the free list.
1002 close $reader{$jobstepid};
1003 delete $reader{$jobstepid};
1004 delete $slot[$proc{$pid}->{slot}]->{pid};
1005 push @freeslot, $proc{$pid}->{slot};
1008 if ($task_success) {
# Page through the tasks created by the finished task, using the number
# already collected as the offset, until a page comes back empty.
1010 my $newtask_list = [];
1011 my $newtask_results;
1013 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1015 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1017 'order' => 'qsequence',
1018 'offset' => scalar(@$newtask_list),
1020 push(@$newtask_list, @{$newtask_results->{items}});
1021 } while (@{$newtask_results->{items}});
# Queue each new task as a jobstep at the level given by its "sequence".
1022 foreach my $arvados_task (@$newtask_list) {
1024 'level' => $arvados_task->{'sequence'},
1026 'arvados_task' => $arvados_task
1028 push @jobstep, $jobstep;
1029 push @jobstep_todo, $#jobstep;
1033 $progress_is_dirty = 1;
# If the file named by CRUNCH_REFRESH_TRIGGER has been modified since the
# last refresh, re-read the job's cancellation attributes and request a
# freeze when the job has been cancelled. Takes no arguments.
1037 sub check_refresh_wanted
# stat returns an empty list if the trigger file does not exist; element 9
# is the modification time.
1039 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1040 if (@stat && $stat[9] > $latest_refresh) {
1041 $latest_refresh = scalar time;
1042 if ($job_has_uuid) {
1043 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Only the cancellation attributes are copied into the in-memory job.
1044 for my $attr ('cancelled_at',
1045 'cancelled_by_user_uuid',
1046 'cancelled_by_client_uuid') {
1047 $Job->{$attr} = $Job2->{$attr};
1049 if ($Job->{'cancelled_at'}) {
1050 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1051 " by user " . $Job->{cancelled_by_user_uuid});
1053 $main::please_freeze = 1;
# Body of the squeue-monitoring routine (its "sub" line is not visible in
# this listing). It is rate-limited in two stages: the kill list is
# examined at most every 4 seconds and squeue is run at most every 60.
1061 # return if the kill list was checked <4 seconds ago
1062 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1066 $squeue_kill_checked = time;
1068 # use killem() on procs whose killtime is reached
1071 if (exists $proc{$_}->{killtime}
1072 && $proc{$_}->{killtime} <= time)
1078 # return if the squeue was checked <60 seconds ago
1079 if (defined $squeue_checked && $squeue_checked > time - 60)
1083 $squeue_checked = time;
1087 # here is an opportunity to check for mysterious problems with local procs
1091 # get a list of steps still running
# "&& echo ok" appends a sentinel line, so a failed squeue run can be told
# apart from an empty step list.
1092 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
# NOTE(review): this comparison only works if the trailing newline has been
# removed from @squeue on a line not shown here — confirm.
1094 if ($squeue[-1] ne "ok")
1100 # which of my jobsteps are running, according to squeue?
# Lines have the form "<jobid>.<stepid> <name>"; only steps of this SLURM
# job are considered.
1104 if (/^(\d+)\.(\d+) (\S+)/)
1106 if ($1 eq $ENV{SLURM_JOBID})
1113 # which of my active child procs (>60s old) were not mentioned by squeue?
1114 foreach (keys %proc)
1116 if ($proc{$_}->{time} < time - 60
1117 && !exists $ok{$proc{$_}->{jobstepname}}
1118 && !exists $proc{$_}->{killtime})
1120 # kill this proc if it hasn't exited in 30 seconds
1121 $proc{$_}->{killtime} = time + 30;
# Cancel this job's SLURM allocation with scancel. Takes no arguments.
# Any condition guarding the call is not visible in this listing.
1127 sub release_allocation
1131 Log (undef, "release job allocation");
# NOTE(review): the scancel exit status is not checked.
1132 system "scancel $ENV{SLURM_JOBID}";
# Body of the pipe-draining routine (its "sub" line is not visible in this
# listing). For every jobstep with an open reader, read what is currently
# available and append it to that jobstep's stderr buffer.
1140 foreach my $job (keys %reader)
# The readers were made non-blocking when created, so this loop ends as soon
# as no more data is available.
1143 while (0 < sysread ($reader{$job}, $buf, 8192))
1145 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1146 $jobstep[$job]->{stderr} .= $buf;
# Complete lines are logged and removed from the buffer straight away.
1147 preprocess_stderr ($job);
# Cap the buffer: beyond 16384 bytes the oldest 8192 bytes are discarded.
1148 if (length ($jobstep[$job]->{stderr}) > 16384)
1150 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's buffered stderr: log each one and
# react to known srun error messages. Called with the jobstep index (the
# argument-unpacking line is not visible in this listing).
1159 sub preprocess_stderr
# Take one newline-terminated line at a time off the front of the buffer.
1163 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1165 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1166 Log ($job, "stderr $line");
# The allocation is gone: request a freeze.
1167 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1169 $main::please_freeze = 1;
# The node is unusable: flag the jobstep and put the node on hold.
1171 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1172 $jobstep[$job]->{node_fail} = 1;
1173 ban_node_by_slot($jobstep[$job]->{slotindex});
# Body of the final stderr flush for a finished jobstep (its "sub" line and
# first argument line are not visible in this listing). Complete lines are
# handled by preprocess_stderr; whatever remains is logged line by line.
1182 my $task_success = shift;
1183 preprocess_stderr ($job);
1186 Log ($job, "stderr $_");
1187 } split ("\n", $jobstep[$job]->{stderr});
# Body of the block-fetching routine (its "sub" line is not visible in this
# listing). Runs arv-get on the given hash and returns everything it writes
# to stdout; dies if the command cannot be started or a read fails.
1193 my ($keep, $child_out, $output_block);
# \Q...\E backslash-escapes the hash before it is placed in the shell
# command line.
1195 my $cmd = "arv-get \Q$hash\E";
1196 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# Read in chunks of up to 1 MiB.
1200 my $bytes = sysread($keep, $buf, 1024 * 1024);
1201 if (!defined $bytes) {
1202 die "reading from arv-get: $!";
1203 } elsif ($bytes == 0) {
1204 # sysread returns 0 at the end of the pipe.
1207 # some bytes were read into buf.
1208 $output_block .= $buf;
1212 return $output_block;
# Body of the output-collation routine (its "sub" line is not visible in
# this listing). Feeds the outputs of the successful jobsteps to
# "arv-put --raw" and reads the job output from it.
1217 Log (undef, "collate");
1219 my ($child_out, $child_in);
1220 my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
# Skip jobsteps that have no output, did not succeed, or exited non-zero.
1224 next if (!exists $_->{'arvados_task'}->{output} ||
1225 !$_->{'arvados_task'}->{'success'} ||
1226 $_->{'exitcode'} != 0);
1227 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare block locator is passed through as it is.
1228 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1230 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1231 print $child_in $output;
# With a single jobstep, its locator is used directly as the job output.
1233 elsif (@jobstep == 1)
1235 $joboutput = $output;
# Otherwise fetch the block the locator points to and pass its content on.
1238 elsif (defined (my $outblock = fetch_block ($output)))
1240 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1241 print $child_in $outblock;
1245 Log (undef, "XXX fetch_block($output) failed XXX");
# Wait up to 120 seconds for arv-put to answer.
1251 if (!defined $joboutput) {
1252 my $s = IO::Select->new($child_out);
1253 if ($s->can_read(120)) {
# NOTE(review): a single sysread may return only part of the output if
# arv-put writes it in several pieces — confirm this cannot happen.
1254 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1257 Log (undef, "timed out reading from 'arv-put'");
# Body of the escalating kill routine (its "sub" line and the enclosing
# loop over pids are not visible in this listing). Each pid is sent SIGINT
# first; once more than 4 seconds have passed since a signal was sent, the
# next stronger one is chosen: SIGTERM, then SIGKILL.
1270 my $sig = 2; # SIGINT first
1271 if (exists $proc{$_}->{"sent_$sig"} &&
1272 time - $proc{$_}->{"sent_$sig"} > 4)
1274 $sig = 15; # SIGTERM if SIGINT doesn't work
1276 if (exists $proc{$_}->{"sent_$sig"} &&
1277 time - $proc{$_}->{"sent_$sig"} > 4)
1279 $sig = 9; # SIGKILL if SIGTERM doesn't work
# A given signal is sent only once per pid; the send time is stored under
# "sent_<signal>".
1281 if (!exists $proc{$_}->{"sent_$sig"})
1283 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1285 select (undef, undef, undef, 0.1);
1288 kill $sig, $_; # srun wants two SIGINT to really interrupt
1290 $proc{$_}->{"sent_$sig"} = time;
1291 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for the current handle's file descriptor in $bits (the
# enclosing sub and loop are not visible in this listing).
1301 vec($bits,fileno($_),1) = 1;
# Write one log message to STDERR and, while it is open, to the local log
# file. A multi-line message is split and handled line by line.
1307 sub Log # ($jobstep_id, $logmessage)
1309 if ($_[1] =~ /\n/) {
1310 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the previously selected handle.
1315 my $fh = select STDERR; $|=1; select $fh;
# Message layout: job id, pid of this process, jobstep id, text.
1316 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside space..tilde with a backslash followed
# by its three-digit octal code.
1317 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A UTC timestamp is needed for the log file and for terminal output.
1320 if ($local_logfile || -t STDERR) {
1321 my @gmtime = gmtime;
1322 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1323 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp only when it is a terminal.
1325 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1327 if ($local_logfile) {
1328 print $local_logfile $datetime . " " . $message;
# Body of the script's own croak (its "sub" line is not visible in this
# listing). Logs the message with the caller's file and line, then attempts
# to freeze and collate while tasks are still queued, and saves the log.
1335 my ($package, $file, $line) = caller;
1336 my $message = "@_ at $file line $line\n";
1337 Log (undef, $message);
1338 freeze() if @jobstep_todo;
1339 collate_output() if @jobstep_todo;
# The log is saved only while the local log file is still open.
1341 save_meta() if $local_logfile;
# Body of the cleanup routine (its "sub" line is not visible in this
# listing). Marks the job record as no longer running and stamps the finish
# time; does nothing for jobs without a UUID.
1348 return if !$job_has_uuid;
1349 $Job->update_attributes('running' => 0,
1351 'finished_at' => scalar gmtime);
# Body of the log-saving routine (its "sub" line is not visible in this
# listing). Uploads the local log file with arv-put and records the
# resulting locator on the job. Dies if arv-put exits non-zero.
1357 my $justcheckpoint = shift; # false if this will be the last meta saved
1358 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
# Flush buffered log lines so arv-put sees the complete file.
1360 $local_logfile->flush;
1361 my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1362 . quotemeta($local_logfile->filename);
1363 my $loglocator = `$cmd`;
1364 die "system $cmd failed: $?" if $?;
# From here on Log() no longer writes to the local log file.
1367 $local_logfile = undef; # the temp file is automatically deleted
1368 Log (undef, "log manifest is $loglocator");
1369 $Job->{'log'} = $loglocator;
1370 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# When a freeze has been requested: release the allocation, kill the
# remaining child processes and reap them. Optional arguments are extra
# pids to kill in addition to those already in %proc.
1374 sub freeze_if_want_freeze
1376 if ($main::please_freeze)
1378 release_allocation();
1381 # kill some srun procs before freeze+stop
# Register the given pids in %proc so killem() treats them like task children.
1382 map { $proc{$_} = {} } @_;
1385 killem (keys %proc);
1386 select (undef, undef, undef, 0.1);
# Reap every child that has exited so far and forget it.
1388 while (($died = waitpid (-1, WNOHANG)) > 0)
1390 delete $proc{$died};
# Fragments of three small routines whose "sub" lines are not visible in
# this listing.
# Freezing is a stub: it only logs that it is not implemented.
1405 Log (undef, "Freeze not implemented");
# Thawing is a stub that aborts the job.
1412 croak ("Thaw not implemented");
# Undo backslash escaping: "\n" becomes a newline, any other escaped
# character stands for itself.
1428 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Body of the command launcher (its "sub" line is not visible in this
# listing). Arguments: srun argument list (array ref), command argument
# list (array ref), options (hash ref, optional), and presumably the text
# to feed to the command's stdin (that shift is not shown).
1435 my $srunargs = shift;
1436 my $execargs = shift;
1437 my $opts = shift || {};
# Without SLURM the command is run directly, ignoring the srun arguments.
1439 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# In debug mode, echo the command; arguments containing a space are shown
# in single quotes.
1440 print STDERR (join (" ",
1441 map { / / ? "'$_'" : $_ }
1444 if $ENV{CRUNCH_DEBUG};
# To supply stdin, fork a helper whose STDOUT is connected to our STDIN;
# the helper prints the data and closes its end.
1446 if (defined $stdin) {
1447 my $child = open STDIN, "-|";
1448 defined $child or die "no fork: $!";
1450 print $stdin or die $!;
1451 close STDOUT or die $!;
# With the "fork" option the command runs via system() and its status is
# returned to the caller.
1456 return system (@$args) if $opts->{fork};
# Reached only when the command could not be executed.
1459 warn "ENV size is ".length(join(" ",%ENV));
1460 die "exec failed: $!: @$args";
# Put the node that owns the given slot on hold and count the hold.
# Called with a slot index (the argument-unpacking line is not visible in
# this listing).
1464 sub ban_node_by_slot {
1465 # Don't start any new jobsteps on this node for 60 seconds
1467 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
# hold_count is compared against 4 by the "Every node has failed" check.
1468 $slot[$slotid]->{node}->{hold_count}++;
1469 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Body of the locking routine (its "sub" line is not visible in this
# listing). Opens the lock file for writing and takes an exclusive,
# non-blocking lock on it; croaks with $error_message if the lock is held
# elsewhere.
1474 my ($lockfile, $error_message) = @_;
# The bareword handle L is global, so it stays open after this routine
# returns.
1475 open L, ">", $lockfile or croak("$lockfile: $!");
1476 if (!flock L, LOCK_EX|LOCK_NB) {
1477 croak("Can't lock $lockfile: $error_message\n");
1481 sub find_docker_image {
1482 # Given a Keep locator, check to see if it contains a Docker image.
1483 # If so, return its stream name and Docker hash.
1484 # If not, return undef for both values.
1485 my $locator = shift;
1486 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1487 my @file_list = @{$image->{files}};
# A Docker image collection holds exactly one file, named
# "<64 hex digits>.tar". Each file entry is indexed as [0] for the stream
# name and [1] for the file name.
1488 if ((scalar(@file_list) == 1) &&
1489 ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1490 return ($file_list[0][0], $1);
1493 return (undef, undef);
1499 # checkout-and-build
# Stand-alone Perl program that unpacks the source tree and runs its install
# steps. It is configured entirely through the CRUNCH_SRC* environment
# variables.
1503 my $destdir = $ENV{"CRUNCH_SRC"};
1504 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1505 my $repo = $ENV{"CRUNCH_SRC_URL"};
1507 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "$destdir.commit" is a symlink whose target is the installed commit id;
# if it already matches, the tree is presumably reused (the action taken is
# not visible in this listing).
1509 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
# Invalidate the marker before touching the tree, and send all further
# output to "$destdir.log".
1513 unlink "$destdir.commit";
1514 open STDOUT, ">", "$destdir.log";
1515 open STDERR, ">&STDOUT";
# The archive data follows this program in its own DATA section and is
# unpacked with tar.
1518 my @git_archive_data = <DATA>;
1519 if (@git_archive_data) {
1520 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1521 print TARX @git_archive_data;
1523 die "'tar -C $destdir -xf -' exited $?: $!";
1528 chomp ($pwd = `pwd`);
1529 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the Python SDK into a virtualenv at the install dir.
1532 for my $src_path ("$destdir/arvados/sdk/python") {
1534 shell_or_die ("virtualenv", $install_dir);
1535 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run the first install hook that applies: crunch_scripts/install, then
# tests/autotests.sh (only when there is no install.sh), then install.sh.
1539 if (-e "$destdir/crunch_scripts/install") {
1540 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1541 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1543 shell_or_die ("./tests/autotests.sh", $install_dir);
1544 } elsif (-e "./install.sh") {
1545 shell_or_die ("./install.sh", $install_dir);
# Publish the new commit marker: create it under a temporary name and
# rename it into place.
1549 unlink "$destdir.commit.new";
1550 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1551 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Fragment of the shell_or_die helper: echo the command when DEBUG is set,
# and die if it fails.
1560 if ($ENV{"DEBUG"}) {
1561 print STDERR "@_\n";
1564 or die "@_ failed: $! exit 0x".sprintf("%x",$?);