2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
85 use File::Path qw( make_path );
87 $ENV{"TMPDIR"} ||= "/tmp";
88 unless (defined $ENV{"CRUNCH_TMP"}) {
89 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
90 if ($ENV{"USER"} ne "crunch" && $< != 0) {
91 # use a tmp dir unique for my uid
92 $ENV{"CRUNCH_TMP"} .= "-$<";
96 # Create the tmp directory if it does not exist
97 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
98 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
102 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
103 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
104 mkdir ($ENV{"JOB_WORK"});
112 GetOptions('force-unlock' => \$force_unlock,
113 'git-dir=s' => \$git_dir,
114 'job=s' => \$jobspec,
115 'job-api-token=s' => \$job_api_token,
116 'no-clear-tmp' => \$no_clear_tmp,
117 'resume-stash=s' => \$resume_stash,
120 if (defined $job_api_token) {
121 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
125 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
126 my $local_job = !$job_has_uuid;
131 $main::ENV{CRUNCH_DEBUG} = 1;
135 $main::ENV{CRUNCH_DEBUG} = 0;
140 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $User = $arv->{'users'}->{'current'}->execute;
151 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
152 if (!$force_unlock) {
153 if ($Job->{'is_locked_by_uuid'}) {
154 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
156 if ($Job->{'success'} ne undef) {
157 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
159 if ($Job->{'running'}) {
160 croak("Job 'running' flag is already set");
162 if ($Job->{'started_at'}) {
163 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
169 $Job = JSON::decode_json($jobspec);
173 map { croak ("No $_ specified") unless $Job->{$_} }
174 qw(script script_version script_parameters);
177 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
178 $Job->{'started_at'} = gmtime;
180 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
184 $job_id = $Job->{'uuid'};
186 my $keep_logfile = $job_id . '.log.txt';
187 $local_logfile = File::Temp->new();
189 $Job->{'runtime_constraints'} ||= {};
190 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
191 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194 Log (undef, "check slurm allocation");
197 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
201 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
202 push @sinfo, "$localcpus localhost";
204 if (exists $ENV{SLURM_NODELIST})
206 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
210 my ($ncpus, $slurm_nodelist) = split;
211 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220 foreach (split (",", $ranges))
233 push @nodelist, map {
235 $n =~ s/\[[-,\d]+\]/$_/;
242 push @nodelist, $nodelist;
245 foreach my $nodename (@nodelist)
247 Log (undef, "node $nodename - $ncpus slots");
248 my $node = { name => $nodename,
252 foreach my $cpu (1..$ncpus)
254 push @slot, { node => $node,
258 push @node, @nodelist;
263 # Ensure that we get one jobstep running on each allocated node before
264 # we start overloading nodes with concurrent steps
266 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
273 # Claim this job, and make sure nobody else does
274 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
275 $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
276 croak("Error while updating / locking job");
278 $Job->update_attributes('started_at' => scalar gmtime,
281 'tasks_summary' => { 'failed' => 0,
288 Log (undef, "start");
289 $SIG{'INT'} = sub { $main::please_freeze = 1; };
290 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
291 $SIG{'TERM'} = \&croak;
292 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
293 $SIG{'ALRM'} = sub { $main::please_info = 1; };
294 $SIG{'CONT'} = sub { $main::please_continue = 1; };
295 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
297 $main::please_freeze = 0;
298 $main::please_info = 0;
299 $main::please_continue = 0;
300 $main::please_refresh = 0;
301 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
303 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
304 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
305 $ENV{"JOB_UUID"} = $job_id;
309 my @jobstep_todo = ();
310 my @jobstep_done = ();
311 my @jobstep_tomerge = ();
312 my $jobstep_tomerge_level = 0;
314 my $squeue_kill_checked;
315 my $output_in_keep = 0;
316 my $latest_refresh = scalar time;
320 if (defined $Job->{thawedfromkey})
322 thaw ($Job->{thawedfromkey});
326 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
327 'job_uuid' => $Job->{'uuid'},
332 push @jobstep, { 'level' => 0,
334 'arvados_task' => $first_task,
336 push @jobstep_todo, 0;
342 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
349 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
351 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if (!defined $no_clear_tmp) {
355 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
356 system($clear_tmp_cmd) == 0
357 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
359 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
360 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
362 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
363 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
364 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
366 or croak ("setup.py in $src_path failed: exit ".($?>>8));
374 $build_script = <DATA>;
376 Log (undef, "Install revision ".$Job->{script_version});
377 my $nodelist = join(",", @node);
379 if (!defined $no_clear_tmp) {
380 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
382 my $cleanpid = fork();
385 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
386 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
391 last if $cleanpid == waitpid (-1, WNOHANG);
392 freeze_if_want_freeze ($cleanpid);
393 select (undef, undef, undef, 0.1);
395 Log (undef, "Clean-work-dir exited $?");
398 # Install requested code version
401 my @srunargs = ("srun",
402 "--nodelist=$nodelist",
403 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
405 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
406 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
410 my $treeish = $Job->{'script_version'};
412 # If we're running under crunch-dispatch, it will have pulled the
413 # appropriate source tree into its own repository, and given us that
414 # repo's path as $git_dir. If we're running a "local" job, and a
415 # script_version was specified, it's up to the user to provide the
416 # full path to a local repository in Job->{repository}.
418 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
419 # git-archive --remote where appropriate.
421 # TODO: Accept a locally-hosted Arvados repository by name or
422 # UUID. Use arvados.v1.repositories.list or .get to figure out the
423 # appropriate fetch-url.
424 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
426 $ENV{"CRUNCH_SRC_URL"} = $repo;
428 if (-d "$repo/.git") {
429 # We were given a working directory, but we are only interested in
431 $repo = "$repo/.git";
434 # If this looks like a subversion r#, look for it in git-svn commit messages
436 if ($treeish =~ m{^\d{1,4}$}) {
437 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
439 if ($gitlog =~ /^[a-f0-9]{40}$/) {
441 Log (undef, "Using commit $commit for script_version $treeish");
445 # If that didn't work, try asking git to look it up as a tree-ish.
447 if (!defined $commit) {
448 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
450 if ($found =~ /^[0-9a-f]{40}$/s) {
452 if ($commit ne $treeish) {
453 # Make sure we record the real commit id in the database,
454 # frozentokey, logs, etc. -- instead of an abbreviation or a
455 # branch name which can become ambiguous or point to a
456 # different commit in the future.
457 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
458 Log (undef, "Using commit $commit for tree-ish $treeish");
459 if ($commit ne $treeish) {
460 $Job->{'script_version'} = $commit;
462 $Job->update_attributes('script_version' => $commit) or
463 croak("Error while updating job");
469 if (defined $commit) {
470 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
471 @execargs = ("sh", "-c",
472 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
473 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476 croak ("could not figure out commit id for $treeish");
479 my $installpid = fork();
480 if ($installpid == 0)
482 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
487 last if $installpid == waitpid (-1, WNOHANG);
488 freeze_if_want_freeze ($installpid);
489 select (undef, undef, undef, 0.1);
491 Log (undef, "Install exited $?");
496 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
497 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 # If this job requires a Docker image, install that.
501 my $docker_bin = "/usr/bin/docker.io";
502 my ($docker_locator, $docker_hash);
503 if ($docker_locator = $Job->{docker_image_locator}) {
504 $docker_hash = find_docker_hash($docker_locator);
507 croak("No Docker image hash found from locator $docker_locator");
509 my $docker_install_script = qq{
510 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
511 arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
514 my $docker_pid = fork();
515 if ($docker_pid == 0)
517 srun (["srun", "--nodelist=" . join(',', @node)],
518 ["/bin/sh", "-ec", $docker_install_script]);
523 last if $docker_pid == waitpid (-1, WNOHANG);
524 freeze_if_want_freeze ($docker_pid);
525 select (undef, undef, undef, 0.1);
529 croak("Installing Docker image from $docker_locator returned exit code $?");
533 foreach (qw (script script_version script_parameters runtime_constraints))
537 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
539 foreach (split (/\n/, $Job->{knobs}))
541 Log (undef, "knob " . $_);
546 $main::success = undef;
552 my $thisround_succeeded = 0;
553 my $thisround_failed = 0;
554 my $thisround_failed_multiple = 0;
556 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
557 or $a <=> $b } @jobstep_todo;
558 my $level = $jobstep[$jobstep_todo[0]]->{level};
559 Log (undef, "start level $level");
564 my @freeslot = (0..$#slot);
567 my $progress_is_dirty = 1;
568 my $progress_stats_updated = 0;
570 update_progress_stats();
575 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
577 my $id = $jobstep_todo[$todo_ptr];
578 my $Jobstep = $jobstep[$id];
579 if ($Jobstep->{level} != $level)
584 pipe $reader{$id}, "writer" or croak ($!);
585 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
586 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
588 my $childslot = $freeslot[0];
589 my $childnode = $slot[$childslot]->{node};
590 my $childslotname = join (".",
591 $slot[$childslot]->{node}->{name},
592 $slot[$childslot]->{cpu});
593 my $childpid = fork();
596 $SIG{'INT'} = 'DEFAULT';
597 $SIG{'QUIT'} = 'DEFAULT';
598 $SIG{'TERM'} = 'DEFAULT';
600 foreach (values (%reader))
604 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
605 open(STDOUT,">&writer");
606 open(STDERR,">&writer");
611 delete $ENV{"GNUPGHOME"};
612 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
613 $ENV{"TASK_QSEQUENCE"} = $id;
614 $ENV{"TASK_SEQUENCE"} = $level;
615 $ENV{"JOB_SCRIPT"} = $Job->{script};
616 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
617 $param =~ tr/a-z/A-Z/;
618 $ENV{"JOB_PARAMETER_$param"} = $value;
620 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
621 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
622 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
623 $ENV{"HOME"} = $ENV{"TASK_WORK"};
624 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
625 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
626 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
627 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
633 "--nodelist=".$childnode->{name},
634 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
635 "--job-name=$job_id.$id.$$",
637 my $build_script_to_send = "";
639 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
640 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
641 ."&& cd $ENV{CRUNCH_TMP} ";
644 $build_script_to_send = $build_script;
648 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
651 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
652 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
653 # Dynamically configure the container to use the host system as its
654 # DNS server. Get the host's global addresses from the ip command,
655 # and turn them into docker --dns options using gawk.
657 q{$(ip -o address show scope global |
658 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
659 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
660 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
661 $command .= "--env=\QHOME=/home/crunch\E ";
662 while (my ($env_key, $env_val) = each %ENV)
664 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
665 if ($env_key eq "TASK_WORK") {
666 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
668 elsif ($env_key eq "TASK_KEEPMOUNT") {
669 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
672 $command .= "--env=\Q$env_key=$env_val\E ";
676 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
677 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
678 $command .= "\Q$docker_hash\E ";
679 $command .= "stdbuf --output=0 --error=0 ";
680 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
683 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
684 $command .= "stdbuf --output=0 --error=0 ";
685 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
688 my @execargs = ('bash', '-c', $command);
689 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
693 if (!defined $childpid)
700 $proc{$childpid} = { jobstep => $id,
703 jobstepname => "$job_id.$id.$childpid",
705 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
706 $slot[$childslot]->{pid} = $childpid;
708 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
709 Log ($id, "child $childpid started on $childslotname");
710 $Jobstep->{starttime} = time;
711 $Jobstep->{node} = $childnode->{name};
712 $Jobstep->{slotindex} = $childslot;
713 delete $Jobstep->{stderr};
714 delete $Jobstep->{finishtime};
716 splice @jobstep_todo, $todo_ptr, 1;
719 $progress_is_dirty = 1;
723 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
725 last THISROUND if $main::please_freeze;
726 if ($main::please_info)
728 $main::please_info = 0;
732 update_progress_stats();
739 check_refresh_wanted();
741 update_progress_stats();
742 select (undef, undef, undef, 0.1);
744 elsif (time - $progress_stats_updated >= 30)
746 update_progress_stats();
748 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
749 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
751 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
752 .($thisround_failed+$thisround_succeeded)
753 .") -- giving up on this round";
754 Log (undef, $message);
758 # move slots from freeslot to holdslot (or back to freeslot) if necessary
759 for (my $i=$#freeslot; $i>=0; $i--) {
760 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
761 push @holdslot, (splice @freeslot, $i, 1);
764 for (my $i=$#holdslot; $i>=0; $i--) {
765 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
766 push @freeslot, (splice @holdslot, $i, 1);
770 # give up if no nodes are succeeding
771 if (!grep { $_->{node}->{losing_streak} == 0 &&
772 $_->{node}->{hold_count} < 4 } @slot) {
773 my $message = "Every node has failed -- giving up on this round";
774 Log (undef, $message);
781 push @freeslot, splice @holdslot;
782 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
785 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
788 if ($main::please_continue) {
789 $main::please_continue = 0;
792 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
796 check_refresh_wanted();
798 update_progress_stats();
799 select (undef, undef, undef, 0.1);
800 killem (keys %proc) if $main::please_freeze;
804 update_progress_stats();
805 freeze_if_want_freeze();
808 if (!defined $main::success)
811 $thisround_succeeded == 0 &&
812 ($thisround_failed == 0 || $thisround_failed > 4))
814 my $message = "stop because $thisround_failed tasks failed and none succeeded";
815 Log (undef, $message);
824 goto ONELEVEL if !defined $main::success;
827 release_allocation();
829 my $collated_output = &collate_output();
832 $Job->update_attributes('running' => 0,
833 'success' => $collated_output && $main::success,
834 'finished_at' => scalar gmtime)
837 if ($collated_output)
840 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
841 or die "failed to get collated manifest: $!";
842 # Read the original manifest, and strip permission hints from it,
843 # so we can put the result in a Collection.
844 my @stripped_manifest_lines = ();
845 my $orig_manifest_text = '';
846 while (my $manifest_line = <$orig_manifest>) {
847 $orig_manifest_text .= $manifest_line;
848 my @words = split(/ /, $manifest_line, -1);
849 foreach my $ii (0..$#words) {
850 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
851 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
854 push(@stripped_manifest_lines, join(" ", @words));
856 my $stripped_manifest_text = join("", @stripped_manifest_lines);
857 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
858 'uuid' => md5_hex($stripped_manifest_text),
859 'manifest_text' => $orig_manifest_text,
861 $Job->update_attributes('output' => $output->{uuid});
862 if ($Job->{'output_is_persistent'}) {
863 $arv->{'links'}->{'create'}->execute('link' => {
864 'tail_kind' => 'arvados#user',
865 'tail_uuid' => $User->{'uuid'},
866 'head_kind' => 'arvados#collection',
867 'head_uuid' => $Job->{'output'},
868 'link_class' => 'resources',
874 Log (undef, "Failed to register output manifest: $@");
878 Log (undef, "finish");
885 sub update_progress_stats
887 $progress_stats_updated = time;
888 return if !$progress_is_dirty;
889 my ($todo, $done, $running) = (scalar @jobstep_todo,
890 scalar @jobstep_done,
891 scalar @slot - scalar @freeslot - scalar @holdslot);
892 $Job->{'tasks_summary'} ||= {};
893 $Job->{'tasks_summary'}->{'todo'} = $todo;
894 $Job->{'tasks_summary'}->{'done'} = $done;
895 $Job->{'tasks_summary'}->{'running'} = $running;
897 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
899 Log (undef, "status: $done done, $running running, $todo todo");
900 $progress_is_dirty = 0;
907 my $pid = waitpid (-1, WNOHANG);
908 return 0 if $pid <= 0;
910 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
912 . $slot[$proc{$pid}->{slot}]->{cpu});
913 my $jobstepid = $proc{$pid}->{jobstep};
914 my $elapsed = time - $proc{$pid}->{time};
915 my $Jobstep = $jobstep[$jobstepid];
917 my $childstatus = $?;
918 my $exitvalue = $childstatus >> 8;
919 my $exitinfo = sprintf("exit %d signal %d%s",
922 ($childstatus & 128 ? ' core dump' : ''));
923 $Jobstep->{'arvados_task'}->reload;
924 my $task_success = $Jobstep->{'arvados_task'}->{success};
926 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
928 if (!defined $task_success) {
929 # task did not indicate one way or the other --> fail
930 $Jobstep->{'arvados_task'}->{success} = 0;
931 $Jobstep->{'arvados_task'}->save;
938 $temporary_fail ||= $Jobstep->{node_fail};
939 $temporary_fail ||= ($exitvalue == 111);
942 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
944 # Check for signs of a failed or misconfigured node
945 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
946 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
947 # Don't count this against jobstep failure thresholds if this
948 # node is already suspected faulty and srun exited quickly
949 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
951 Log ($jobstepid, "blaming failure on suspect node " .
952 $slot[$proc{$pid}->{slot}]->{node}->{name});
953 $temporary_fail ||= 1;
955 ban_node_by_slot($proc{$pid}->{slot});
958 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
959 ++$Jobstep->{'failures'},
960 $temporary_fail ? 'temporary ' : 'permanent',
963 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
964 # Give up on this task, and the whole job
966 $main::please_freeze = 1;
969 # Put this task back on the todo queue
970 push @jobstep_todo, $jobstepid;
972 $Job->{'tasks_summary'}->{'failed'}++;
976 ++$thisround_succeeded;
977 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
978 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
979 push @jobstep_done, $jobstepid;
980 Log ($jobstepid, "success in $elapsed seconds");
982 $Jobstep->{exitcode} = $childstatus;
983 $Jobstep->{finishtime} = time;
984 process_stderr ($jobstepid, $task_success);
985 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
987 close $reader{$jobstepid};
988 delete $reader{$jobstepid};
989 delete $slot[$proc{$pid}->{slot}]->{pid};
990 push @freeslot, $proc{$pid}->{slot};
995 my $newtask_list = [];
998 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1000 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1002 'order' => 'qsequence',
1003 'offset' => scalar(@$newtask_list),
1005 push(@$newtask_list, @{$newtask_results->{items}});
1006 } while (@{$newtask_results->{items}});
1007 foreach my $arvados_task (@$newtask_list) {
1009 'level' => $arvados_task->{'sequence'},
1011 'arvados_task' => $arvados_task
1013 push @jobstep, $jobstep;
1014 push @jobstep_todo, $#jobstep;
1018 $progress_is_dirty = 1;
1022 sub check_refresh_wanted
1024 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1025 if (@stat && $stat[9] > $latest_refresh) {
1026 $latest_refresh = scalar time;
1027 if ($job_has_uuid) {
1028 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1029 for my $attr ('cancelled_at',
1030 'cancelled_by_user_uuid',
1031 'cancelled_by_client_uuid') {
1032 $Job->{$attr} = $Job2->{$attr};
1034 if ($Job->{'cancelled_at'}) {
1035 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1036 " by user " . $Job->{cancelled_by_user_uuid});
1038 $main::please_freeze = 1;
1046 # return if the kill list was checked <4 seconds ago
1047 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1051 $squeue_kill_checked = time;
1053 # use killem() on procs whose killtime is reached
1056 if (exists $proc{$_}->{killtime}
1057 && $proc{$_}->{killtime} <= time)
1063 # return if the squeue was checked <60 seconds ago
1064 if (defined $squeue_checked && $squeue_checked > time - 60)
1068 $squeue_checked = time;
1072 # here is an opportunity to check for mysterious problems with local procs
1076 # get a list of steps still running
1077 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1079 if ($squeue[-1] ne "ok")
1085 # which of my jobsteps are running, according to squeue?
1089 if (/^(\d+)\.(\d+) (\S+)/)
1091 if ($1 eq $ENV{SLURM_JOBID})
1098 # which of my active child procs (>60s old) were not mentioned by squeue?
1099 foreach (keys %proc)
1101 if ($proc{$_}->{time} < time - 60
1102 && !exists $ok{$proc{$_}->{jobstepname}}
1103 && !exists $proc{$_}->{killtime})
1105 # kill this proc if it hasn't exited in 30 seconds
1106 $proc{$_}->{killtime} = time + 30;
1112 sub release_allocation
1116 Log (undef, "release job allocation");
1117 system "scancel $ENV{SLURM_JOBID}";
1125 foreach my $job (keys %reader)
1128 while (0 < sysread ($reader{$job}, $buf, 8192))
1130 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1131 $jobstep[$job]->{stderr} .= $buf;
1132 preprocess_stderr ($job);
1133 if (length ($jobstep[$job]->{stderr}) > 16384)
1135 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1144 sub preprocess_stderr
1148 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1150 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1151 Log ($job, "stderr $line");
1152 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1154 $main::please_freeze = 1;
1156 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1157 $jobstep[$job]->{node_fail} = 1;
1158 ban_node_by_slot($jobstep[$job]->{slotindex});
1167 my $task_success = shift;
1168 preprocess_stderr ($job);
1171 Log ($job, "stderr $_");
1172 } split ("\n", $jobstep[$job]->{stderr});
1178 my ($keep, $child_out, $output_block);
1180 my $cmd = "arv-get \Q$hash\E";
1181 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1185 my $bytes = sysread($keep, $buf, 1024 * 1024);
1186 if (!defined $bytes) {
1187 die "reading from arv-get: $!";
1188 } elsif ($bytes == 0) {
1189 # sysread returns 0 at the end of the pipe.
1192 # some bytes were read into buf.
1193 $output_block .= $buf;
1197 return $output_block;
1202 Log (undef, "collate");
1204 my ($child_out, $child_in);
1205 my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1209 next if (!exists $_->{'arvados_task'}->{output} ||
1210 !$_->{'arvados_task'}->{'success'} ||
1211 $_->{'exitcode'} != 0);
1212 my $output = $_->{'arvados_task'}->{output};
1213 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1215 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1216 print $child_in $output;
1218 elsif (@jobstep == 1)
1220 $joboutput = $output;
1223 elsif (defined (my $outblock = fetch_block ($output)))
1225 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1226 print $child_in $outblock;
1230 Log (undef, "XXX fetch_block($output) failed XXX");
1236 if (!defined $joboutput) {
1237 my $s = IO::Select->new($child_out);
1238 if ($s->can_read(120)) {
1239 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1242 Log (undef, "timed out reading from 'arv-put'");
1249 Log (undef, "output $joboutput");
1250 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1254 Log (undef, "output undef");
1264 my $sig = 2; # SIGINT first
1265 if (exists $proc{$_}->{"sent_$sig"} &&
1266 time - $proc{$_}->{"sent_$sig"} > 4)
1268 $sig = 15; # SIGTERM if SIGINT doesn't work
1270 if (exists $proc{$_}->{"sent_$sig"} &&
1271 time - $proc{$_}->{"sent_$sig"} > 4)
1273 $sig = 9; # SIGKILL if SIGTERM doesn't work
1275 if (!exists $proc{$_}->{"sent_$sig"})
1277 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1279 select (undef, undef, undef, 0.1);
1282 kill $sig, $_; # srun wants two SIGINT to really interrupt
1284 $proc{$_}->{"sent_$sig"} = time;
1285 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1295 vec($bits,fileno($_),1) = 1;
1301 sub Log # ($jobstep_id, $logmessage)
1303 if ($_[1] =~ /\n/) {
1304 for my $line (split (/\n/, $_[1])) {
1309 my $fh = select STDERR; $|=1; select $fh;
1310 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1311 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1314 if ($local_logfile || -t STDERR) {
1315 my @gmtime = gmtime;
1316 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1317 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1319 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1321 if ($local_logfile) {
1322 print $local_logfile $datetime . " " . $message;
1329 my ($package, $file, $line) = caller;
1330 my $message = "@_ at $file line $line\n";
1331 Log (undef, $message);
1332 freeze() if @jobstep_todo;
1333 collate_output() if @jobstep_todo;
1335 save_meta() if $local_logfile;
1342 return if !$job_has_uuid;
1343 $Job->update_attributes('running' => 0,
1345 'finished_at' => scalar gmtime);
1351 my $justcheckpoint = shift; # false if this will be the last meta saved
1352 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1354 $local_logfile->flush;
1355 my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1356 . quotemeta($local_logfile->filename);
1357 my $loglocator = `$cmd`;
1358 die "system $cmd failed: $?" if $?;
1361 $local_logfile = undef; # the temp file is automatically deleted
1362 Log (undef, "log manifest is $loglocator");
1363 $Job->{'log'} = $loglocator;
1364 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1368 sub freeze_if_want_freeze
1370 if ($main::please_freeze)
1372 release_allocation();
1375 # kill some srun procs before freeze+stop
1376 map { $proc{$_} = {} } @_;
1379 killem (keys %proc);
1380 select (undef, undef, undef, 0.1);
1382 while (($died = waitpid (-1, WNOHANG)) > 0)
1384 delete $proc{$died};
1399 Log (undef, "Freeze not implemented");
1406 croak ("Thaw not implemented");
1422 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1429 my $srunargs = shift;
1430 my $execargs = shift;
1431 my $opts = shift || {};
1433 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1434 print STDERR (join (" ",
1435 map { / / ? "'$_'" : $_ }
1438 if $ENV{CRUNCH_DEBUG};
1440 if (defined $stdin) {
1441 my $child = open STDIN, "-|";
1442 defined $child or die "no fork: $!";
1444 print $stdin or die $!;
1445 close STDOUT or die $!;
1450 return system (@$args) if $opts->{fork};
1453 warn "ENV size is ".length(join(" ",%ENV));
1454 die "exec failed: $!: @$args";
1458 sub ban_node_by_slot {
1459 # Don't start any new jobsteps on this node for 60 seconds
1461 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1462 $slot[$slotid]->{node}->{hold_count}++;
1463 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1468 my ($lockfile, $error_message) = @_;
1469 open L, ">", $lockfile or croak("$lockfile: $!");
1470 if (!flock L, LOCK_EX|LOCK_NB) {
1471 croak("Can't lock $lockfile: $error_message\n");
1475 sub find_docker_hash {
1476 # Given a Keep locator, search for a matching link to find the Docker hash
1477 # of the stored image.
1478 my $locator = shift;
1479 my $links_result = $arv->{links}->{list}->execute(
1480 filters => [["head_uuid", "=", $locator],
1481 ["link_class", "=", "docker_image_hash"]],
1484 foreach my $link (@{$links_result->{items}}) {
1485 $docker_hash = lc($link->{name});
1487 return $docker_hash;
1493 # checkout-and-build
1497 my $destdir = $ENV{"CRUNCH_SRC"};
1498 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1499 my $repo = $ENV{"CRUNCH_SRC_URL"};
1501 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1503 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1507 unlink "$destdir.commit";
1508 open STDOUT, ">", "$destdir.log";
1509 open STDERR, ">&STDOUT";
1512 my @git_archive_data = <DATA>;
1513 if (@git_archive_data) {
1514 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1515 print TARX @git_archive_data;
1517 die "'tar -C $destdir -xf -' exited $?: $!";
1522 chomp ($pwd = `pwd`);
1523 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1526 for my $src_path ("$destdir/arvados/sdk/python") {
1528 shell_or_die ("virtualenv", $install_dir);
1529 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1533 if (-e "$destdir/crunch_scripts/install") {
1534 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1535 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1537 shell_or_die ("./tests/autotests.sh", $install_dir);
1538 } elsif (-e "./install.sh") {
1539 shell_or_die ("./install.sh", $install_dir);
1543 unlink "$destdir.commit.new";
1544 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1545 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1554 if ($ENV{"DEBUG"}) {
1555 print STDERR "@_\n";
1558 or die "@_ failed: $! exit 0x".sprintf("%x",$?);