2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
# Establish the scratch-directory environment. TMPDIR defaults to /tmp;
# CRUNCH_TMP is derived from it only when the caller has not already set it;
# JOB_WORK and CRUNCH_INSTALL are derived from CRUNCH_TMP.
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid: a caller that is neither USER "crunch" nor uid 0 gets
# a uid-suffixed directory.
89 if ($ENV{"USER"} ne "crunch" && $< != 0) {
90 # use a tmp dir unique for my uid
91 $ENV{"CRUNCH_TMP"} .= "-$<";
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# The result of mkdir is not checked here.
97 mkdir ($ENV{"JOB_WORK"});
# Command-line and environment configuration.
# ARV_CLI, when set, overrides the command stored in $arv_cli.
101 if (defined $ENV{"ARV_CLI"}) {
102 $arv_cli = $ENV{"ARV_CLI"};
# Option names map onto the variables used throughout the script; the "=s"
# options take a string value, the others are boolean flags.
114 GetOptions('force-unlock' => \$force_unlock,
115 'git-dir=s' => \$git_dir,
116 'job=s' => \$jobspec,
117 'job-api-token=s' => \$job_api_token,
118 'no-clear-tmp' => \$no_clear_tmp,
119 'resume-stash=s' => \$resume_stash,
# A token supplied on the command line replaces the one in the environment.
122 if (defined $job_api_token) {
123 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM is considered available only when both variables are present.
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A jobspec made only of lowercase letters, digits and dashes is treated as a
# job UUID; anything else (e.g. a JSON document) is a local job.
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
# NOTE(review): the conditions selecting between these two assignments are
# not visible in this excerpt.
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to the API, identify the calling user, and obtain the Job record.
# Two paths are visible: the job is fetched by UUID, or the JSON jobspec is
# decoded, validated and created through the API.
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Unless --force-unlock was given, refuse a job that already shows signs of
# having been claimed or run.
154 if (!$force_unlock) {
155 if ($Job->{'is_locked_by_uuid'}) {
156 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# Test definedness directly: "ne undef" is a string comparison that
# stringifies undef to "" and triggers "uninitialized" warnings.
158 if (defined $Job->{'success'}) {
159 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
161 if ($Job->{'running'}) {
162 croak("Job 'running' flag is already set");
164 if ($Job->{'started_at'}) {
165 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local job: the jobspec is a JSON document describing the job.
171 $Job = JSON::decode_json($jobspec);
# Each of these attributes must be present and true, otherwise abort.
175 map { croak ("No $_ specified") unless $Job->{$_} }
176 qw(script script_version script_parameters);
# Mark the new record as locked by, and started for, the current user before
# creating it through the API.
179 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180 $Job->{'started_at'} = gmtime;
182 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
186 $job_id = $Job->{'uuid'};
# Name of the log as stored in Keep, and a temporary local file that
# accumulates log lines until they are saved.
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
# Default the runtime constraints; a max_tasks_per_node of 0 means "no cap"
# because $max_ncpus is only applied where it tests true.
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build the node and slot tables. Each @sinfo entry is "<ncpus> <nodelist>";
# node lists may use bracketed ranges, which are expanded into individual
# node names. One slot record is pushed per CPU per node.
196 Log (undef, "check slurm allocation");
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Count "processor" lines in /proc/cpuinfo, falling back to 1 when the
# count is 0 or the command yields nothing.
203 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204 push @sinfo, "$localcpus localhost";
206 if (exists $ENV{SLURM_NODELIST})
# sinfo prints "<cpus> <nodelist>" (format %c %N) for the allocated nodes.
208 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
212 my ($ncpus, $slurm_nodelist) = split;
# Apply the per-node task cap only when one was configured (non-zero).
213 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated element (optionally with a [...] range part) off
# the front of the node list on each iteration.
216 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
219 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
222 foreach (split (",", $ranges))
235 push @nodelist, map {
# Substitute the bracket expression with one concrete value.
237 $n =~ s/\[[-,\d]+\]/$_/;
244 push @nodelist, $nodelist;
247 foreach my $nodename (@nodelist)
249 Log (undef, "node $nodename - $ncpus slots");
250 my $node = { name => $nodename,
254 foreach my $cpu (1..$ncpus)
256 push @slot, { node => $node,
260 push @node, @nodelist;
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
275 # Claim this job, and make sure nobody else does
# The lock holder is verified after the update. UUIDs are strings, so they
# must be compared with "eq": numeric "==" converts both non-numeric strings
# to 0 and would report a match no matter who holds the lock.
276 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
278 croak("Error while updating / locking job");
# Record the start time and reset the task summary on the job record.
280 $Job->update_attributes('started_at' => scalar gmtime,
283 'tasks_summary' => { 'failed' => 0,
# Install signal handlers and initialise scheduler state. Handlers only set
# flags in package main; the flags are polled elsewhere in the script.
290 Log (undef, "start");
# INT, QUIT and TSTP all request a freeze; TERM aborts through croak.
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export every "NOCACHE...=value" line of the job's knobs as an environment
# variable (grep is used only for its side effect).
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
# Work queues hold indexes into @jobstep.
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
# Timestamps used to rate-limit the squeue check and the refresh check.
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
# Seed the work queue: resume from a stored key when the job names one,
# otherwise create the first task record and queue it as jobstep 0 at
# level 0.
322 if (defined $Job->{thawedfromkey})
324 thaw ($Job->{thawedfromkey});
328 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329 'job_uuid' => $Job->{'uuid'},
334 push @jobstep, { 'level' => 0,
336 'arvados_task' => $first_task,
338 push @jobstep_todo, 0;
# Take the per-CRUNCH_TMP lock, then prepare the source tree. A local job
# whose script_version is an absolute path is used in place ($skip_install).
344 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
356 if (!defined $no_clear_tmp) {
# The variables are expanded by the shell from the environment set earlier.
357 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358 system($clear_tmp_cmd) == 0
359 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
361 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
# Build the Python SDK found in the source tree into a virtualenv under
# CRUNCH_TMP/opt.
362 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
364 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
368 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# The build script is read from this file's DATA section.
376 $build_script = <DATA>;
# Clean the work directories on every allocated node in a forked child while
# the parent polls for its exit and honours freeze requests.
378 Log (undef, "Install revision ".$Job->{script_version});
379 my $nodelist = join(",", @node);
381 if (!defined $no_clear_tmp) {
382 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
384 my $cleanpid = fork();
# Unmount anything mounted under JOB_WORK before removing the directories.
387 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Non-blocking wait, polled every 0.1 s.
393 last if $cleanpid == waitpid (-1, WNOHANG);
394 freeze_if_want_freeze ($cleanpid);
395 select (undef, undef, undef, 0.1);
397 Log (undef, "Clean-work-dir exited $?");
# Resolve the job's script_version to a full commit id in the chosen git
# repository, then ship the build script plus a git archive of that commit
# to the nodes through srun.
400 # Install requested code version
403 my @srunargs = ("srun",
404 "--nodelist=$nodelist",
405 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
407 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
412 my $treeish = $Job->{'script_version'};
414 # If we're running under crunch-dispatch, it will have pulled the
415 # appropriate source tree into its own repository, and given us that
416 # repo's path as $git_dir. If we're running a "local" job, and a
417 # script_version was specified, it's up to the user to provide the
418 # full path to a local repository in Job->{repository}.
420 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421 # git-archive --remote where appropriate.
423 # TODO: Accept a locally-hosted Arvados repository by name or
424 # UUID. Use arvados.v1.repositories.list or .get to figure out the
425 # appropriate fetch-url.
# Precedence: --git-dir, then CRUNCH_DEFAULT_GIT_DIR, then the job record.
426 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
428 $ENV{"CRUNCH_SRC_URL"} = $repo;
430 if (-d "$repo/.git") {
431 # We were given a working directory, but we are only interested in
433 $repo = "$repo/.git";
436 # If this looks like a subversion r#, look for it in git-svn commit messages
# Only 1-4 digit values are treated as svn revision numbers.
438 if ($treeish =~ m{^\d{1,4}$}) {
439 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
# Accept the result only if it is exactly one 40-hex-digit commit id.
441 if ($gitlog =~ /^[a-f0-9]{40}$/) {
443 Log (undef, "Using commit $commit for script_version $treeish");
447 # If that didn't work, try asking git to look it up as a tree-ish.
449 if (!defined $commit) {
450 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
452 if ($found =~ /^[0-9a-f]{40}$/s) {
454 if ($commit ne $treeish) {
455 # Make sure we record the real commit id in the database,
456 # frozentokey, logs, etc. -- instead of an abbreviation or a
457 # branch name which can become ambiguous or point to a
458 # different commit in the future.
459 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460 Log (undef, "Using commit $commit for tree-ish $treeish");
461 if ($commit ne $treeish) {
462 $Job->{'script_version'} = $commit;
464 $Job->update_attributes('script_version' => $commit) or
465 croak("Error while updating job");
471 if (defined $commit) {
472 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
# The remote side runs "perl -", i.e. it executes the build script that is
# sent on standard input.
473 @execargs = ("sh", "-c",
474 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
478 croak ("could not figure out commit id for $treeish");
481 my $installpid = fork();
482 if ($installpid == 0)
# Child: stdin is the build script followed by the archive bytes.
484 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Parent: non-blocking wait, polled every 0.1 s.
489 last if $installpid == waitpid (-1, WNOHANG);
490 freeze_if_want_freeze ($installpid);
491 select (undef, undef, undef, 0.1);
493 Log (undef, "Install exited $?");
498 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# When the job names a Docker image locator, resolve it to an image hash and
# load the image on every node that does not already have it.
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my ($docker_locator, $docker_hash);
# Assignment inside the condition: the branch runs only for a true locator.
505 if ($docker_locator = $Job->{docker_image_locator}) {
506 $docker_hash = find_docker_hash($docker_locator);
509 croak("No Docker image hash found from locator $docker_locator");
# Shell snippet: load "<locator>/<hash>.tar" from Keep unless the image id
# is already listed by "docker images".
511 my $docker_install_script = qq{
512 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
513 arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
516 my $docker_pid = fork();
517 if ($docker_pid == 0)
519 srun (["srun", "--nodelist=" . join(',', @node)],
520 ["/bin/sh", "-ec", $docker_install_script]);
# Parent: non-blocking wait, polled every 0.1 s.
525 last if $docker_pid == waitpid (-1, WNOHANG);
526 freeze_if_want_freeze ($docker_pid);
527 select (undef, undef, undef, 0.1);
531 croak("Installing Docker image from $docker_locator returned exit code $?");
# Iterate over the main job attributes and the job's knobs for logging;
# reference-valued attributes are JSON-encoded first.
535 foreach (qw (script script_version script_parameters runtime_constraints))
539 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
541 foreach (split (/\n/, $Job->{knobs}))
543 Log (undef, "knob " . $_);
# undef means "not decided yet"; the level loop repeats while it stays undef.
548 $main::success = undef;
# Per-round state: counters, the todo queue sorted by (level, index), the
# level being run this round, and the list of free slot indexes.
554 my $thisround_succeeded = 0;
555 my $thisround_failed = 0;
556 my $thisround_failed_multiple = 0;
558 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
559 or $a <=> $b } @jobstep_todo;
# After sorting, the first todo entry carries the lowest pending level.
560 my $level = $jobstep[$jobstep_todo[0]]->{level};
561 Log (undef, "start level $level");
566 my @freeslot = (0..$#slot);
# Dirty flag and timestamp consulted by update_progress_stats.
569 my $progress_is_dirty = 1;
570 my $progress_stats_updated = 0;
572 update_progress_stats();
# Walk the todo queue and start one child per jobstep. The child's
# stdout/stderr are sent through a pipe whose read end is non-blocking, and
# the child gets a task-specific environment before running srun.
577 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
579 my $id = $jobstep_todo[$todo_ptr];
580 my $Jobstep = $jobstep[$id];
581 if ($Jobstep->{level} != $level)
# "writer" is a symbolic (named) filehandle shared with the child code below.
586 pipe $reader{$id}, "writer" or croak ($!);
587 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
588 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Always take the first free slot.
590 my $childslot = $freeslot[0];
591 my $childnode = $slot[$childslot]->{node};
592 my $childslotname = join (".",
593 $slot[$childslot]->{node}->{name},
594 $slot[$childslot]->{cpu});
595 my $childpid = fork();
# Child: restore default signal handling.
598 $SIG{'INT'} = 'DEFAULT';
599 $SIG{'QUIT'} = 'DEFAULT';
600 $SIG{'TERM'} = 'DEFAULT';
602 foreach (values (%reader))
# NOTE(review): F_SETFL with 0 clears file status flags; the close-on-exec
# flag is an fd flag (F_SETFD) — confirm this does what the comment says.
606 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
607 open(STDOUT,">&writer");
608 open(STDERR,">&writer");
613 delete $ENV{"GNUPGHOME"};
614 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
615 $ENV{"TASK_QSEQUENCE"} = $id;
616 $ENV{"TASK_SEQUENCE"} = $level;
617 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Each script parameter becomes JOB_PARAMETER_<NAME>, with a-z upper-cased.
618 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
619 $param =~ tr/a-z/A-Z/;
620 $ENV{"JOB_PARAMETER_$param"} = $value;
622 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
623 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
# $$ here is the child's pid, so each attempt gets its own directories.
624 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
625 $ENV{"HOME"} = $ENV{"TASK_WORK"}.".home";
626 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
627 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
628 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
629 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun arguments: one task, one CPU, one node, on the chosen node.
635 "--nodelist=".$childnode->{name},
636 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
637 "--job-name=$job_id.$id.$$",
# Start of the shell command run for the task: recreate the task work
# directory, make sure the job, task, Keep-mount and HOME directories exist,
# then change into CRUNCH_TMP.
639 my $build_script_to_send = "";
641 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
# The task's HOME must be taken from %ENV: "${HOME}" inside a Perl
# double-quoted string interpolates the Perl variable $HOME, not the
# environment value, so the home directory was never passed to mkdir.
642 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} $ENV{HOME} "
643 ."&& chmod og+wrx $ENV{TASK_WORK}"
644 ."&& cd $ENV{CRUNCH_TMP} ";
# NOTE(review): the condition under which the build script is sent is not
# visible in this excerpt.
647 $build_script_to_send = $build_script;
# Run the rest of the command under arv-mount with Keep mounted at
# TASK_KEEPMOUNT.
651 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Remainder of the task command. Two variants are visible: a Docker variant
# that runs the script inside a container with work, source and Keep
# directories mounted, and a plain variant that runs the script directly.
# Both wrap the script in crunchstat and stdbuf.
654 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
655 $command .= "$docker_bin run --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
656 # Dynamically configure the container to use the host system as its
657 # DNS server. Get the host's global addresses from the ip command,
658 # and turn them into docker --dns options using gawk.
660 q{$(ip -o address show scope global |
661 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
# Host paths are mapped to fixed paths inside the container.
662 $command .= "--volume=\Q$ENV{TASK_WORK}:/tmp/crunch-job:rw\E ";
663 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
664 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
665 $command .= "--env=\QHOME=/home/crunch\E ";
# Forward ARVADOS_*, JOB_* and TASK_* variables, rewriting those that hold
# host paths to the container-side paths.
666 while (my ($env_key, $env_val) = each %ENV)
668 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
669 if ($env_key eq "TASK_WORK") {
670 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
672 elsif ($env_key eq "TASK_KEEPMOUNT") {
673 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
# NOTE(review): CRUNCH_SRC does not match the (ARVADOS|JOB|TASK)_ filter
# above, so this branch looks unreachable as nested here — confirm.
675 elsif ($env_key eq "CRUNCH_SRC") {
676 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
679 $command .= "--env=\Q$env_key=$env_val\E ";
683 $command .= "\Q$docker_hash\E ";
684 $command .= "stdbuf --output=0 --error=0 ";
685 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Plain (non-Docker) variant.
688 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
689 $command .= "stdbuf --output=0 --error=0 ";
690 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
693 my @execargs = ('bash', '-c', $command);
694 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# Parent-side bookkeeping after fork: record the child in %proc, mark the
# slot as occupied, stamp the jobstep, and take the step off the todo queue.
698 if (!defined $childpid)
705 $proc{$childpid} = { jobstep => $id,
# This name is what the squeue check looks for among running steps.
708 jobstepname => "$job_id.$id.$childpid",
# A slot must never be assigned to two children at once.
710 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
711 $slot[$childslot]->{pid} = $childpid;
713 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
714 Log ($id, "child $childpid started on $childslotname");
715 $Jobstep->{starttime} = time;
716 $Jobstep->{node} = $childnode->{name};
717 $Jobstep->{slotindex} = $childslot;
# Clear leftovers from any earlier attempt of this step.
718 delete $Jobstep->{stderr};
719 delete $Jobstep->{finishtime};
721 splice @jobstep_todo, $todo_ptr, 1;
724 $progress_is_dirty = 1;
# Waiting logic for the round: while slots are busy, service signal flags,
# refresh requests and progress updates; move slots between the free and
# hold lists; give up on the round when failures dominate; finally wait for
# the remaining children and decide whether another level must be run.
728 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
730 last THISROUND if $main::please_freeze;
731 if ($main::please_info)
733 $main::please_info = 0;
737 update_progress_stats();
744 check_refresh_wanted();
746 update_progress_stats();
747 select (undef, undef, undef, 0.1);
# Refresh the progress record at least every 30 seconds.
749 elsif (time - $progress_stats_updated >= 30)
751 update_progress_stats();
# Abort the round when repeat failures reach 8 with no success at all, or
# reach 16 and outnumber the successes.
753 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
754 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
756 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
757 .($thisround_failed+$thisround_succeeded)
758 .") -- giving up on this round";
759 Log (undef, $message);
763 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops iterate backwards so that splice does not disturb the indexes
# still to be visited.
764 for (my $i=$#freeslot; $i>=0; $i--) {
765 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
766 push @holdslot, (splice @freeslot, $i, 1);
769 for (my $i=$#holdslot; $i>=0; $i--) {
770 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
771 push @freeslot, (splice @holdslot, $i, 1);
775 # give up if no nodes are succeeding
# A usable slot has no current losing streak and fewer than 4 holds.
776 if (!grep { $_->{node}->{losing_streak} == 0 &&
777 $_->{node}->{hold_count} < 4 } @slot) {
778 my $message = "Every node has failed -- giving up on this round";
779 Log (undef, $message);
# Return all held slots to the free list and reset the losing streaks.
786 push @freeslot, splice @holdslot;
787 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
790 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
793 if ($main::please_continue) {
794 $main::please_continue = 0;
797 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
801 check_refresh_wanted();
803 update_progress_stats();
804 select (undef, undef, undef, 0.1);
805 killem (keys %proc) if $main::please_freeze;
809 update_progress_stats();
810 freeze_if_want_freeze();
813 if (!defined $main::success)
816 $thisround_succeeded == 0 &&
817 ($thisround_failed == 0 || $thisround_failed > 4))
819 my $message = "stop because $thisround_failed tasks failed and none succeeded";
820 Log (undef, $message);
# Run another level while the outcome is still undecided.
829 goto ONELEVEL if !defined $main::success;
# Wrap-up: release the allocation, collate task outputs, record the final
# job state, and register the collated manifest as a Collection.
832 release_allocation();
834 my $collated_output = &collate_output();
# The job succeeds only if there is collated output AND the run succeeded.
837 $Job->update_attributes('running' => 0,
838 'success' => $collated_output && $main::success,
839 'finished_at' => scalar gmtime)
842 if ($collated_output)
# NOTE(review): this invokes 'arv' literally, whereas other Keep calls in
# this file go through $arv_cli — confirm that is intended.
845 open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
846 or die "failed to get collated manifest: $!";
847 # Read the original manifest, and strip permission hints from it,
848 # so we can put the result in a Collection.
849 my @stripped_manifest_lines = ();
850 my $orig_manifest_text = '';
851 while (my $manifest_line = <$orig_manifest>) {
852 $orig_manifest_text .= $manifest_line;
# Limit -1 keeps trailing empty fields so join() reproduces the line.
853 my @words = split(/ /, $manifest_line, -1);
854 foreach my $ii (0..$#words) {
# Only words that start like a block locator (32 hex digits then "+") are
# touched; the "+A<40 hex>@<8 hex>" hint is removed from them.
855 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
856 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
859 push(@stripped_manifest_lines, join(" ", @words));
861 my $stripped_manifest_text = join("", @stripped_manifest_lines);
# The uuid is the MD5 of the stripped text, while the manifest text sent is
# the original (unstripped) one.
862 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
863 'uuid' => md5_hex($stripped_manifest_text),
864 'manifest_text' => $orig_manifest_text,
866 $Job->update_attributes('output' => $output->{uuid});
# For persistent output, link the collection to the current user.
867 if ($Job->{'output_is_persistent'}) {
868 $arv->{'links'}->{'create'}->execute('link' => {
869 'tail_kind' => 'arvados#user',
870 'tail_uuid' => $User->{'uuid'},
871 'head_kind' => 'arvados#collection',
872 'head_uuid' => $Job->{'output'},
873 'link_class' => 'resources',
879 Log (undef, "Failed to register output manifest: $@");
883 Log (undef, "finish");
# update_progress_stats: publish todo/done/running task counts to the job
# record and the log. Takes no arguments. The timestamp is always refreshed,
# but the update itself is skipped unless progress has been marked dirty.
890 sub update_progress_stats
892 $progress_stats_updated = time;
893 return if !$progress_is_dirty;
# Running = slots that are neither free nor on hold.
894 my ($todo, $done, $running) = (scalar @jobstep_todo,
895 scalar @jobstep_done,
896 scalar @slot - scalar @freeslot - scalar @holdslot);
897 $Job->{'tasks_summary'} ||= {};
898 $Job->{'tasks_summary'}->{'todo'} = $todo;
899 $Job->{'tasks_summary'}->{'done'} = $done;
900 $Job->{'tasks_summary'}->{'running'} = $running;
902 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
904 Log (undef, "status: $done done, $running running, $todo todo");
905 $progress_is_dirty = 0;
# Child-reaping routine (its "sub" line is not visible in this excerpt).
# Collects one exited child without blocking, classifies the task as success
# or (temporary/permanent) failure, frees its slot, and queues any new tasks
# that the finished task created. Returns 0 when no child has exited.
912 my $pid = waitpid (-1, WNOHANG);
913 return 0 if $pid <= 0;
915 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
917 . $slot[$proc{$pid}->{slot}]->{cpu});
918 my $jobstepid = $proc{$pid}->{jobstep};
919 my $elapsed = time - $proc{$pid}->{time};
920 my $Jobstep = $jobstep[$jobstepid];
# Wait status: exit value in the high byte, core-dump flag in bit 0x80.
922 my $childstatus = $?;
923 my $exitvalue = $childstatus >> 8;
924 my $exitinfo = sprintf("exit %d signal %d%s",
927 ($childstatus & 128 ? ' core dump' : ''));
# Success is judged from the task record, re-read after the child exits.
928 $Jobstep->{'arvados_task'}->reload;
929 my $task_success = $Jobstep->{'arvados_task'}->{success};
931 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
933 if (!defined $task_success) {
934 # task did not indicate one way or the other --> fail
935 $Jobstep->{'arvados_task'}->{success} = 0;
936 $Jobstep->{'arvados_task'}->save;
# A node failure, or exit value 111, marks the failure as temporary.
943 $temporary_fail ||= $Jobstep->{node_fail};
944 $temporary_fail ||= ($exitvalue == 111);
947 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
949 # Check for signs of a failed or misconfigured node
950 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
951 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
952 # Don't count this against jobstep failure thresholds if this
953 # node is already suspected faulty and srun exited quickly
954 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
956 Log ($jobstepid, "blaming failure on suspect node " .
957 $slot[$proc{$pid}->{slot}]->{node}->{name});
958 $temporary_fail ||= 1;
960 ban_node_by_slot($proc{$pid}->{slot});
963 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
964 ++$Jobstep->{'failures'},
965 $temporary_fail ? 'temporary ' : 'permanent',
# A permanent failure, or a third failure of the same step, stops the job.
968 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
969 # Give up on this task, and the whole job
971 $main::please_freeze = 1;
974 # Put this task back on the todo queue
975 push @jobstep_todo, $jobstepid;
977 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the node's failure tracking.
981 ++$thisround_succeeded;
982 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
983 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
984 push @jobstep_done, $jobstepid;
985 Log ($jobstepid, "success in $elapsed seconds");
987 $Jobstep->{exitcode} = $childstatus;
988 $Jobstep->{finishtime} = time;
989 process_stderr ($jobstepid, $task_success);
990 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and the slot.
992 close $reader{$jobstepid};
993 delete $reader{$jobstepid};
994 delete $slot[$proc{$pid}->{slot}]->{pid};
995 push @freeslot, $proc{$pid}->{slot};
# Page through the tasks created by this task; the offset is the number
# collected so far and the loop ends on an empty page.
999 my $newtask_list = [];
1000 my $newtask_results;
1002 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1004 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1006 'order' => 'qsequence',
1007 'offset' => scalar(@$newtask_list),
1009 push(@$newtask_list, @{$newtask_results->{items}});
1010 } while (@{$newtask_results->{items}});
# Each new task becomes a jobstep whose level is the task's sequence.
1011 foreach my $arvados_task (@$newtask_list) {
1013 'level' => $arvados_task->{'sequence'},
1015 'arvados_task' => $arvados_task
1017 push @jobstep, $jobstep;
1018 push @jobstep_todo, $#jobstep;
1021 $progress_is_dirty = 1;
# check_refresh_wanted: when the file named by CRUNCH_REFRESH_TRIGGER has
# been modified since the last refresh, re-read the cancellation attributes
# of the job and request a freeze if the job has been cancelled.
1025 sub check_refresh_wanted
1027 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the file's modification time.
1028 if (@stat && $stat[9] > $latest_refresh) {
1029 $latest_refresh = scalar time;
# Only jobs that exist in the API can be re-read.
1030 if ($job_has_uuid) {
1031 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1032 for my $attr ('cancelled_at',
1033 'cancelled_by_user_uuid',
1034 'cancelled_by_client_uuid') {
1035 $Job->{$attr} = $Job2->{$attr};
1037 if ($Job->{'cancelled_at'}) {
1038 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1039 " by user " . $Job->{cancelled_by_user_uuid});
1041 $main::please_freeze = 1;
# Periodic squeue check (its "sub" line is not visible in this excerpt).
# It is rate-limited twice: the kill list at most every 4 seconds and the
# squeue query at most every 60 seconds. Children that squeue no longer
# reports are given a kill deadline.
1049 # return if the kill list was checked <4 seconds ago
1050 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1054 $squeue_kill_checked = time;
1056 # use killem() on procs whose killtime is reached
1059 if (exists $proc{$_}->{killtime}
1060 && $proc{$_}->{killtime} <= time)
1066 # return if the squeue was checked <60 seconds ago
1067 if (defined $squeue_checked && $squeue_checked > time - 60)
1071 $squeue_checked = time;
1075 # here is an opportunity to check for mysterious problems with local procs
1079 # get a list of steps still running
# "echo ok" appends a sentinel line only when squeue itself succeeded.
1080 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1082 if ($squeue[-1] ne "ok")
1088 # which of my jobsteps are running, according to squeue?
# Lines look like "<jobid>.<stepid> <name>".
1092 if (/^(\d+)\.(\d+) (\S+)/)
1094 if ($1 eq $ENV{SLURM_JOBID})
1101 # which of my active child procs (>60s old) were not mentioned by squeue?
1102 foreach (keys %proc)
1104 if ($proc{$_}->{time} < time - 60
1105 && !exists $ok{$proc{$_}->{jobstepname}}
1106 && !exists $proc{$_}->{killtime})
1108 # kill this proc if it hasn't exited in 30 seconds
1109 $proc{$_}->{killtime} = time + 30;
# release_allocation: cancel the SLURM job named by SLURM_JOBID.
# NOTE(review): any guard around the scancel call is not visible in this
# excerpt.
1115 sub release_allocation
1119 Log (undef, "release job allocation");
1120 system "scancel $ENV{SLURM_JOBID}";
# Pipe-draining routine (its "sub" line is not visible in this excerpt).
# Reads whatever is available from each child's pipe, appends it to the
# step's stderr buffer, and hands complete lines to preprocess_stderr.
1128 foreach my $job (keys %reader)
# The read ends are non-blocking, so this loop ends when no data is ready.
1131 while (0 < sysread ($reader{$job}, $buf, 8192))
1133 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1134 $jobstep[$job]->{stderr} .= $buf;
1135 preprocess_stderr ($job);
# Bound the buffer: once it exceeds 16384 bytes drop the oldest 8192.
1136 if (length ($jobstep[$job]->{stderr}) > 16384)
1138 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume every complete line in a jobstep's stderr
# buffer, log it, and react to srun error messages that indicate a lost
# allocation (freeze the job) or a node failure (flag the step and back off
# the node).
1147 sub preprocess_stderr
1151 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line and its newline from the front of the buffer.
1153 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1154 Log ($job, "stderr $line");
1155 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1157 $main::please_freeze = 1;
1159 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1160 $jobstep[$job]->{node_fail} = 1;
1161 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr handling for a finished step (fragment; the "sub" line is not
# visible): flush complete lines through preprocess_stderr, then log whatever
# remains in the buffer line by line.
1170 my $task_success = shift;
1171 preprocess_stderr ($job);
1174 Log ($job, "stderr $_");
1175 } split ("\n", $jobstep[$job]->{stderr});
# Fetch one Keep block by locator through "$arv_cli keep get" and return its
# content. Dies if the command cannot be started.
1181 my ($keep, $child_out, $output_block);
1183 my $cmd = "$arv_cli keep get \Q$hash\E";
1184 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# Read until end-of-file: a single sysread on a pipe may return fewer bytes
# than requested, which would silently truncate larger blocks.
1185 $output_block = do { local $/; <$keep> };
1187 return $output_block;
# Collate the outputs of all successful tasks into one job output (fragment;
# the "sub" line is not visible). Task outputs are streamed into
# "$arv_cli keep put --raw" and the resulting locator is read back.
1192 Log (undef, "collate");
1194 my ($child_out, $child_in);
1195 my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
# Skip tasks without output, unsuccessful tasks, and non-zero exit codes.
1199 next if (!exists $_->{'arvados_task'}->{output} ||
1200 !$_->{'arvados_task'}->{'success'} ||
1201 $_->{'exitcode'} != 0);
1202 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare locator is passed through verbatim.
1203 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1205 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1206 print $child_in $output;
# With a single jobstep, its locator is the job output as-is.
1208 elsif (@jobstep == 1)
1210 $joboutput = $output;
# Otherwise the locator is dereferenced and its content is appended.
1213 elsif (defined (my $outblock = fetch_block ($output)))
1215 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1216 print $child_in $outblock;
1220 Log (undef, "XXX fetch_block($output) failed XXX");
1226 if (!defined $joboutput) {
# Wait up to 120 seconds for the put command to answer.
1227 my $s = IO::Select->new($child_out);
1228 if ($s->can_read(120)) {
1229 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1232 Log (undef, "timed out reading from 'arv keep put'");
1239 Log (undef, "output $joboutput");
1240 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1244 Log (undef, "output undef");
# Signal escalation for child processes (fragment; the "sub" line is not
# visible). Each child gets SIGINT first; if the current signal was sent more
# than 4 seconds ago the next one (SIGTERM, then SIGKILL) is used. The send
# time of every signal is recorded per child.
1254 my $sig = 2; # SIGINT first
1255 if (exists $proc{$_}->{"sent_$sig"} &&
1256 time - $proc{$_}->{"sent_$sig"} > 4)
1258 $sig = 15; # SIGTERM if SIGINT doesn't work
1260 if (exists $proc{$_}->{"sent_$sig"} &&
1261 time - $proc{$_}->{"sent_$sig"} > 4)
1263 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Each signal is sent only once per child.
1265 if (!exists $proc{$_}->{"sent_$sig"})
1267 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1269 select (undef, undef, undef, 0.1);
1272 kill $sig, $_; # srun wants two SIGINT to really interrupt
1274 $proc{$_}->{"sent_$sig"} = time;
1275 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for this handle's file descriptor in the $bits vector.
1285 vec($bits,fileno($_),1) = 1;
# Log: write one log line to STDERR and, while it is open, to the local log
# file. Multi-line messages are split and handled one line at a time.
# Arguments: jobstep id (may be undef) and the message text.
1291 sub Log # ($jobstep_id, $logmessage)
1293 if ($_[1] =~ /\n/) {
1294 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered, then restore the previously selected handle.
1299 my $fh = select STDERR; $|=1; select $fh;
1300 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside space..~ with a backslash-octal escape.
1301 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A timestamp is only built when it will be used: for the log file, or when
# STDERR is a terminal.
1304 if ($local_logfile || -t STDERR) {
1305 my @gmtime = gmtime;
1306 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1307 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1309 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1311 if ($local_logfile) {
1312 print $local_logfile $datetime . " " . $message;
# Fatal-error path (fragment; the "sub" line is not visible): log the
# message with the caller's file and line, checkpoint and collate if work is
# still queued, and save the log if the local log file is still open.
1319 my ($package, $file, $line) = caller;
1320 my $message = "@_ at $file line $line\n";
1321 Log (undef, $message);
1322 freeze() if @jobstep_todo;
1323 collate_output() if @jobstep_todo;
1325 save_meta() if $local_logfile;
# Mark the job as no longer running (fragment; the "sub" line is not
# visible). Nothing is done for jobs that have no UUID.
1332 return if !$job_has_uuid;
1333 $Job->update_attributes('running' => 0,
1335 'finished_at' => scalar gmtime);
# Store the local log file in Keep and record its locator on the job
# (fragment; the "sub" line is not visible). Called with a true argument it
# does nothing.
1341 my $justcheckpoint = shift; # false if this will be the last meta saved
1342 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1344 $local_logfile->flush;
1345 my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1346 . quotemeta($local_logfile->filename);
1347 my $loglocator = `$cmd`;
1348 die "system $cmd failed: $?" if $?;
# Clearing the handle also stops Log from writing to the file.
1351 $local_logfile = undef; # the temp file is automatically deleted
1352 Log (undef, "log manifest is $loglocator");
1353 $Job->{'log'} = $loglocator;
1354 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# freeze_if_want_freeze: when a freeze has been requested, release the
# allocation and kill and reap all known children. Arguments: optional extra
# pids to treat as children.
1358 sub freeze_if_want_freeze
1360 if ($main::please_freeze)
1362 release_allocation();
1365 # kill some srun procs before freeze+stop
# Register the extra pids in %proc so that killem signals them too.
1366 map { $proc{$_} = {} } @_;
1369 killem (keys %proc);
1370 select (undef, undef, undef, 0.1);
# Reap everything that has exited so far, without blocking.
1372 while (($died = waitpid (-1, WNOHANG)) > 0)
1374 delete $proc{$died};
# Fragments of three routines whose "sub" lines are not visible here.
# Freezing only logs that it is not implemented.
1389 Log (undef, "Freeze not implemented");
# Thawing aborts, because it is not implemented either.
1396 croak ("Thaw not implemented");
# Undo backslash quoting: "\n" becomes a newline, any other escaped
# character stands for itself.
1412 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command, through srun when SLURM is available and directly otherwise
# (fragment; the "sub" line is not visible). Arguments: arrayref of srun
# arguments, arrayref of the command to execute, optional options hashref.
1419 my $srunargs = shift;
1420 my $execargs = shift;
1421 my $opts = shift || {};
# Without SLURM the srun prefix is dropped and the command runs locally.
1423 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1424 print STDERR (join (" ",
1425 map { / / ? "'$_'" : $_ }
1428 if $ENV{CRUNCH_DEBUG};
# When stdin content is given, fork a child whose STDOUT is connected to
# this process's STDIN, and have it print the content.
1430 if (defined $stdin) {
1431 my $child = open STDIN, "-|";
1432 defined $child or die "no fork: $!";
1434 print $stdin or die $!;
1435 close STDOUT or die $!;
# With the fork option the command runs via system() and its status is
# returned to the caller.
1440 return system (@$args) if $opts->{fork};
1443 warn "ENV size is ".length(join(" ",%ENV));
1444 die "exec failed: $!: @$args";
# ban_node_by_slot: put the node behind a slot on hold for 60 seconds and
# count the hold. Argument: index into @slot.
1448 sub ban_node_by_slot {
1449 # Don't start any new jobsteps on this node for 60 seconds
1451 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1452 $slot[$slotid]->{node}->{hold_count}++;
1453 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Take an exclusive, non-blocking lock on a file or abort (fragment; the
# "sub" line is not visible). Arguments: lock file path and the explanation
# to report when the lock is already held.
1458 my ($lockfile, $error_message) = @_;
# The bareword handle L is a package-global filehandle, not a lexical one.
1459 open L, ">", $lockfile or croak("$lockfile: $!");
1460 if (!flock L, LOCK_EX|LOCK_NB) {
1461 croak("Can't lock $lockfile: $error_message\n");
# find_docker_hash: look up links of class "docker_image_hash" whose head is
# the given locator and return the lower-cased name of a matching link.
# When several links match, the last one in the result wins.
1465 sub find_docker_hash {
1466 # Given a Keep locator, search for a matching link to find the Docker hash
1467 # of the stored image.
1468 my $locator = shift;
1469 my $links_result = $arv->{links}->{list}->execute(
1470 filters => [["head_uuid", "=", $locator],
1471 ["link_class", "=", "docker_image_hash"]],
1474 foreach my $link (@{$links_result->{items}}) {
1475 $docker_hash = lc($link->{name});
1477 return $docker_hash;
# Build script that is sent to the nodes. It unpacks the git archive that
# follows it on its DATA handle into CRUNCH_SRC, builds the Python SDK into
# the install directory, runs the tree's install script if there is one,
# and records the installed commit as a symlink.
1483 # checkout-and-build
1487 my $destdir = $ENV{"CRUNCH_SRC"};
1488 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1489 my $repo = $ENV{"CRUNCH_SRC_URL"};
1491 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# The "<destdir>.commit" symlink's target is the installed commit id.
1493 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1497 unlink "$destdir.commit";
# From here on, output goes to "<destdir>.log".
1498 open STDOUT, ">", "$destdir.log";
1499 open STDERR, ">&STDOUT";
1502 my @git_archive_data = <DATA>;
1503 if (@git_archive_data) {
1504 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1505 print TARX @git_archive_data;
1507 die "'tar -C $destdir -xf -' exited $?: $!";
1512 chomp ($pwd = `pwd`);
1513 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1516 for my $src_path ("$destdir/arvados/sdk/python") {
1518 shell_or_die ("virtualenv", $install_dir);
1519 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Install hooks, in order of preference.
1523 if (-e "$destdir/crunch_scripts/install") {
1524 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1525 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1527 shell_or_die ("./tests/autotests.sh", $install_dir);
1528 } elsif (-e "./install.sh") {
1529 shell_or_die ("./install.sh", $install_dir);
# Create the marker under a temporary name and rename it into place.
1533 unlink "$destdir.commit.new";
1534 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1535 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# Run a command and die if it fails (fragment; the "sub" line and the
# command invocation are not visible). With DEBUG set, the command is echoed
# to STDERR first.
1544 if ($ENV{"DEBUG"}) {
1545 print STDERR "@_\n";
1548 or die "@_ failed: $! exit 0x".sprintf("%x",$?);