2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89 if ($ENV{"USER"} ne "crunch" && $< != 0) {
90 # use a tmp dir unique for my uid
91 $ENV{"CRUNCH_TMP"} .= "-$<";
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
101 if (defined $ENV{"ARV_CLI"}) {
102 $arv_cli = $ENV{"ARV_CLI"};
114 GetOptions('force-unlock' => \$force_unlock,
115 'git-dir=s' => \$git_dir,
116 'job=s' => \$jobspec,
117 'job-api-token=s' => \$job_api_token,
118 'no-clear-tmp' => \$no_clear_tmp,
119 'resume-stash=s' => \$resume_stash,
122 if (defined $job_api_token) {
123 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154 if (!$force_unlock) {
155 if ($Job->{'is_locked_by_uuid'}) {
156 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
158 if (defined $Job->{'success'}) { # "not null" means defined; 'ne undef' only compared against ""
159 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
161 if ($Job->{'running'}) {
162 croak("Job 'running' flag is already set");
164 if ($Job->{'started_at'}) {
165 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
171 $Job = JSON::decode_json($jobspec);
175 map { croak ("No $_ specified") unless $Job->{$_} }
176 qw(script script_version script_parameters);
179 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180 $Job->{'started_at'} = gmtime;
182 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
186 $job_id = $Job->{'uuid'};
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
196 Log (undef, "check slurm allocation");
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
203 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204 push @sinfo, "$localcpus localhost";
206 if (exists $ENV{SLURM_NODELIST})
208 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
212 my ($ncpus, $slurm_nodelist) = split;
213 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
216 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
219 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
222 foreach (split (",", $ranges))
235 push @nodelist, map {
237 $n =~ s/\[[-,\d]+\]/$_/;
244 push @nodelist, $nodelist;
247 foreach my $nodename (@nodelist)
249 Log (undef, "node $nodename - $ncpus slots");
250 my $node = { name => $nodename,
254 foreach my $cpu (1..$ncpus)
256 push @slot, { node => $node,
260 push @node, @nodelist;
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
275 # Claim this job, and make sure nobody else does (UUIDs are strings: compare with eq, not ==)
276 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
278 croak("Error while updating / locking job");
280 $Job->update_attributes('started_at' => scalar gmtime,
283 'tasks_summary' => { 'failed' => 0,
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
322 if (defined $Job->{thawedfromkey})
324 thaw ($Job->{thawedfromkey});
328 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329 'job_uuid' => $Job->{'uuid'},
334 push @jobstep, { 'level' => 0,
336 'arvados_task' => $first_task,
338 push @jobstep_todo, 0;
344 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
356 if (!defined $no_clear_tmp) {
357 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358 system($clear_tmp_cmd) == 0
359 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
361 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
364 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
368 or croak ("setup.py in $src_path failed: exit ".($?>>8));
376 $build_script = <DATA>;
378 Log (undef, "Install revision ".$Job->{script_version});
379 my $nodelist = join(",", @node);
381 if (!defined $no_clear_tmp) {
382 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
384 my $cleanpid = fork();
387 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
393 last if $cleanpid == waitpid (-1, WNOHANG);
394 freeze_if_want_freeze ($cleanpid);
395 select (undef, undef, undef, 0.1);
397 Log (undef, "Clean-work-dir exited $?");
400 # Install requested code version
403 my @srunargs = ("srun",
404 "--nodelist=$nodelist",
405 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
407 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
412 my $treeish = $Job->{'script_version'};
414 # If we're running under crunch-dispatch, it will have pulled the
415 # appropriate source tree into its own repository, and given us that
416 # repo's path as $git_dir. If we're running a "local" job, and a
417 # script_version was specified, it's up to the user to provide the
418 # full path to a local repository in Job->{repository}.
420 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421 # git-archive --remote where appropriate.
423 # TODO: Accept a locally-hosted Arvados repository by name or
424 # UUID. Use arvados.v1.repositories.list or .get to figure out the
425 # appropriate fetch-url.
426 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
428 $ENV{"CRUNCH_SRC_URL"} = $repo;
430 if (-d "$repo/.git") {
431 # We were given a working directory, but we are only interested in
433 $repo = "$repo/.git";
436 # If this looks like a subversion r#, look for it in git-svn commit messages
438 if ($treeish =~ m{^\d{1,4}$}) {
439 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
441 if ($gitlog =~ /^[a-f0-9]{40}$/) {
443 Log (undef, "Using commit $commit for script_version $treeish");
447 # If that didn't work, try asking git to look it up as a tree-ish.
449 if (!defined $commit) {
450 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
452 if ($found =~ /^[0-9a-f]{40}$/s) {
454 if ($commit ne $treeish) {
455 # Make sure we record the real commit id in the database,
456 # frozentokey, logs, etc. -- instead of an abbreviation or a
457 # branch name which can become ambiguous or point to a
458 # different commit in the future.
459 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460 Log (undef, "Using commit $commit for tree-ish $treeish");
461 if ($commit ne $treeish) {
462 $Job->{'script_version'} = $commit;
464 $Job->update_attributes('script_version' => $commit) or
465 croak("Error while updating job");
471 if (defined $commit) {
472 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473 @execargs = ("sh", "-c",
474 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
478 croak ("could not figure out commit id for $treeish");
481 my $installpid = fork();
482 if ($installpid == 0)
484 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
489 last if $installpid == waitpid (-1, WNOHANG);
490 freeze_if_want_freeze ($installpid);
491 select (undef, undef, undef, 0.1);
493 Log (undef, "Install exited $?");
498 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
506 my $docker_pid = fork();
507 if ($docker_pid == 0)
509 srun (["srun", "--nodelist=" . join(',', @node)], # comma-separated hostlist, like $nodelist
510 [$docker_bin, 'pull', $docker_image]);
515 last if $docker_pid == waitpid (-1, WNOHANG);
516 freeze_if_want_freeze ($docker_pid);
517 select (undef, undef, undef, 0.1);
519 # If the Docker image was specified as a hash, pull will fail.
520 # Ignore that error. We'll see what happens when we try to run later.
521 if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
523 croak("Installing Docker image $docker_image returned exit code $?");
527 foreach (qw (script script_version script_parameters runtime_constraints))
531 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
533 foreach (split (/\n/, $Job->{knobs}))
535 Log (undef, "knob " . $_);
540 $main::success = undef;
546 my $thisround_succeeded = 0;
547 my $thisround_failed = 0;
548 my $thisround_failed_multiple = 0;
550 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
551 or $a <=> $b } @jobstep_todo;
552 my $level = $jobstep[$jobstep_todo[0]]->{level};
553 Log (undef, "start level $level");
558 my @freeslot = (0..$#slot);
561 my $progress_is_dirty = 1;
562 my $progress_stats_updated = 0;
564 update_progress_stats();
569 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
571 my $id = $jobstep_todo[$todo_ptr];
572 my $Jobstep = $jobstep[$id];
573 if ($Jobstep->{level} != $level)
578 pipe $reader{$id}, "writer" or croak ($!);
579 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
580 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
582 my $childslot = $freeslot[0];
583 my $childnode = $slot[$childslot]->{node};
584 my $childslotname = join (".",
585 $slot[$childslot]->{node}->{name},
586 $slot[$childslot]->{cpu});
587 my $childpid = fork();
590 $SIG{'INT'} = 'DEFAULT';
591 $SIG{'QUIT'} = 'DEFAULT';
592 $SIG{'TERM'} = 'DEFAULT';
594 foreach (values (%reader))
598 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
599 open(STDOUT,">&writer");
600 open(STDERR,">&writer");
605 delete $ENV{"GNUPGHOME"};
606 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
607 $ENV{"TASK_QSEQUENCE"} = $id;
608 $ENV{"TASK_SEQUENCE"} = $level;
609 $ENV{"JOB_SCRIPT"} = $Job->{script};
610 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
611 $param =~ tr/a-z/A-Z/;
612 $ENV{"JOB_PARAMETER_$param"} = $value;
614 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
615 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
616 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
617 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
626 "--nodelist=".$childnode->{name},
627 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628 "--job-name=$job_id.$id.$$",
630 my $build_script_to_send = "";
632 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
634 ."&& cd $ENV{CRUNCH_TMP} ";
637 $build_script_to_send = $build_script;
641 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
644 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
645 $command .= "$docker_bin run -i -a stdin -a stdout -a stderr --cidfile=$ENV{TASK_WORK}/docker.cid ";
646 # Dynamically configure the container to use the host system as its
647 # DNS server. Get the host's global addresses from the ip command,
648 # and turn them into docker --dns options using gawk.
650 q{$(ip -o address show scope global |
651 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652 foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
654 $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
656 while (my ($env_key, $env_val) = each %ENV)
658 if ($env_key =~ /^(JOB|TASK)_/) {
659 $command .= "-e \Q$env_key=$env_val\E ";
662 $command .= "\Q$docker_image\E ";
664 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "
666 $command .= "stdbuf -o0 -e0 ";
667 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
668 my @execargs = ('bash', '-c', $command);
669 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
673 if (!defined $childpid)
680 $proc{$childpid} = { jobstep => $id,
683 jobstepname => "$job_id.$id.$childpid",
685 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
686 $slot[$childslot]->{pid} = $childpid;
688 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
689 Log ($id, "child $childpid started on $childslotname");
690 $Jobstep->{starttime} = time;
691 $Jobstep->{node} = $childnode->{name};
692 $Jobstep->{slotindex} = $childslot;
693 delete $Jobstep->{stderr};
694 delete $Jobstep->{finishtime};
696 splice @jobstep_todo, $todo_ptr, 1;
699 $progress_is_dirty = 1;
703 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
705 last THISROUND if $main::please_freeze;
706 if ($main::please_info)
708 $main::please_info = 0;
712 update_progress_stats();
719 check_refresh_wanted();
721 update_progress_stats();
722 select (undef, undef, undef, 0.1);
724 elsif (time - $progress_stats_updated >= 30)
726 update_progress_stats();
728 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
729 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
731 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
732 .($thisround_failed+$thisround_succeeded)
733 .") -- giving up on this round";
734 Log (undef, $message);
738 # move slots from freeslot to holdslot (or back to freeslot) if necessary
739 for (my $i=$#freeslot; $i>=0; $i--) {
740 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
741 push @holdslot, (splice @freeslot, $i, 1);
744 for (my $i=$#holdslot; $i>=0; $i--) {
745 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
746 push @freeslot, (splice @holdslot, $i, 1);
750 # give up if no nodes are succeeding
751 if (!grep { $_->{node}->{losing_streak} == 0 &&
752 $_->{node}->{hold_count} < 4 } @slot) {
753 my $message = "Every node has failed -- giving up on this round";
754 Log (undef, $message);
761 push @freeslot, splice @holdslot;
762 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
765 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
768 if ($main::please_continue) {
769 $main::please_continue = 0;
772 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
776 check_refresh_wanted();
778 update_progress_stats();
779 select (undef, undef, undef, 0.1);
780 killem (keys %proc) if $main::please_freeze;
784 update_progress_stats();
785 freeze_if_want_freeze();
788 if (!defined $main::success)
791 $thisround_succeeded == 0 &&
792 ($thisround_failed == 0 || $thisround_failed > 4))
794 my $message = "stop because $thisround_failed tasks failed and none succeeded";
795 Log (undef, $message);
804 goto ONELEVEL if !defined $main::success;
807 release_allocation();
809 my $collated_output = &collate_output();
812 $Job->update_attributes('running' => 0,
813 'success' => $collated_output && $main::success,
814 'finished_at' => scalar gmtime)
817 if ($collated_output)
820 open(my $orig_manifest, '-|', $arv_cli, 'keep', 'get', $collated_output) # honor ARV_CLI like fetch_block/collate_output
821 or die "failed to get collated manifest: $!";
822 # Read the original manifest, and strip permission hints from it,
823 # so we can put the result in a Collection.
824 my @stripped_manifest_lines = ();
825 my $orig_manifest_text = '';
826 while (my $manifest_line = <$orig_manifest>) {
827 $orig_manifest_text .= $manifest_line;
828 my @words = split(/ /, $manifest_line, -1);
829 foreach my $ii (0..$#words) {
830 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
831 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
834 push(@stripped_manifest_lines, join(" ", @words));
836 my $stripped_manifest_text = join("", @stripped_manifest_lines);
837 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
838 'uuid' => md5_hex($stripped_manifest_text),
839 'manifest_text' => $orig_manifest_text,
841 $Job->update_attributes('output' => $output->{uuid});
842 if ($Job->{'output_is_persistent'}) {
843 $arv->{'links'}->{'create'}->execute('link' => {
844 'tail_kind' => 'arvados#user',
845 'tail_uuid' => $User->{'uuid'},
846 'head_kind' => 'arvados#collection',
847 'head_uuid' => $Job->{'output'},
848 'link_class' => 'resources',
854 Log (undef, "Failed to register output manifest: $@");
858 Log (undef, "finish");
865 sub update_progress_stats
867 $progress_stats_updated = time;
868 return if !$progress_is_dirty;
869 my ($todo, $done, $running) = (scalar @jobstep_todo,
870 scalar @jobstep_done,
871 scalar @slot - scalar @freeslot - scalar @holdslot);
872 $Job->{'tasks_summary'} ||= {};
873 $Job->{'tasks_summary'}->{'todo'} = $todo;
874 $Job->{'tasks_summary'}->{'done'} = $done;
875 $Job->{'tasks_summary'}->{'running'} = $running;
877 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
879 Log (undef, "status: $done done, $running running, $todo todo");
880 $progress_is_dirty = 0;
887 my $pid = waitpid (-1, WNOHANG);
888 return 0 if $pid <= 0;
890 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
892 . $slot[$proc{$pid}->{slot}]->{cpu});
893 my $jobstepid = $proc{$pid}->{jobstep};
894 my $elapsed = time - $proc{$pid}->{time};
895 my $Jobstep = $jobstep[$jobstepid];
897 my $childstatus = $?;
898 my $exitvalue = $childstatus >> 8;
899 my $exitinfo = sprintf("exit %d signal %d%s",
902 ($childstatus & 128 ? ' core dump' : ''));
903 $Jobstep->{'arvados_task'}->reload;
904 my $task_success = $Jobstep->{'arvados_task'}->{success};
906 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
908 if (!defined $task_success) {
909 # task did not indicate one way or the other --> fail
910 $Jobstep->{'arvados_task'}->{success} = 0;
911 $Jobstep->{'arvados_task'}->save;
918 $temporary_fail ||= $Jobstep->{node_fail};
919 $temporary_fail ||= ($exitvalue == 111);
922 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
924 # Check for signs of a failed or misconfigured node
925 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
926 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
927 # Don't count this against jobstep failure thresholds if this
928 # node is already suspected faulty and srun exited quickly
929 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
931 Log ($jobstepid, "blaming failure on suspect node " .
932 $slot[$proc{$pid}->{slot}]->{node}->{name});
933 $temporary_fail ||= 1;
935 ban_node_by_slot($proc{$pid}->{slot});
938 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
939 ++$Jobstep->{'failures'},
940 $temporary_fail ? 'temporary' : 'permanent',
943 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
944 # Give up on this task, and the whole job
946 $main::please_freeze = 1;
949 # Put this task back on the todo queue
950 push @jobstep_todo, $jobstepid;
952 $Job->{'tasks_summary'}->{'failed'}++;
956 ++$thisround_succeeded;
957 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
958 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
959 push @jobstep_done, $jobstepid;
960 Log ($jobstepid, "success in $elapsed seconds");
962 $Jobstep->{exitcode} = $childstatus;
963 $Jobstep->{finishtime} = time;
964 process_stderr ($jobstepid, $task_success);
965 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
967 close $reader{$jobstepid};
968 delete $reader{$jobstepid};
969 delete $slot[$proc{$pid}->{slot}]->{pid};
970 push @freeslot, $proc{$pid}->{slot};
974 my $newtask_list = [];
977 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
979 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
981 'order' => 'qsequence',
982 'offset' => scalar(@$newtask_list),
984 push(@$newtask_list, @{$newtask_results->{items}});
985 } while (@{$newtask_results->{items}});
986 foreach my $arvados_task (@$newtask_list) {
988 'level' => $arvados_task->{'sequence'},
990 'arvados_task' => $arvados_task
992 push @jobstep, $jobstep;
993 push @jobstep_todo, $#jobstep;
996 $progress_is_dirty = 1;
1000 sub check_refresh_wanted
1002 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1003 if (@stat && $stat[9] > $latest_refresh) {
1004 $latest_refresh = scalar time;
1005 if ($job_has_uuid) {
1006 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1007 for my $attr ('cancelled_at',
1008 'cancelled_by_user_uuid',
1009 'cancelled_by_client_uuid') {
1010 $Job->{$attr} = $Job2->{$attr};
1012 if ($Job->{'cancelled_at'}) {
1013 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1014 " by user " . $Job->{cancelled_by_user_uuid});
1016 $main::please_freeze = 1;
1024 # return if the kill list was checked <4 seconds ago
1025 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1029 $squeue_kill_checked = time;
1031 # use killem() on procs whose killtime is reached
1034 if (exists $proc{$_}->{killtime}
1035 && $proc{$_}->{killtime} <= time)
1041 # return if the squeue was checked <60 seconds ago
1042 if (defined $squeue_checked && $squeue_checked > time - 60)
1046 $squeue_checked = time;
1050 # here is an opportunity to check for mysterious problems with local procs
1054 # get a list of steps still running
1055 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1057 if ($squeue[-1] ne "ok")
1063 # which of my jobsteps are running, according to squeue?
1067 if (/^(\d+)\.(\d+) (\S+)/)
1069 if ($1 eq $ENV{SLURM_JOBID})
1076 # which of my active child procs (>60s old) were not mentioned by squeue?
1077 foreach (keys %proc)
1079 if ($proc{$_}->{time} < time - 60
1080 && !exists $ok{$proc{$_}->{jobstepname}}
1081 && !exists $proc{$_}->{killtime})
1083 # kill this proc if it hasn't exited in 30 seconds
1084 $proc{$_}->{killtime} = time + 30;
1090 sub release_allocation
1094 Log (undef, "release job allocation");
1095 system "scancel $ENV{SLURM_JOBID}";
1103 foreach my $job (keys %reader)
1106 while (0 < sysread ($reader{$job}, $buf, 8192))
1108 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1109 $jobstep[$job]->{stderr} .= $buf;
1110 preprocess_stderr ($job);
1111 if (length ($jobstep[$job]->{stderr}) > 16384)
1113 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1122 sub preprocess_stderr
1126 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1128 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1129 Log ($job, "stderr $line");
1130 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1132 $main::please_freeze = 1;
1134 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1135 $jobstep[$job]->{node_fail} = 1;
1136 ban_node_by_slot($jobstep[$job]->{slotindex});
1145 my $task_success = shift;
1146 preprocess_stderr ($job);
1149 Log ($job, "stderr $_");
1150 } split ("\n", $jobstep[$job]->{stderr});
1156 my ($keep, $child_out, $output_block);
1158 my $cmd = "$arv_cli keep get \Q$hash\E";
1159 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1160 $output_block = do { local $/; <$keep> }; # read to EOF: a single sysread() on a pipe can return a short read
1162 return $output_block;
1167 Log (undef, "collate");
1169 my ($child_out, $child_in);
1170 my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1174 next if (!exists $_->{'arvados_task'}->{output} ||
1175 !$_->{'arvados_task'}->{'success'} ||
1176 $_->{'exitcode'} != 0);
1177 my $output = $_->{'arvados_task'}->{output};
1178 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1180 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1181 print $child_in $output;
1183 elsif (@jobstep == 1)
1185 $joboutput = $output;
1188 elsif (defined (my $outblock = fetch_block ($output)))
1190 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1191 print $child_in $outblock;
1195 Log (undef, "XXX fetch_block($output) failed XXX");
1201 if (!defined $joboutput) {
1202 my $s = IO::Select->new($child_out);
1203 if ($s->can_read(120)) {
1204 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1207 Log (undef, "timed out reading from 'arv keep put'");
1214 Log (undef, "output $joboutput");
1215 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1219 Log (undef, "output undef");
1229 my $sig = 2; # SIGINT first
1230 if (exists $proc{$_}->{"sent_$sig"} &&
1231 time - $proc{$_}->{"sent_$sig"} > 4)
1233 $sig = 15; # SIGTERM if SIGINT doesn't work
1235 if (exists $proc{$_}->{"sent_$sig"} &&
1236 time - $proc{$_}->{"sent_$sig"} > 4)
1238 $sig = 9; # SIGKILL if SIGTERM doesn't work
1240 if (!exists $proc{$_}->{"sent_$sig"})
1242 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1244 select (undef, undef, undef, 0.1);
1247 kill $sig, $_; # srun wants two SIGINT to really interrupt
1249 $proc{$_}->{"sent_$sig"} = time;
1250 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1260 vec($bits,fileno($_),1) = 1;
1266 sub Log # ($jobstep_id, $logmessage)
1268 if ($_[1] =~ /\n/) {
1269 for my $line (split (/\n/, $_[1])) {
1274 my $fh = select STDERR; $|=1; select $fh;
1275 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1276 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1279 if ($local_logfile || -t STDERR) {
1280 my @gmtime = gmtime;
1281 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1282 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1284 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1286 if ($local_logfile) {
1287 print $local_logfile $datetime . " " . $message;
1294 my ($package, $file, $line) = caller;
1295 my $message = "@_ at $file line $line\n";
1296 Log (undef, $message);
1297 freeze() if @jobstep_todo;
1298 collate_output() if @jobstep_todo;
1300 save_meta() if $local_logfile;
1307 return if !$job_has_uuid;
1308 $Job->update_attributes('running' => 0,
1310 'finished_at' => scalar gmtime);
1316 my $justcheckpoint = shift; # false if this will be the last meta saved
1317 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1319 $local_logfile->flush;
1320 my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1321 . quotemeta($local_logfile->filename);
1322 my $loglocator = `$cmd`;
1323 die "system $cmd failed: $?" if $?;
1326 $local_logfile = undef; # the temp file is automatically deleted
1327 Log (undef, "log manifest is $loglocator");
1328 $Job->{'log'} = $loglocator;
1329 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1333 sub freeze_if_want_freeze
1335 if ($main::please_freeze)
1337 release_allocation();
1340 # kill some srun procs before freeze+stop
1341 map { $proc{$_} = {} } @_;
1344 killem (keys %proc);
1345 select (undef, undef, undef, 0.1);
1347 while (($died = waitpid (-1, WNOHANG)) > 0)
1349 delete $proc{$died};
1364 Log (undef, "Freeze not implemented");
1371 croak ("Thaw not implemented");
1387 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1394 my $srunargs = shift;
1395 my $execargs = shift;
1396 my $opts = shift || {};
1398 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1399 print STDERR (join (" ",
1400 map { / / ? "'$_'" : $_ }
1403 if $ENV{CRUNCH_DEBUG};
1405 if (defined $stdin) {
1406 my $child = open STDIN, "-|";
1407 defined $child or die "no fork: $!";
1409 print $stdin or die $!;
1410 close STDOUT or die $!;
1415 return system (@$args) if $opts->{fork};
1418 warn "ENV size is ".length(join(" ",%ENV));
1419 die "exec failed: $!: @$args";
1423 sub ban_node_by_slot {
1424 # Don't start any new jobsteps on this node for 60 seconds
1426 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1427 $slot[$slotid]->{node}->{hold_count}++;
1428 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1433 my ($lockfile, $error_message) = @_;
1434 open L, ">", $lockfile or croak("$lockfile: $!");
1435 if (!flock L, LOCK_EX|LOCK_NB) {
1436 croak("Can't lock $lockfile: $error_message\n");
1443 # checkout-and-build
1447 my $destdir = $ENV{"CRUNCH_SRC"};
1448 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1449 my $repo = $ENV{"CRUNCH_SRC_URL"};
1451 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1453 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1457 unlink "$destdir.commit";
1458 open STDOUT, ">", "$destdir.log";
1459 open STDERR, ">&STDOUT";
1462 my @git_archive_data = <DATA>;
1463 if (@git_archive_data) {
1464 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1465 print TARX @git_archive_data;
1467 die "'tar -C $destdir -xf -' exited $?: $!";
1472 chomp ($pwd = `pwd`);
1473 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1476 for my $src_path ("$destdir/arvados/sdk/python") {
1478 shell_or_die ("virtualenv", $install_dir);
1479 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1483 if (-e "$destdir/crunch_scripts/install") {
1484 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1485 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1487 shell_or_die ("./tests/autotests.sh", $install_dir);
1488 } elsif (-e "./install.sh") {
1489 shell_or_die ("./install.sh", $install_dir);
1493 unlink "$destdir.commit.new";
1494 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1495 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1504 if ($ENV{"DEBUG"}) {
1505 print STDERR "@_\n";
1508 or die "@_ failed: $! exit 0x".sprintf("%x",$?);