# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
crunch-job: Execute job steps, save snapshots as requested, collate output.

Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):

 crunch-job --job x-y-z

Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):

 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
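A local jobspec is a JSON object with at least C<script>,
C<script_version> and C<script_parameters> (the values shown here are
only illustrative):

 crunch-job --job '{
   "script": "hash",
   "script_version": "/home/me/arvados",
   "script_parameters": {"input": "c1bad4b39ca5a924e481008009d94e32+210"}
 }'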
If the job is already locked, steal the lock and run it anyway.

Path to .git directory where the specified commit is found.

Arvados API authorization token to use during the course of the job.
Do not clear per-job/task temporary directories during initial job
setup. This can speed up development and debugging when running jobs
locally.
=head1 RUNNING JOBS LOCALLY

crunch-job's log messages appear on stderr along with the job tasks'
stderr streams. The log is saved in Keep at each checkpoint and when
the job finishes.

If the job succeeds, the job's output locator is printed on stdout.

While the job is running, the following signals are accepted:

=over

=item control-C, SIGINT, SIGQUIT

Save a checkpoint, terminate any job tasks that are running, and stop.

=item SIGALRM

Save a checkpoint and continue.

=item SIGHUP

Refresh node allocation (i.e., check whether any nodes have been added
or unallocated) and attributes of the Job record that should affect
behavior (e.g., cancel job if cancelled_at becomes non-nil).

=back

=cut
use POSIX ':sys_wait_h';
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Digest::MD5 qw(md5_hex);
use File::Path qw( make_path );

use constant EX_TEMPFAIL => 75;
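# EX_TEMPFAIL (75, from sysexits.h) tells our parent crunch-dispatch to
# leave the job in the queue for another attempt instead of marking it
# failed.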
$ENV{"TMPDIR"} ||= "/tmp";
unless (defined $ENV{"CRUNCH_TMP"}) {
  $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
  if ($ENV{"USER"} ne "crunch" && $< != 0) {
    # use a tmp dir unique for my uid
    $ENV{"CRUNCH_TMP"} .= "-$<";
  }
}

# Create the tmp directory if it does not exist
if ( ! -d $ENV{"CRUNCH_TMP"} ) {
  make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
}

$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
mkdir ($ENV{"JOB_WORK"});
GetOptions('force-unlock' => \$force_unlock,
           'git-dir=s' => \$git_dir,
           'job=s' => \$jobspec,
           'job-api-token=s' => \$job_api_token,
           'no-clear-tmp' => \$no_clear_tmp,
           'resume-stash=s' => \$resume_stash,
    );

if (defined $job_api_token) {
  $ENV{ARVADOS_API_TOKEN} = $job_api_token;
}

my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
my $local_job = !$job_has_uuid;
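# --job is either a job UUID (a queued Arvados job, the crunch-dispatch
# case) or an inline JSON job description (a "local" job run on this
# machine).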
  $main::ENV{CRUNCH_DEBUG} = 1;
}
else {
  $main::ENV{CRUNCH_DEBUG} = 0;
}
my $arv = Arvados->new('apiVersion' => 'v1');

my $User = $arv->{'users'}->{'current'}->execute;

  $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
  if (!$force_unlock) {
    # If some other crunch-job process has grabbed this job (or we see
    # other evidence that the job is already underway) we exit
    # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
    # mark the job as failed.
    if ($Job->{'is_locked_by_uuid'}) {
      Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
      exit EX_TEMPFAIL;
    }
    if (defined $Job->{'success'}) {
      Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
      exit EX_TEMPFAIL;
    }
    if ($Job->{'running'}) {
      Log(undef, "Job 'running' flag is already set");
      exit EX_TEMPFAIL;
    }
    if ($Job->{'started_at'}) {
      Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
      exit EX_TEMPFAIL;
    }
  }
  $Job = JSON::decode_json($jobspec);

  map { croak ("No $_ specified") unless $Job->{$_} }
      qw(script script_version script_parameters);

  $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
  $Job->{'started_at'} = gmtime;

  $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
}

$job_id = $Job->{'uuid'};
my $keep_logfile = $job_id . '.log.txt';
$local_logfile = File::Temp->new();

$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};

Log (undef, "check slurm allocation");
my @slot;
my @node;
# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
my @sinfo;
if (!$have_slurm)
{
  my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
  push @sinfo, "$localcpus localhost";
}
if (exists $ENV{SLURM_NODELIST})
{
  push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
}

foreach (@sinfo)
{
  my ($ncpus, $slurm_nodelist) = split;
  $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
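  # Expand a SLURM-style compressed nodelist into individual node
  # names; e.g., "compute[1-3,5]" should become compute1 compute2
  # compute3 compute5.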
  my @nodelist;
  while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
  {
    my $nodelist = $1;
    if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
    {
      my $ranges = $1;
      foreach (split (",", $ranges))
      {
        my ($a, $b);
        if (/(\d+)-(\d+)/)
        {
          ($a, $b) = ($1, $2);
        }
        else
        {
          ($a, $b) = ($_, $_);
        }
        push @nodelist, map {
          my $n = $nodelist;
          $n =~ s/\[[-,\d]+\]/$_/;
          $n;
        } ($a..$b);
      }
    }
    else
    {
      push @nodelist, $nodelist;
    }
  }
  foreach my $nodename (@nodelist)
  {
    Log (undef, "node $nodename - $ncpus slots");
    my $node = { name => $nodename,
                 ncpus => $ncpus,
                 losing_streak => 0,
                 hold_until => 0 };
    foreach my $cpu (1..$ncpus)
    {
      push @slot, { node => $node,
                    cpu => $cpu };
    }
  }
  push @node, @nodelist;
}

# Ensure that we get one jobstep running on each allocated node before
# we start overloading nodes with concurrent steps

@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
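# E.g., with two 2-CPU nodes the sorted order is (nodeA.0, nodeB.0,
# nodeA.1, nodeB.1): concurrent steps spread across nodes before any
# node is asked to run two at once.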
# Claim this job, and make sure nobody else does
unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
        $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
  Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
  exit EX_TEMPFAIL;
}
$Job->update_attributes('started_at' => scalar gmtime,
                        'running' => 1,
                        'success' => undef,
                        'tasks_summary' => { 'failed' => 0,
                                             'todo' => 1,
                                             'running' => 0,
                                             'done' => 0 });

Log (undef, "start");
$SIG{'INT'} = sub { $main::please_freeze = 1; };
$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
$SIG{'TERM'} = \&croak;
$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
$SIG{'ALRM'} = sub { $main::please_info = 1; };
$SIG{'CONT'} = sub { $main::please_continue = 1; };
$SIG{'HUP'} = sub { $main::please_refresh = 1; };

$main::please_freeze = 0;
$main::please_info = 0;
$main::please_continue = 0;
$main::please_refresh = 0;
my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key

grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
$ENV{"CRUNCH_JOB_UUID"} = $job_id;
$ENV{"JOB_UUID"} = $job_id;
my @jobstep_todo = ();
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
my $squeue_checked;
my $squeue_kill_checked;
my $output_in_keep = 0;
my $latest_refresh = scalar time;
if (defined $Job->{thawedfromkey})
{
  thaw ($Job->{thawedfromkey});
}
else
{
  my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
    'job_uuid' => $Job->{'uuid'},
    'sequence' => 0,
    'qsequence' => 0,
    'parameters' => {},
  });
  push @jobstep, { 'level' => 0,
                   'failures' => 0,
                   'arvados_task' => $first_task,
                 };
  push @jobstep_todo, 0;
}

if (!$have_slurm)
{
  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}

$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};

my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
if ($skip_install)
{
  if (!defined $no_clear_tmp) {
    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
    system($clear_tmp_cmd) == 0
        or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
  }
  $ENV{"CRUNCH_SRC"} = $Job->{script_version};
  for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
    if (-d $src_path) {
      system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
          or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
      system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install") == 0
          or croak ("setup.py in $src_path failed: exit ".($?>>8));
    }
  }
}
else
{
  $build_script = <DATA>;
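  # $build_script is the perl program following __DATA__ at the bottom
  # of this file ("checkout-and-build"); it is shipped to each node and
  # run there to check out and install the requested script_version.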
387 Log (undef, "Install revision ".$Job->{script_version});
388 my $nodelist = join(",", @node);
390 if (!defined $no_clear_tmp) {
391 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
393 my $cleanpid = fork();
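    # The child asks every allocated node to lazily unmount any stale
    # Keep fuse mounts left under $JOB_WORK, then remove the work and
    # install directories left over from previous runs.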
    if ($cleanpid == 0)
    {
      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
            ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
      exit (1);
    }
    while (1)
    {
      last if $cleanpid == waitpid (-1, WNOHANG);
      freeze_if_want_freeze ($cleanpid);
      select (undef, undef, undef, 0.1);
    }
    Log (undef, "Clean-work-dir exited $?");
  }
  # Install requested code version

  my @execargs;
  my @srunargs = ("srun",
                  "--nodelist=$nodelist",
                  "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");

  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";

  my $commit;
  my $git_archive;
  my $treeish = $Job->{'script_version'};

  # If we're running under crunch-dispatch, it will have pulled the
  # appropriate source tree into its own repository, and given us that
  # repo's path as $git_dir. If we're running a "local" job, and a
  # script_version was specified, it's up to the user to provide the
  # full path to a local repository in Job->{repository}.
  #
  # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
  # git-archive --remote where appropriate.
  #
  # TODO: Accept a locally-hosted Arvados repository by name or
  # UUID. Use arvados.v1.repositories.list or .get to figure out the
  # appropriate fetch-url.
  my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};

  $ENV{"CRUNCH_SRC_URL"} = $repo;
  if (-d "$repo/.git") {
    # We were given a working directory, but we are only interested in
    # the .git directory.
    $repo = "$repo/.git";
  }

  # If this looks like a subversion r#, look for it in git-svn commit messages
  if ($treeish =~ m{^\d{1,4}$}) {
    my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
    chomp $gitlog;
    if ($gitlog =~ /^[a-f0-9]{40}$/) {
      $commit = $gitlog;
      Log (undef, "Using commit $commit for script_version $treeish");
    }
  }
  # If that didn't work, try asking git to look it up as a tree-ish.

  if (!defined $commit) {
    my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
    chomp $found;
    if ($found =~ /^[0-9a-f]{40}$/s) {
      $commit = $found;
      if ($commit ne $treeish) {
        # Make sure we record the real commit id in the database,
        # frozentokey, logs, etc. -- instead of an abbreviation or a
        # branch name which can become ambiguous or point to a
        # different commit in the future.
        $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
        Log (undef, "Using commit $commit for tree-ish $treeish");
        $Job->{'script_version'} = $commit;
        if ($job_has_uuid) {
          $Job->update_attributes('script_version' => $commit) or
              croak("Error while updating job");
        }
      }
    }
  }
  if (defined $commit) {
    $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
    @execargs = ("sh", "-c",
                 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
    $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
  }
  else {
    croak ("could not figure out commit id for $treeish");
  }
  # Note: this section is almost certainly unnecessary if we're
  # running tasks in docker containers.
  my $installpid = fork();
  if ($installpid == 0)
  {
    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
    exit (1);
  }
  while (1)
  {
    last if $installpid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($installpid);
    select (undef, undef, undef, 0.1);
  }
  Log (undef, "Install exited $?");
}

# Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
if (!$have_slurm)
{
  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}
# If this job requires a Docker image, install that.
my $docker_bin = "/usr/bin/docker.io";
my ($docker_locator, $docker_stream, $docker_hash);
if ($docker_locator = $Job->{docker_image_locator}) {
  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
  if (!$docker_hash)
  {
    croak("No Docker image hash found from locator $docker_locator");
  }
  $docker_stream =~ s/^\.//;
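  # Install the image on every node: skip nodes that already have this
  # image hash loaded; otherwise stream the saved image tarball out of
  # Keep with arv-get and pipe it into "docker load".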
  my $docker_install_script = qq{
if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
  my $docker_pid = fork();
  if ($docker_pid == 0)
  {
    srun (["srun", "--nodelist=" . join(',', @node)],
          ["/bin/sh", "-ec", $docker_install_script]);
    exit ($?);
  }
  while (1)
  {
    last if $docker_pid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($docker_pid);
    select (undef, undef, undef, 0.1);
  }
  if ($? != 0)
  {
    croak("Installing Docker image from $docker_locator returned exit code $?");
  }
}
foreach (qw (script script_version script_parameters runtime_constraints))
{
  Log (undef,
       "$_ " .
       (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
}
foreach (split (/\n/, $Job->{knobs}))
{
  Log (undef, "knob " . $_);
}
$main::success = undef;


ONELEVEL:

my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;

@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
Log (undef, "start level $level");

my %proc;
my @freeslot = (0..$#slot);
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
my $progress_stats_updated = 0;

update_progress_stats();
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
  my $id = $jobstep_todo[$todo_ptr];
  my $Jobstep = $jobstep[$id];
  if ($Jobstep->{level} != $level)
  {
    next;
  }

  pipe $reader{$id}, "writer" or croak ($!);
  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);

  my $childslot = $freeslot[0];
  my $childnode = $slot[$childslot]->{node};
  my $childslotname = join (".",
                            $slot[$childslot]->{node}->{name},
                            $slot[$childslot]->{cpu});
  my $childpid = fork();
  if ($childpid == 0)
  {
    $SIG{'INT'} = 'DEFAULT';
    $SIG{'QUIT'} = 'DEFAULT';
    $SIG{'TERM'} = 'DEFAULT';

    foreach (values (%reader))
    {
      close ($_);
    }
    fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
    open(STDOUT,">&writer");
    open(STDERR,">&writer");

    delete $ENV{"GNUPGHOME"};
    $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
    $ENV{"TASK_QSEQUENCE"} = $id;
    $ENV{"TASK_SEQUENCE"} = $level;
    $ENV{"JOB_SCRIPT"} = $Job->{script};
    while (my ($param, $value) = each %{$Job->{script_parameters}}) {
      $param =~ tr/a-z/A-Z/;
      $ENV{"JOB_PARAMETER_$param"} = $value;
    }
    $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
    $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
    $ENV{"HOME"} = $ENV{"TASK_WORK"};
    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
    $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
    $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
    $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
647 "--nodelist=".$childnode->{name},
648 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
649 "--job-name=$job_id.$id.$$",
651 my $build_script_to_send = "";
653 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
654 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
655 ."&& cd $ENV{CRUNCH_TMP} ";
    if ($build_script)
    {
      $build_script_to_send = $build_script;
      $command .=
          "&& perl -";
    }
    $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
    if ($docker_hash)
    {
      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
      # Dynamically configure the container to use the host system as its
      # DNS server. Get the host's global addresses from the ip command,
      # and turn them into docker --dns options using gawk.
      $command .=
          q{$(ip -o address show scope global |
            gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
      $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
      $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
      $command .= "--env=\QHOME=/home/crunch\E ";
      while (my ($env_key, $env_val) = each %ENV)
      {
        if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
          if ($env_key eq "TASK_WORK") {
            $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
          }
          elsif ($env_key eq "TASK_KEEPMOUNT") {
            $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
          }
          else {
            $command .= "--env=\Q$env_key=$env_val\E ";
          }
        }
      }
      $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
      $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
      $command .= "\Q$docker_hash\E ";
      $command .= "stdbuf --output=0 --error=0 ";
      $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
697 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
698 $command .= "stdbuf --output=0 --error=0 ";
699 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
    my @execargs = ('bash', '-c', $command);
    srun (\@srunargs, \@execargs, undef, $build_script_to_send);
    # exec() failed, we assume nothing happened.
    Log(undef, "srun() failed on build script");
    exit (1);
  }
  if (!defined $childpid)
  {
    croak ("fork failed: $!");
  }
  close "writer";
  shift @freeslot;
  $proc{$childpid} = { jobstep => $id,
                       time => time,
                       slot => $childslot,
                       jobstepname => "$job_id.$id.$childpid",
                     };
  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
  $slot[$childslot]->{pid} = $childpid;

  Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
  Log ($id, "child $childpid started on $childslotname");
  $Jobstep->{starttime} = time;
  $Jobstep->{node} = $childnode->{name};
  $Jobstep->{slotindex} = $childslot;
  delete $Jobstep->{stderr};
  delete $Jobstep->{finishtime};

  splice @jobstep_todo, $todo_ptr, 1;
  --$todo_ptr;

  $progress_is_dirty = 1;
  while (scalar @freeslot == 0 ||
         (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
  {
    last THISROUND if $main::please_freeze;
    if ($main::please_info)
    {
      $main::please_info = 0;
      freeze();
      collate_output();
      save_meta(1);
      update_progress_stats();
    }
    my $gotsome
        = readfrompipes ()
        + reapchildren ();
    if (!$gotsome)
    {
      check_refresh_wanted();
      check_squeue();
      update_progress_stats();
      select (undef, undef, undef, 0.1);
    }
    elsif (time - $progress_stats_updated >= 30)
    {
      update_progress_stats();
    }
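    # Heuristic for hopeless rounds: bail out when tasks have failed on
    # retry 8 times with no successes at all, or 16 times with more
    # repeated failures than successes.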
    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
    {
      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
          .($thisround_failed+$thisround_succeeded)
          .") -- giving up on this round";
      Log (undef, $message);
      last THISROUND;
    }
    # move slots from freeslot to holdslot (or back to freeslot) if necessary
    for (my $i=$#freeslot; $i>=0; $i--) {
      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
        push @holdslot, (splice @freeslot, $i, 1);
      }
    }
    for (my $i=$#holdslot; $i>=0; $i--) {
      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
        push @freeslot, (splice @holdslot, $i, 1);
      }
    }

    # give up if no nodes are succeeding
    if (!grep { $_->{node}->{losing_streak} == 0 &&
                $_->{node}->{hold_count} < 4 } @slot) {
      my $message = "Every node has failed -- giving up on this round";
      Log (undef, $message);
      last THISROUND;
    }
  }
}

push @freeslot, splice @holdslot;
map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
801 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
804 if ($main::please_continue) {
805 $main::please_continue = 0;
808 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
812 check_refresh_wanted();
814 update_progress_stats();
815 select (undef, undef, undef, 0.1);
816 killem (keys %proc) if $main::please_freeze;
820 update_progress_stats();
821 freeze_if_want_freeze();
if (!defined $main::success)
{
  if (@jobstep_todo &&
      $thisround_succeeded == 0 &&
      ($thisround_failed == 0 || $thisround_failed > 4))
  {
    my $message = "stop because $thisround_failed tasks failed and none succeeded";
    Log (undef, $message);
    $main::success = 0;
  }
  if (!@jobstep_todo)
  {
    $main::success = 1;
  }
}

goto ONELEVEL if !defined $main::success;
release_allocation();

my $collated_output = &collate_output();

$Job->update_attributes('running' => 0,
                        'success' => $collated_output && $main::success,
                        'finished_at' => scalar gmtime)
    if $job_has_uuid;

if (!$collated_output) {
  Log(undef, "output undef");
}
else {
  eval {
    open(my $orig_manifest, '-|', 'arv-get', $collated_output)
        or die "failed to get collated manifest: $!";
    my $orig_manifest_text = '';
    while (my $manifest_line = <$orig_manifest>) {
      $orig_manifest_text .= $manifest_line;
    }
    my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
      'manifest_text' => $orig_manifest_text,
    });
    Log(undef, "output uuid " . $output->{uuid});
    Log(undef, "output hash " . $output->{portable_data_hash});
    $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
  };
  if ($@) {
    Log (undef, "Failed to register output manifest: $@");
  }
}

Log (undef, "finish");

save_meta();

exit ($Job->{'success'} ? 0 : 1);
sub update_progress_stats
{
  $progress_stats_updated = time;
  return if !$progress_is_dirty;
  my ($todo, $done, $running) = (scalar @jobstep_todo,
                                 scalar @jobstep_done,
                                 scalar @slot - scalar @freeslot - scalar @holdslot);
  $Job->{'tasks_summary'} ||= {};
  $Job->{'tasks_summary'}->{'todo'} = $todo;
  $Job->{'tasks_summary'}->{'done'} = $done;
  $Job->{'tasks_summary'}->{'running'} = $running;
  if ($job_has_uuid) {
    $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
  }
  Log (undef, "status: $done done, $running running, $todo todo");
  $progress_is_dirty = 0;
}
sub reapchildren
{
  my $pid = waitpid (-1, WNOHANG);
  return 0 if $pid <= 0;

  my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
                  . "."
                  . $slot[$proc{$pid}->{slot}]->{cpu});
  my $jobstepid = $proc{$pid}->{jobstep};
  my $elapsed = time - $proc{$pid}->{time};
  my $Jobstep = $jobstep[$jobstepid];

  my $childstatus = $?;
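  # Decode the wait() status word: exit code in the high byte, the
  # terminating signal in the low 7 bits, core-dump flag in bit 7.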
  my $exitvalue = $childstatus >> 8;
  my $exitinfo = sprintf("exit %d signal %d%s",
                         $exitvalue,
                         $childstatus & 127,
                         ($childstatus & 128 ? ' core dump' : ''));
  $Jobstep->{'arvados_task'}->reload;
  my $task_success = $Jobstep->{'arvados_task'}->{success};

  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");

  if (!defined $task_success) {
    # task did not indicate one way or the other --> fail
    $Jobstep->{'arvados_task'}->{success} = 0;
    $Jobstep->{'arvados_task'}->save;
    $task_success = 0;
  }

  if (!$task_success)
  {
    my $temporary_fail;
    $temporary_fail ||= $Jobstep->{node_fail};
    $temporary_fail ||= ($exitvalue == 111);
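    # (Exit code 111 conventionally means "temporary failure", so the
    # task remains eligible for retry.)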
    ++$thisround_failed;
    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;

    # Check for signs of a failed or misconfigured node
    if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
        2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
      # Don't count this against jobstep failure thresholds if this
      # node is already suspected faulty and srun exited quickly
      if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
          $elapsed < 5) {
        Log ($jobstepid, "blaming failure on suspect node " .
             $slot[$proc{$pid}->{slot}]->{node}->{name});
        $temporary_fail ||= 1;
      }
      ban_node_by_slot($proc{$pid}->{slot});
    }

    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                             ++$Jobstep->{'failures'},
                             $temporary_fail ? 'temporary' : 'permanent',
                             $elapsed));
    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
      # Give up on this task, and the whole job
      $main::success = 0;
      $main::please_freeze = 1;
    }
    else {
      # Put this task back on the todo queue
      push @jobstep_todo, $jobstepid;
    }
    $Job->{'tasks_summary'}->{'failed'}++;
  }
  else
  {
    ++$thisround_succeeded;
    $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
    $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
    push @jobstep_done, $jobstepid;
    Log ($jobstepid, "success in $elapsed seconds");
  }
  $Jobstep->{exitcode} = $childstatus;
  $Jobstep->{finishtime} = time;
  process_stderr ($jobstepid, $task_success);
  Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});

  close $reader{$jobstepid};
  delete $reader{$jobstepid};
  delete $slot[$proc{$pid}->{slot}]->{pid};
  push @freeslot, $proc{$pid}->{slot};
  delete $proc{$pid};

  my $newtask_list = [];
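  # Collect the new tasks this task created, paging through the API
  # list results until a page comes back empty.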
  my $newtask_results;
  do {
    $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
      'where' => {
        'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
      },
      'order' => 'qsequence',
      'offset' => scalar(@$newtask_list),
    );
    push(@$newtask_list, @{$newtask_results->{items}});
  } while (@{$newtask_results->{items}});
  foreach my $arvados_task (@$newtask_list) {
    my $jobstep = {
      'level' => $arvados_task->{'sequence'},
      'failures' => 0,
      'arvados_task' => $arvados_task
    };
    push @jobstep, $jobstep;
    push @jobstep_todo, $#jobstep;
  }

  $progress_is_dirty = 1;
  return 1;
}
sub check_refresh_wanted
{
  my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
  if (@stat && $stat[9] > $latest_refresh) {
    $latest_refresh = scalar time;
    if ($job_has_uuid) {
      my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
      for my $attr ('cancelled_at',
                    'cancelled_by_user_uuid',
                    'cancelled_by_client_uuid') {
        $Job->{$attr} = $Job2->{$attr};
      }
      if ($Job->{'cancelled_at'}) {
        Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
             " by user " . $Job->{cancelled_by_user_uuid});
        $main::success = 0;
        $main::please_freeze = 1;
      }
    }
  }
}
sub check_squeue
{
  # return if the kill list was checked <4 seconds ago
  if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
  {
    return;
  }
  $squeue_kill_checked = time;

  # use killem() on procs whose killtime is reached
  foreach (keys %proc)
  {
    if (exists $proc{$_}->{killtime}
        && $proc{$_}->{killtime} <= time)
    {
      killem ($_);
    }
  }

  # return if the squeue was checked <60 seconds ago
  if (defined $squeue_checked && $squeue_checked > time - 60)
  {
    return;
  }
  $squeue_checked = time;

  if (!$have_slurm)
  {
    # here is an opportunity to check for mysterious problems with local procs
    return;
  }

  # get a list of steps still running
  my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
  chomp @squeue;
  if ($squeue[-1] ne "ok")
  {
    return;
  }
  pop @squeue;

  # which of my jobsteps are running, according to squeue?
  my %ok;
  foreach (@squeue)
  {
    if (/^(\d+)\.(\d+) (\S+)/)
    {
      if ($1 eq $ENV{SLURM_JOBID})
      {
        $ok{$3} = 1;
      }
    }
  }

  # which of my active child procs (>60s old) were not mentioned by squeue?
  foreach (keys %proc)
  {
    if ($proc{$_}->{time} < time - 60
        && !exists $ok{$proc{$_}->{jobstepname}}
        && !exists $proc{$_}->{killtime})
    {
      # kill this proc if it hasn't exited in 30 seconds
      $proc{$_}->{killtime} = time + 30;
    }
  }
}
sub release_allocation
{
  if ($have_slurm)
  {
    Log (undef, "release job allocation");
    system "scancel $ENV{SLURM_JOBID}";
  }
}
sub readfrompipes
{
  my $gotsome = 0;
  foreach my $job (keys %reader)
  {
    my $buf;
    while (0 < sysread ($reader{$job}, $buf, 8192))
    {
      print STDERR $buf if $ENV{CRUNCH_DEBUG};
      $jobstep[$job]->{stderr} .= $buf;
      preprocess_stderr ($job);
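      # Cap the buffered stderr at 16 KiB per task: once it grows past
      # that, the oldest 8 KiB are dropped so recent output is kept.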
      if (length ($jobstep[$job]->{stderr}) > 16384)
      {
        substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
      }
      $gotsome = 1;
    }
  }
  return $gotsome;
}
sub preprocess_stderr
{
  my $job = shift;

  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
    my $line = $1;
    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
    Log ($job, "stderr $line");
    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
      $main::please_freeze = 1;
    }
    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
      $jobstep[$job]->{node_fail} = 1;
      ban_node_by_slot($jobstep[$job]->{slotindex});
    }
  }
}
sub process_stderr
{
  my $job = shift;
  my $task_success = shift;
  preprocess_stderr ($job);

  map {
    Log ($job, "stderr $_");
  } split ("\n", $jobstep[$job]->{stderr});
}
sub fetch_block
{
  my $hash = shift;
  my ($keep, $child_out, $output_block);

  my $cmd = "arv-get \Q$hash\E";
  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
  $output_block = '';
  while (1) {
    my $buf;
    my $bytes = sysread($keep, $buf, 1024 * 1024);
    if (!defined $bytes) {
      die "reading from arv-get: $!";
    } elsif ($bytes == 0) {
      # sysread returns 0 at the end of the pipe.
      last;
    } else {
      # some bytes were read into buf.
      $output_block .= $buf;
    }
  }
  close $keep;
  return $output_block;
}
1200 Log (undef, "collate");
1202 my ($child_out, $child_in);
1203 my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1207 next if (!exists $_->{'arvados_task'}->{'output'} ||
1208 !$_->{'arvados_task'}->{'success'});
1209 my $output = $_->{'arvados_task'}->{output};
1210 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1212 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1213 print $child_in $output;
1215 elsif (@jobstep == 1)
1217 $joboutput = $output;
1220 elsif (defined (my $outblock = fetch_block ($output)))
1222 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1223 print $child_in $outblock;
1227 Log (undef, "XXX fetch_block($output) failed XXX");
1233 if (!defined $joboutput) {
1234 my $s = IO::Select->new($child_out);
1235 if ($s->can_read(120)) {
1236 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1239 Log (undef, "timed out reading from 'arv-put'");
sub killem
{
  foreach (@_)
  {
    my $sig = 2;                # SIGINT first
    if (exists $proc{$_}->{"sent_$sig"} &&
        time - $proc{$_}->{"sent_$sig"} > 4)
    {
      $sig = 15;                # SIGTERM if SIGINT doesn't work
    }
    if (exists $proc{$_}->{"sent_$sig"} &&
        time - $proc{$_}->{"sent_$sig"} > 4)
    {
      $sig = 9;                 # SIGKILL if SIGTERM doesn't work
    }
    if (!exists $proc{$_}->{"sent_$sig"})
    {
      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
      kill $sig, $_;
      select (undef, undef, undef, 0.1);
      if ($sig == 2)
      {
        kill $sig, $_;          # srun wants two SIGINT to really interrupt
      }
      $proc{$_}->{"sent_$sig"} = time;
      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
    }
  }
}
    vec($bits,fileno($_),1) = 1;
sub Log                         # ($jobstep_id, $logmessage)
{
  if ($_[1] =~ /\n/) {
    for my $line (split (/\n/, $_[1])) {
      Log ($_[0], $line);
    }
    return;
  }
  my $fh = select STDERR; $|=1; select $fh;
  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
  $message .= "\n";
  my $datetime;
  if ($local_logfile || -t STDERR) {
    my @gmtime = gmtime;
    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                         $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
  }
  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);

  if ($local_logfile) {
    print $local_logfile $datetime . " " . $message;
  }
}
sub croak
{
  my ($package, $file, $line) = caller;
  my $message = "@_ at $file line $line\n";
  Log (undef, $message);
  freeze() if @jobstep_todo;
  collate_output() if @jobstep_todo;
  cleanup();
  save_meta() if $local_logfile;
  die;
}

sub cleanup
{
  return if !$job_has_uuid;
  $Job->update_attributes('running' => 0,
                          'success' => 0,
                          'finished_at' => scalar gmtime);
}
sub save_meta
{
  my $justcheckpoint = shift;  # false if this will be the last meta saved
  return if $justcheckpoint;   # checkpointing is not relevant post-Warehouse.pm

  $local_logfile->flush;
  my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
      . quotemeta($local_logfile->filename);
  my $loglocator = `$cmd`;
  die "system $cmd failed: $?" if $?;
  chomp($loglocator);

  $local_logfile = undef;      # the temp file is automatically deleted
  Log (undef, "log manifest is $loglocator");
  $Job->{'log'} = $loglocator;
  $Job->update_attributes('log', $loglocator) if $job_has_uuid;
}
sub freeze_if_want_freeze
{
  if ($main::please_freeze)
  {
    release_allocation();
    if (@_)
    {
      # kill some srun procs before freeze+stop
      map { $proc{$_} = {} } @_;
      while (%proc)
      {
        killem (keys %proc);
        select (undef, undef, undef, 0.1);
        my $died;
        while (($died = waitpid (-1, WNOHANG)) > 0)
        {
          delete $proc{$died};
        }
      }
    }
    freeze();
    collate_output();
    cleanup();
    save_meta();
    exit 0;
  }
}


sub freeze
{
  Log (undef, "Freeze not implemented");
  return;
}


sub thaw
{
  croak ("Thaw not implemented");
  $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
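  # The substitution above reverses the escaping applied by Log():
  # "\n" becomes a newline, and any other backslashed character stands
  # for itself.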
sub srun
{
  my $srunargs = shift;
  my $execargs = shift;
  my $opts = shift || {};
  my $stdin = shift;
  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
  print STDERR (join (" ",
                      map { / / ? "'$_'" : $_ }
                      @$args),
                "\n")
      if $ENV{CRUNCH_DEBUG};

  if (defined $stdin) {
    my $child = open STDIN, "-|";
    defined $child or die "no fork: $!";
    if ($child == 0) {
      print $stdin or die $!;
      close STDOUT or die $!;
      exit 0;
    }
  }

  return system (@$args) if $opts->{fork};

  exec @$args;
  warn "ENV size is ".length(join(" ",%ENV));
  die "exec failed: $!: @$args";
}
sub ban_node_by_slot {
  # Don't start any new jobsteps on this node for 60 seconds
  my $slotid = shift;
  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
  $slot[$slotid]->{node}->{hold_count}++;
  Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
}
sub must_lock_now
{
  my ($lockfile, $error_message) = @_;
  open L, ">", $lockfile or croak("$lockfile: $!");
  if (!flock L, LOCK_EX|LOCK_NB) {
    croak("Can't lock $lockfile: $error_message\n");
  }
}
sub find_docker_image {
  # Given a Keep locator, check to see if it contains a Docker image.
  # If so, return its stream name and Docker hash.
  # If not, return undef for both values.
  my $locator = shift;
  my ($streamname, $filename);
  if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
    foreach my $line (split(/\n/, $image->{manifest_text})) {
      my @tokens = split(/\s+/, $line);
      next if (!@tokens);
      $streamname = shift(@tokens);
      foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
        if (defined($filename)) {
          return (undef, undef);  # More than one file in the Collection.
        } else {
          $filename = (split(/:/, $filedata, 3))[2];
        }
      }
    }
  }
  if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
    return ($streamname, $1);
  } else {
    return (undef, undef);
  }
}
__DATA__
#!/usr/bin/perl

# checkout-and-build

use Fcntl ':flock';
use File::Path qw( make_path );

my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
my $task_work = $ENV{"TASK_WORK"};

for my $dir ($destdir, $task_work) {
  if ($dir) {
    make_path $dir;
    -e $dir or die "Failed to create temporary directory ($dir): $!";
  }
}
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
  if (@ARGV) {
    exec(@ARGV);
    die "Cannot exec `@ARGV`: $!";
  } else {
    exit 0;
  }
}

unlink "$destdir.commit";
open STDOUT, ">", "$destdir.log";
open STDERR, ">&STDOUT";
my @git_archive_data = <DATA>;
if (@git_archive_data) {
  open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
  print TARX @git_archive_data;
  if(!close(TARX)) {
    die "'tar -C $destdir -xf -' exited $?: $!";
  }
}

my $pwd;
chomp ($pwd = `pwd`);
my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
for my $src_path ("$destdir/arvados/sdk/python") {
  if (-d $src_path) {
    shell_or_die ("virtualenv", $install_dir);
    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
  }
}
if (-e "$destdir/crunch_scripts/install") {
  shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
  shell_or_die ("./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
  shell_or_die ("./install.sh", $install_dir);
}
if ($commit) {
  unlink "$destdir.commit.new";
  symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
  rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
}
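# The symlink-then-rename above updates "$destdir.commit" atomically,
# so it never names a partially-installed tree.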
1566 die "Cannot exec `@ARGV`: $!";
sub shell_or_die
{
  if ($ENV{"DEBUG"}) {
    print STDERR "@_\n";
  }
  system (@_) == 0
      or die "@_ failed: $! exit 0x".sprintf("%x",$?);
}