2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Digest::MD5 qw(md5_hex);
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89 if ($ENV{"USER"} ne "crunch" && $< != 0) {
90 # use a tmp dir unique for my uid
91 $ENV{"CRUNCH_TMP"} .= "-$<";
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
101 if (defined $ENV{"ARV_CLI"}) {
102 $arv_cli = $ENV{"ARV_CLI"};
114 GetOptions('force-unlock' => \$force_unlock,
115 'git-dir=s' => \$git_dir,
116 'job=s' => \$jobspec,
117 'job-api-token=s' => \$job_api_token,
118 'no-clear-tmp' => \$no_clear_tmp,
119 'resume-stash=s' => \$resume_stash,
122 if (defined $job_api_token) {
123 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154 if (!$force_unlock) {
155 if ($Job->{'is_locked_by_uuid'}) {
156 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
158 if ($Job->{'success'} ne undef) {
159 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
161 if ($Job->{'running'}) {
162 croak("Job 'running' flag is already set");
164 if ($Job->{'started_at'}) {
165 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
171 $Job = JSON::decode_json($jobspec);
175 map { croak ("No $_ specified") unless $Job->{$_} }
176 qw(script script_version script_parameters);
179 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180 $Job->{'started_at'} = gmtime;
182 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
186 $job_id = $Job->{'uuid'};
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
196 Log (undef, "check slurm allocation");
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
203 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204 push @sinfo, "$localcpus localhost";
206 if (exists $ENV{SLURM_NODELIST})
208 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
212 my ($ncpus, $slurm_nodelist) = split;
213 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
216 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
219 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
222 foreach (split (",", $ranges))
235 push @nodelist, map {
237 $n =~ s/\[[-,\d]+\]/$_/;
244 push @nodelist, $nodelist;
247 foreach my $nodename (@nodelist)
249 Log (undef, "node $nodename - $ncpus slots");
250 my $node = { name => $nodename,
254 foreach my $cpu (1..$ncpus)
256 push @slot, { node => $node,
260 push @node, @nodelist;
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
275 # Claim this job, and make sure nobody else does
276 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277 $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278 croak("Error while updating / locking job");
280 $Job->update_attributes('started_at' => scalar gmtime,
283 'tasks_summary' => { 'failed' => 0,
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
322 if (defined $Job->{thawedfromkey})
324 thaw ($Job->{thawedfromkey});
328 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329 'job_uuid' => $Job->{'uuid'},
334 push @jobstep, { 'level' => 0,
336 'arvados_task' => $first_task,
338 push @jobstep_todo, 0;
344 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
356 if (!defined $no_clear_tmp) {
357 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358 system($clear_tmp_cmd) == 0
359 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
361 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
364 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
368 or croak ("setup.py in $src_path failed: exit ".($?>>8));
376 $build_script = <DATA>;
378 Log (undef, "Install revision ".$Job->{script_version});
379 my $nodelist = join(",", @node);
381 if (!defined $no_clear_tmp) {
382 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
384 my $cleanpid = fork();
387 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
393 last if $cleanpid == waitpid (-1, WNOHANG);
394 freeze_if_want_freeze ($cleanpid);
395 select (undef, undef, undef, 0.1);
397 Log (undef, "Clean-work-dir exited $?");
400 # Install requested code version
403 my @srunargs = ("srun",
404 "--nodelist=$nodelist",
405 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
407 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
412 my $treeish = $Job->{'script_version'};
414 # If we're running under crunch-dispatch, it will have pulled the
415 # appropriate source tree into its own repository, and given us that
416 # repo's path as $git_dir. If we're running a "local" job, and a
417 # script_version was specified, it's up to the user to provide the
418 # full path to a local repository in Job->{repository}.
420 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421 # git-archive --remote where appropriate.
423 # TODO: Accept a locally-hosted Arvados repository by name or
424 # UUID. Use arvados.v1.repositories.list or .get to figure out the
425 # appropriate fetch-url.
426 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
428 $ENV{"CRUNCH_SRC_URL"} = $repo;
430 if (-d "$repo/.git") {
431 # We were given a working directory, but we are only interested in
433 $repo = "$repo/.git";
436 # If this looks like a subversion r#, look for it in git-svn commit messages
438 if ($treeish =~ m{^\d{1,4}$}) {
439 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
441 if ($gitlog =~ /^[a-f0-9]{40}$/) {
443 Log (undef, "Using commit $commit for script_version $treeish");
447 # If that didn't work, try asking git to look it up as a tree-ish.
449 if (!defined $commit) {
450 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
452 if ($found =~ /^[0-9a-f]{40}$/s) {
454 if ($commit ne $treeish) {
455 # Make sure we record the real commit id in the database,
456 # frozentokey, logs, etc. -- instead of an abbreviation or a
457 # branch name which can become ambiguous or point to a
458 # different commit in the future.
459 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460 Log (undef, "Using commit $commit for tree-ish $treeish");
461 if ($commit ne $treeish) {
462 $Job->{'script_version'} = $commit;
464 $Job->update_attributes('script_version' => $commit) or
465 croak("Error while updating job");
471 if (defined $commit) {
472 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473 @execargs = ("sh", "-c",
474 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
478 croak ("could not figure out commit id for $treeish");
481 my $installpid = fork();
482 if ($installpid == 0)
484 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
489 last if $installpid == waitpid (-1, WNOHANG);
490 freeze_if_want_freeze ($installpid);
491 select (undef, undef, undef, 0.1);
493 Log (undef, "Install exited $?");
498 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my ($docker_locator, $docker_hash);
505 if ($docker_locator = $Job->{docker_image_locator}) {
506 $docker_hash = find_docker_hash($docker_locator);
509 croak("No Docker image hash found from locator $docker_locator");
511 my $docker_install_script = qq{
512 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
513 arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
516 my $docker_pid = fork();
517 if ($docker_pid == 0)
519 srun (["srun", "--nodelist=" . join(',', @node)],
520 ["/bin/sh", "-ec", $docker_install_script]);
525 last if $docker_pid == waitpid (-1, WNOHANG);
526 freeze_if_want_freeze ($docker_pid);
527 select (undef, undef, undef, 0.1);
531 croak("Installing Docker image from $docker_locator returned exit code $?");
535 foreach (qw (script script_version script_parameters runtime_constraints))
539 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
541 foreach (split (/\n/, $Job->{knobs}))
543 Log (undef, "knob " . $_);
548 $main::success = undef;
554 my $thisround_succeeded = 0;
555 my $thisround_failed = 0;
556 my $thisround_failed_multiple = 0;
558 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
559 or $a <=> $b } @jobstep_todo;
560 my $level = $jobstep[$jobstep_todo[0]]->{level};
561 Log (undef, "start level $level");
566 my @freeslot = (0..$#slot);
569 my $progress_is_dirty = 1;
570 my $progress_stats_updated = 0;
572 update_progress_stats();
577 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
579 my $id = $jobstep_todo[$todo_ptr];
580 my $Jobstep = $jobstep[$id];
581 if ($Jobstep->{level} != $level)
586 pipe $reader{$id}, "writer" or croak ($!);
587 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
588 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
590 my $childslot = $freeslot[0];
591 my $childnode = $slot[$childslot]->{node};
592 my $childslotname = join (".",
593 $slot[$childslot]->{node}->{name},
594 $slot[$childslot]->{cpu});
595 my $childpid = fork();
598 $SIG{'INT'} = 'DEFAULT';
599 $SIG{'QUIT'} = 'DEFAULT';
600 $SIG{'TERM'} = 'DEFAULT';
602 foreach (values (%reader))
606 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
607 open(STDOUT,">&writer");
608 open(STDERR,">&writer");
613 delete $ENV{"GNUPGHOME"};
614 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
615 $ENV{"TASK_QSEQUENCE"} = $id;
616 $ENV{"TASK_SEQUENCE"} = $level;
617 $ENV{"JOB_SCRIPT"} = $Job->{script};
618 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
619 $param =~ tr/a-z/A-Z/;
620 $ENV{"JOB_PARAMETER_$param"} = $value;
622 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
623 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
624 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
625 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
626 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
627 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
628 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
634 "--nodelist=".$childnode->{name},
635 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
636 "--job-name=$job_id.$id.$$",
638 my $build_script_to_send = "";
640 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
641 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
642 ."&& cd $ENV{CRUNCH_TMP} ";
645 $build_script_to_send = $build_script;
649 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
652 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
653 $command .= "$docker_bin run -i -a stdin -a stdout -a stderr --cidfile=$ENV{TASK_WORK}/docker.cid ";
654 # Dynamically configure the container to use the host system as its
655 # DNS server. Get the host's global addresses from the ip command,
656 # and turn them into docker --dns options using gawk.
658 q{$(ip -o address show scope global |
659 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
660 foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
662 $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
664 while (my ($env_key, $env_val) = each %ENV)
666 if ($env_key =~ /^(JOB|TASK)_/) {
667 $command .= "-e \Q$env_key=$env_val\E ";
670 $command .= "\Q$docker_hash\E ";
672 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "
674 $command .= "stdbuf -o0 -e0 ";
675 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
676 my @execargs = ('bash', '-c', $command);
677 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
681 if (!defined $childpid)
688 $proc{$childpid} = { jobstep => $id,
691 jobstepname => "$job_id.$id.$childpid",
693 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
694 $slot[$childslot]->{pid} = $childpid;
696 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
697 Log ($id, "child $childpid started on $childslotname");
698 $Jobstep->{starttime} = time;
699 $Jobstep->{node} = $childnode->{name};
700 $Jobstep->{slotindex} = $childslot;
701 delete $Jobstep->{stderr};
702 delete $Jobstep->{finishtime};
704 splice @jobstep_todo, $todo_ptr, 1;
707 $progress_is_dirty = 1;
711 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
713 last THISROUND if $main::please_freeze;
714 if ($main::please_info)
716 $main::please_info = 0;
720 update_progress_stats();
727 check_refresh_wanted();
729 update_progress_stats();
730 select (undef, undef, undef, 0.1);
732 elsif (time - $progress_stats_updated >= 30)
734 update_progress_stats();
736 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
737 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
739 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
740 .($thisround_failed+$thisround_succeeded)
741 .") -- giving up on this round";
742 Log (undef, $message);
746 # move slots from freeslot to holdslot (or back to freeslot) if necessary
747 for (my $i=$#freeslot; $i>=0; $i--) {
748 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
749 push @holdslot, (splice @freeslot, $i, 1);
752 for (my $i=$#holdslot; $i>=0; $i--) {
753 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
754 push @freeslot, (splice @holdslot, $i, 1);
758 # give up if no nodes are succeeding
759 if (!grep { $_->{node}->{losing_streak} == 0 &&
760 $_->{node}->{hold_count} < 4 } @slot) {
761 my $message = "Every node has failed -- giving up on this round";
762 Log (undef, $message);
769 push @freeslot, splice @holdslot;
770 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
773 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
776 if ($main::please_continue) {
777 $main::please_continue = 0;
780 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
784 check_refresh_wanted();
786 update_progress_stats();
787 select (undef, undef, undef, 0.1);
788 killem (keys %proc) if $main::please_freeze;
792 update_progress_stats();
793 freeze_if_want_freeze();
796 if (!defined $main::success)
799 $thisround_succeeded == 0 &&
800 ($thisround_failed == 0 || $thisround_failed > 4))
802 my $message = "stop because $thisround_failed tasks failed and none succeeded";
803 Log (undef, $message);
812 goto ONELEVEL if !defined $main::success;
815 release_allocation();
817 my $collated_output = &collate_output();
820 $Job->update_attributes('running' => 0,
821 'success' => $collated_output && $main::success,
822 'finished_at' => scalar gmtime)
825 if ($collated_output)
828 open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
829 or die "failed to get collated manifest: $!";
830 # Read the original manifest, and strip permission hints from it,
831 # so we can put the result in a Collection.
832 my @stripped_manifest_lines = ();
833 my $orig_manifest_text = '';
834 while (my $manifest_line = <$orig_manifest>) {
835 $orig_manifest_text .= $manifest_line;
836 my @words = split(/ /, $manifest_line, -1);
837 foreach my $ii (0..$#words) {
838 if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
839 $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
842 push(@stripped_manifest_lines, join(" ", @words));
844 my $stripped_manifest_text = join("", @stripped_manifest_lines);
845 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
846 'uuid' => md5_hex($stripped_manifest_text),
847 'manifest_text' => $orig_manifest_text,
849 $Job->update_attributes('output' => $output->{uuid});
850 if ($Job->{'output_is_persistent'}) {
851 $arv->{'links'}->{'create'}->execute('link' => {
852 'tail_kind' => 'arvados#user',
853 'tail_uuid' => $User->{'uuid'},
854 'head_kind' => 'arvados#collection',
855 'head_uuid' => $Job->{'output'},
856 'link_class' => 'resources',
862 Log (undef, "Failed to register output manifest: $@");
866 Log (undef, "finish");
873 sub update_progress_stats
875 $progress_stats_updated = time;
876 return if !$progress_is_dirty;
877 my ($todo, $done, $running) = (scalar @jobstep_todo,
878 scalar @jobstep_done,
879 scalar @slot - scalar @freeslot - scalar @holdslot);
880 $Job->{'tasks_summary'} ||= {};
881 $Job->{'tasks_summary'}->{'todo'} = $todo;
882 $Job->{'tasks_summary'}->{'done'} = $done;
883 $Job->{'tasks_summary'}->{'running'} = $running;
885 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
887 Log (undef, "status: $done done, $running running, $todo todo");
888 $progress_is_dirty = 0;
895 my $pid = waitpid (-1, WNOHANG);
896 return 0 if $pid <= 0;
898 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
900 . $slot[$proc{$pid}->{slot}]->{cpu});
901 my $jobstepid = $proc{$pid}->{jobstep};
902 my $elapsed = time - $proc{$pid}->{time};
903 my $Jobstep = $jobstep[$jobstepid];
905 my $childstatus = $?;
906 my $exitvalue = $childstatus >> 8;
907 my $exitinfo = sprintf("exit %d signal %d%s",
910 ($childstatus & 128 ? ' core dump' : ''));
911 $Jobstep->{'arvados_task'}->reload;
912 my $task_success = $Jobstep->{'arvados_task'}->{success};
914 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
916 if (!defined $task_success) {
917 # task did not indicate one way or the other --> fail
918 $Jobstep->{'arvados_task'}->{success} = 0;
919 $Jobstep->{'arvados_task'}->save;
926 $temporary_fail ||= $Jobstep->{node_fail};
927 $temporary_fail ||= ($exitvalue == 111);
930 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
932 # Check for signs of a failed or misconfigured node
933 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
934 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
935 # Don't count this against jobstep failure thresholds if this
936 # node is already suspected faulty and srun exited quickly
937 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
939 Log ($jobstepid, "blaming failure on suspect node " .
940 $slot[$proc{$pid}->{slot}]->{node}->{name});
941 $temporary_fail ||= 1;
943 ban_node_by_slot($proc{$pid}->{slot});
946 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
947 ++$Jobstep->{'failures'},
948 $temporary_fail ? 'temporary ' : 'permanent',
951 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
952 # Give up on this task, and the whole job
954 $main::please_freeze = 1;
957 # Put this task back on the todo queue
958 push @jobstep_todo, $jobstepid;
960 $Job->{'tasks_summary'}->{'failed'}++;
964 ++$thisround_succeeded;
965 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
966 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
967 push @jobstep_done, $jobstepid;
968 Log ($jobstepid, "success in $elapsed seconds");
970 $Jobstep->{exitcode} = $childstatus;
971 $Jobstep->{finishtime} = time;
972 process_stderr ($jobstepid, $task_success);
973 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
975 close $reader{$jobstepid};
976 delete $reader{$jobstepid};
977 delete $slot[$proc{$pid}->{slot}]->{pid};
978 push @freeslot, $proc{$pid}->{slot};
982 my $newtask_list = [];
985 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
987 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
989 'order' => 'qsequence',
990 'offset' => scalar(@$newtask_list),
992 push(@$newtask_list, @{$newtask_results->{items}});
993 } while (@{$newtask_results->{items}});
994 foreach my $arvados_task (@$newtask_list) {
996 'level' => $arvados_task->{'sequence'},
998 'arvados_task' => $arvados_task
1000 push @jobstep, $jobstep;
1001 push @jobstep_todo, $#jobstep;
1004 $progress_is_dirty = 1;
1008 sub check_refresh_wanted
1010 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1011 if (@stat && $stat[9] > $latest_refresh) {
1012 $latest_refresh = scalar time;
1013 if ($job_has_uuid) {
1014 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1015 for my $attr ('cancelled_at',
1016 'cancelled_by_user_uuid',
1017 'cancelled_by_client_uuid') {
1018 $Job->{$attr} = $Job2->{$attr};
1020 if ($Job->{'cancelled_at'}) {
1021 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1022 " by user " . $Job->{cancelled_by_user_uuid});
1024 $main::please_freeze = 1;
1032 # return if the kill list was checked <4 seconds ago
1033 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1037 $squeue_kill_checked = time;
1039 # use killem() on procs whose killtime is reached
1042 if (exists $proc{$_}->{killtime}
1043 && $proc{$_}->{killtime} <= time)
1049 # return if the squeue was checked <60 seconds ago
1050 if (defined $squeue_checked && $squeue_checked > time - 60)
1054 $squeue_checked = time;
1058 # here is an opportunity to check for mysterious problems with local procs
1062 # get a list of steps still running
1063 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1065 if ($squeue[-1] ne "ok")
1071 # which of my jobsteps are running, according to squeue?
1075 if (/^(\d+)\.(\d+) (\S+)/)
1077 if ($1 eq $ENV{SLURM_JOBID})
1084 # which of my active child procs (>60s old) were not mentioned by squeue?
1085 foreach (keys %proc)
1087 if ($proc{$_}->{time} < time - 60
1088 && !exists $ok{$proc{$_}->{jobstepname}}
1089 && !exists $proc{$_}->{killtime})
1091 # kill this proc if it hasn't exited in 30 seconds
1092 $proc{$_}->{killtime} = time + 30;
1098 sub release_allocation
1102 Log (undef, "release job allocation");
1103 system "scancel $ENV{SLURM_JOBID}";
1111 foreach my $job (keys %reader)
1114 while (0 < sysread ($reader{$job}, $buf, 8192))
1116 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1117 $jobstep[$job]->{stderr} .= $buf;
1118 preprocess_stderr ($job);
1119 if (length ($jobstep[$job]->{stderr}) > 16384)
1121 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1130 sub preprocess_stderr
1134 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1136 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1137 Log ($job, "stderr $line");
1138 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1140 $main::please_freeze = 1;
1142 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1143 $jobstep[$job]->{node_fail} = 1;
1144 ban_node_by_slot($jobstep[$job]->{slotindex});
1153 my $task_success = shift;
1154 preprocess_stderr ($job);
1157 Log ($job, "stderr $_");
1158 } split ("\n", $jobstep[$job]->{stderr});
1164 my ($keep, $child_out, $output_block);
1166 my $cmd = "$arv_cli keep get \Q$hash\E";
1167 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1168 sysread($keep, $output_block, 64 * 1024 * 1024);
1170 return $output_block;
1175 Log (undef, "collate");
1177 my ($child_out, $child_in);
1178 my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1182 next if (!exists $_->{'arvados_task'}->{output} ||
1183 !$_->{'arvados_task'}->{'success'} ||
1184 $_->{'exitcode'} != 0);
1185 my $output = $_->{'arvados_task'}->{output};
1186 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1188 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1189 print $child_in $output;
1191 elsif (@jobstep == 1)
1193 $joboutput = $output;
1196 elsif (defined (my $outblock = fetch_block ($output)))
1198 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1199 print $child_in $outblock;
1203 Log (undef, "XXX fetch_block($output) failed XXX");
1209 if (!defined $joboutput) {
1210 my $s = IO::Select->new($child_out);
1211 if ($s->can_read(120)) {
1212 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1215 Log (undef, "timed out reading from 'arv keep put'");
1222 Log (undef, "output $joboutput");
1223 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1227 Log (undef, "output undef");
1237 my $sig = 2; # SIGINT first
1238 if (exists $proc{$_}->{"sent_$sig"} &&
1239 time - $proc{$_}->{"sent_$sig"} > 4)
1241 $sig = 15; # SIGTERM if SIGINT doesn't work
1243 if (exists $proc{$_}->{"sent_$sig"} &&
1244 time - $proc{$_}->{"sent_$sig"} > 4)
1246 $sig = 9; # SIGKILL if SIGTERM doesn't work
1248 if (!exists $proc{$_}->{"sent_$sig"})
1250 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1252 select (undef, undef, undef, 0.1);
1255 kill $sig, $_; # srun wants two SIGINT to really interrupt
1257 $proc{$_}->{"sent_$sig"} = time;
1258 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1268 vec($bits,fileno($_),1) = 1;
1274 sub Log # ($jobstep_id, $logmessage)
1276 if ($_[1] =~ /\n/) {
1277 for my $line (split (/\n/, $_[1])) {
1282 my $fh = select STDERR; $|=1; select $fh;
1283 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1284 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1287 if ($local_logfile || -t STDERR) {
1288 my @gmtime = gmtime;
1289 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1290 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1292 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1294 if ($local_logfile) {
1295 print $local_logfile $datetime . " " . $message;
1302 my ($package, $file, $line) = caller;
1303 my $message = "@_ at $file line $line\n";
1304 Log (undef, $message);
1305 freeze() if @jobstep_todo;
1306 collate_output() if @jobstep_todo;
1308 save_meta() if $local_logfile;
1315 return if !$job_has_uuid;
1316 $Job->update_attributes('running' => 0,
1318 'finished_at' => scalar gmtime);
1324 my $justcheckpoint = shift; # false if this will be the last meta saved
1325 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1327 $local_logfile->flush;
1328 my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1329 . quotemeta($local_logfile->filename);
1330 my $loglocator = `$cmd`;
1331 die "system $cmd failed: $?" if $?;
1334 $local_logfile = undef; # the temp file is automatically deleted
1335 Log (undef, "log manifest is $loglocator");
1336 $Job->{'log'} = $loglocator;
1337 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1341 sub freeze_if_want_freeze
1343 if ($main::please_freeze)
1345 release_allocation();
1348 # kill some srun procs before freeze+stop
1349 map { $proc{$_} = {} } @_;
1352 killem (keys %proc);
1353 select (undef, undef, undef, 0.1);
1355 while (($died = waitpid (-1, WNOHANG)) > 0)
1357 delete $proc{$died};
1372 Log (undef, "Freeze not implemented");
1379 croak ("Thaw not implemented");
1395 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1402 my $srunargs = shift;
1403 my $execargs = shift;
1404 my $opts = shift || {};
1406 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1407 print STDERR (join (" ",
1408 map { / / ? "'$_'" : $_ }
1411 if $ENV{CRUNCH_DEBUG};
1413 if (defined $stdin) {
1414 my $child = open STDIN, "-|";
1415 defined $child or die "no fork: $!";
1417 print $stdin or die $!;
1418 close STDOUT or die $!;
1423 return system (@$args) if $opts->{fork};
1426 warn "ENV size is ".length(join(" ",%ENV));
1427 die "exec failed: $!: @$args";
1431 sub ban_node_by_slot {
1432 # Don't start any new jobsteps on this node for 60 seconds
1434 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1435 $slot[$slotid]->{node}->{hold_count}++;
1436 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1441 my ($lockfile, $error_message) = @_;
1442 open L, ">", $lockfile or croak("$lockfile: $!");
1443 if (!flock L, LOCK_EX|LOCK_NB) {
1444 croak("Can't lock $lockfile: $error_message\n");
1448 sub find_docker_hash {
1449 # Given a Keep locator, search for a matching link to find the Docker hash
1450 # of the stored image.
1451 my $locator = shift;
1452 my $links_result = $arv->{links}->{list}->execute(
1453 filters => [["head_uuid", "=", $locator],
1454 ["link_class", "=", "docker_image_hash"]],
1457 foreach my $link (@{$links_result->{items}}) {
1458 $docker_hash = lc($link->{name});
1460 return $docker_hash;
1466 # checkout-and-build
1470 my $destdir = $ENV{"CRUNCH_SRC"};
1471 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1472 my $repo = $ENV{"CRUNCH_SRC_URL"};
1474 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1476 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1480 unlink "$destdir.commit";
1481 open STDOUT, ">", "$destdir.log";
1482 open STDERR, ">&STDOUT";
1485 my @git_archive_data = <DATA>;
1486 if (@git_archive_data) {
1487 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1488 print TARX @git_archive_data;
1490 die "'tar -C $destdir -xf -' exited $?: $!";
1495 chomp ($pwd = `pwd`);
1496 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1499 for my $src_path ("$destdir/arvados/sdk/python") {
1501 shell_or_die ("virtualenv", $install_dir);
1502 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1506 if (-e "$destdir/crunch_scripts/install") {
1507 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1508 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1510 shell_or_die ("./tests/autotests.sh", $install_dir);
1511 } elsif (-e "./install.sh") {
1512 shell_or_die ("./install.sh", $install_dir);
1516 unlink "$destdir.commit.new";
1517 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1518 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1527 if ($ENV{"DEBUG"}) {
1528 print STDERR "@_\n";
1531 or die "@_ failed: $! exit 0x".sprintf("%x",$?);