2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path remove_tree );
88 use constant EX_TEMPFAIL => 75;
# --- Temporary-directory setup (top-level statements) ---
# Default TMPDIR to /tmp, then derive CRUNCH_TMP from it unless the
# caller already set CRUNCH_TMP in the environment.
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# Non-root users other than "crunch" get a per-uid suffix ($< is the
# real uid) so concurrent users on one host don't share a tmp dir.
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived locations exported to child processes: per-job work area,
# install prefix for the job's source tree, plus a deprecated alias.
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): mkdir's return value is unchecked here -- presumably a
# failure surfaces later when JOB_WORK is used; confirm intent.
107 mkdir ($ENV{"JOB_WORK"});
# --- Command-line options (see POD above for their meanings) ---
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
# A token given on the command line overrides the ambient Arvados
# credentials for the duration of this job.
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# We are "inside" a slurm allocation iff both of these are set by slurm.
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# CRUNCH_DEBUG is exported to children; set to 1 or 0 depending on a
# condition not visible in this excerpt.
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
# --- Obtain (or create) the Job record and claim it ---
142 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $User = $arv->{'users'}->{'current'}->execute;
# A bare UUID means "fetch an existing job"; anything else is treated
# as an inline JSON job specification (see the else branch below).
150 if ($jobspec =~ /^[-a-z\d]+$/)
152 # $jobspec is an Arvados UUID, not a JSON job specification
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154 if (!$force_unlock) {
155 # Claim this job, and make sure nobody else does
157 # lock() sets is_locked_by_uuid and changes state to Running.
158 $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
# EX_TEMPFAIL (75) tells the dispatcher this is a retryable condition.
161 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
# Inline-JSON path: decode, validate required fields, then create a
# new job record already marked as locked/running by this user.
168 $Job = JSON::decode_json($jobspec);
172 map { croak ("No $_ specified") unless $Job->{$_} }
173 qw(script script_version script_parameters);
176 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
177 $Job->{'started_at'} = gmtime;
178 $Job->{'state'} = 'Running';
180 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
182 $job_id = $Job->{'uuid'};
# Start streaming this job's log to Keep under "<uuid>.log.txt".
184 my $keep_logfile = $job_id . '.log.txt';
185 log_writer_start($keep_logfile);
# max_tasks_per_node == 0 means "no limit" (see $max_ncpus use below).
187 $Job->{'runtime_constraints'} ||= {};
188 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
189 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# --- Discover allocated nodes and build the @node/@slot tables ---
192 Log (undef, "check slurm allocation");
195 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local fallback: count CPUs from /proc/cpuinfo (at least 1) and offer
# "localhost" as the only node when no slurm allocation exists.
199 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
200 push @sinfo, "$localcpus localhost";
202 if (exists $ENV{SLURM_NODELIST})
204 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# Each sinfo line is "<cpus> <nodelist>"; clamp cpus to the job's
# max_tasks_per_node constraint when one was given.
208 my ($ncpus, $slurm_nodelist) = split;
209 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand slurm's compressed node-list syntax, e.g. "node[1-3,5]",
# into individual hostnames.
212 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
215 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
218 foreach (split (",", $ranges))
231 push @nodelist, map {
233 $n =~ s/\[[-,\d]+\]/$_/;
240 push @nodelist, $nodelist;
# One @slot entry per (node, cpu) pair; tasks are later dispatched to
# free slots.
243 foreach my $nodename (@nodelist)
245 Log (undef, "node $nodename - $ncpus slots");
246 my $node = { name => $nodename,
250 foreach my $cpu (1..$ncpus)
252 push @slot, { node => $node,
256 push @node, @nodelist;
261 # Ensure that we get one jobstep running on each allocated node before
262 # we start overloading nodes with concurrent steps
264 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# --- Initialize job bookkeeping, signal handlers, and the first task ---
267 $Job->update_attributes(
268 'tasks_summary' => { 'failed' => 0,
273 Log (undef, "start");
# Signals set flags that the main loop polls; see the POD above.
# TERM aborts outright via croak; freeze/info/continue/refresh are
# cooperative requests handled between scheduling iterations.
274 $SIG{'INT'} = sub { $main::please_freeze = 1; };
275 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
276 $SIG{'TERM'} = \&croak;
277 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
278 $SIG{'ALRM'} = sub { $main::please_info = 1; };
279 $SIG{'CONT'} = sub { $main::please_continue = 1; };
280 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 $main::please_refresh = 0;
286 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE* knobs from the job record into the environment.
288 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
289 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
290 $ENV{"JOB_UUID"} = $job_id;
# Scheduling state: indexes into @jobstep, partitioned by status.
294 my @jobstep_todo = ();
295 my @jobstep_done = ();
296 my @jobstep_tomerge = ();
297 my $jobstep_tomerge_level = 0;
299 my $squeue_kill_checked;
300 my $output_in_keep = 0;
301 my $latest_refresh = scalar time;
# Resume from a frozen checkpoint if the job record carries one.
305 if (defined $Job->{thawedfromkey})
307 thaw ($Job->{thawedfromkey});
# Otherwise seed the queue with a single level-0 task.
311 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312 'job_uuid' => $Job->{'uuid'},
317 push @jobstep, { 'level' => 0,
319 'arvados_task' => $first_task,
321 push @jobstep_todo, 0;
# Guard against two crunch-jobs sharing one CRUNCH_TMP on this host.
327 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# The worker-side install script is embedded after __DATA__.
334 $build_script = <DATA>;
336 my $nodelist = join(",", @node);
# --- Clear per-job temp dirs on every worker (skipped by --no-clear-tmp) ---
338 if (!defined $no_clear_tmp) {
339 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
340 Log (undef, "Clean work dirs");
# Fork a child that runs the cleanup on all nodes via srun; lazily
# unmount any stale Keep fuse mounts first so rm -rf can succeed.
342 my $cleanpid = fork();
345 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
346 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Parent: poll for the child while honoring freeze requests.
351 last if $cleanpid == waitpid (-1, WNOHANG);
352 freeze_if_want_freeze ($cleanpid);
353 select (undef, undef, undef, 0.1);
355 Log (undef, "Cleanup command exited ".exit_status_s($?));
# --- Resolve script_version to source code (directory or git commit) ---
360 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
361 # If we're in user-land (i.e., not called from crunch-dispatch)
362 # script_version can be an absolute directory path, signifying we
363 # should work straight out of that directory instead of using a git
365 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
366 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
369 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
371 # Install requested code version
372 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
374 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
376 # If we're running under crunch-dispatch, it will have already
377 # pulled the appropriate source tree into its own repository, and
378 # given us that repo's path as $git_dir.
380 # If we're running a "local" job, we might have to fetch content
381 # from a remote repository.
383 # (Currently crunch-dispatch gives a local path with --git-dir, but
384 # we might as well accept URLs there too in case it changes its
386 my $repo = $git_dir || $Job->{'repository'};
388 # Repository can be remote or local. If remote, we'll need to fetch it
389 # to a local dir before doing `git log` et al.
392 if ($repo =~ m{://|^[^/]*:}) {
393 # $repo is a git url we can clone, like git:// or https:// or
394 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
395 # not recognized here because distinguishing that from a local
396 # path is too fragile. If you really need something strange here,
397 # use the ssh:// form.
398 $repo_location = 'remote';
399 } elsif ($repo =~ m{^\.*/}) {
400 # $repo is a local path to a git index. We'll also resolve ../foo
401 # to ../foo/.git if the latter is a directory. To help
402 # disambiguate local paths from named hosted repositories, this
403 # form must be given as ./ or ../ if it's a relative path.
404 if (-d "$repo/.git") {
405 $repo = "$repo/.git";
407 $repo_location = 'local';
409 # $repo is none of the above. It must be the name of a hosted
# Look the name up via the Arvados repositories API; exactly one
# match is required (the error branch below fires otherwise).
411 my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
412 'filters' => [['name','=',$repo]]
414 my $n_found = scalar @{$arv_repo_list};
416 Log(undef, "Repository '$repo' -> "
417 . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
420 croak("Error: Found $n_found repositories with name '$repo'.");
422 $repo = $arv_repo_list->[0]->{'fetch_url'};
423 $repo_location = 'remote';
425 Log(undef, "Using $repo_location repository '$repo'");
426 $ENV{"CRUNCH_SRC_URL"} = $repo;
428 # Resolve given script_version (we'll call that $treeish here) to a
429 # commit sha1 ($commit).
430 my $treeish = $Job->{'script_version'};
432 if ($repo_location eq 'remote') {
433 # We minimize excess object-fetching by re-using the same bare
434 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
435 # just keep adding remotes to it as needed.
436 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
437 my $gitcmd = "git --git-dir=\Q$local_repo\E";
439 # Set up our local repo for caching remote objects, making
441 if (!-d $local_repo) {
442 make_path($local_repo) or croak("Error: could not create $local_repo");
444 # This works (exits 0 and doesn't delete fetched objects) even
445 # if $local_repo is already initialized:
446 `$gitcmd init --bare`;
448 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
451 # If $treeish looks like a hash (or abbrev hash) we look it up in
452 # our local cache first, since that's cheaper. (We don't want to
453 # do that with tags/branches though -- those change over time, so
454 # they should always be resolved by the remote repo.)
455 if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
456 # Hide stderr because it's normal for this to fail:
457 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
459 $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
460 $sha1 =~ /^([0-9a-f]{40})$/s) {
462 Log(undef, "Commit $commit already present in $local_repo")
466 if (!defined $commit) {
467 # If $treeish isn't just a hash or abbrev hash, or isn't here
468 # yet, we need to fetch the remote to resolve it correctly.
470 # First, remove all local heads. This prevents a name that does
471 # not exist on the remote from resolving to (or colliding with)
472 # a previously fetched branch or tag (possibly from a different
474 remove_tree("$local_repo/refs/heads", {keep_root => 1});
476 Log(undef, "Fetching objects from $repo to $local_repo");
477 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
479 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
483 # Now that the data is all here, we will use our local repo for
484 # the rest of our git activities.
# Local-repo path: resolve $treeish directly and require a full sha1.
488 my $gitcmd = "git --git-dir=\Q$repo\E";
489 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
490 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
491 croak("`$gitcmd rev-list` exited "
493 .", '$treeish' not found. Giving up.");
496 Log(undef, "Version $treeish is commit $commit");
498 if ($commit ne $Job->{'script_version'}) {
499 # Record the real commit id in the database, frozentokey, logs,
500 # etc. -- instead of an abbreviation or a branch name which can
501 # become ambiguous or point to a different commit in the future.
502 if (!$Job->update_attributes('script_version' => $commit)) {
503 croak("Error: failed to update job's script_version attribute");
# Materialize the commit as a tar stream that the worker-side build
# script will unpack.
507 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
508 $git_archive = `$gitcmd archive ''\Q$commit\E`;
510 croak("Error: $gitcmd archive exited ".exit_status_s($?));
# --- Install the source archive and Docker image on all workers ---
514 if (!defined $git_archive) {
515 Log(undef, "Skip install phase (no git archive)");
517 Log(undef, "Warning: This probably means workers have no source tree!");
521 Log(undef, "Run install script on all workers");
# srun runs "perl -" on every node; the build script plus the git
# archive is piped to it on stdin (see the srun call below).
523 my @srunargs = ("srun",
524 "--nodelist=$nodelist",
525 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
526 my @execargs = ("sh", "-c",
527 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
529 # Note: this section is almost certainly unnecessary if we're
530 # running tasks in docker containers.
531 my $installpid = fork();
532 if ($installpid == 0)
534 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Parent: wait for the installer child, honoring freeze requests.
539 last if $installpid == waitpid (-1, WNOHANG);
540 freeze_if_want_freeze ($installpid);
541 select (undef, undef, undef, 0.1);
543 Log (undef, "Install script exited ".exit_status_s($?));
548 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
549 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
552 # If this job requires a Docker image, install that.
553 my $docker_bin = "/usr/bin/docker.io";
554 my ($docker_locator, $docker_stream, $docker_hash);
555 if ($docker_locator = $Job->{docker_image_locator}) {
556 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
559 croak("No Docker image hash found from locator $docker_locator");
# Strip the leading "." from the manifest stream name for arv-get.
561 $docker_stream =~ s/^\.//;
# On each node: load the image from Keep only if it isn't already in
# the local docker image cache.
562 my $docker_install_script = qq{
563 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
564 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
567 my $docker_pid = fork();
568 if ($docker_pid == 0)
570 srun (["srun", "--nodelist=" . join(',', @node)],
571 ["/bin/sh", "-ec", $docker_install_script]);
576 last if $docker_pid == waitpid (-1, WNOHANG);
577 freeze_if_want_freeze ($docker_pid);
578 select (undef, undef, undef, 0.1);
582 croak("Installing Docker image from $docker_locator exited "
# Log the job's defining attributes and knobs for the permanent record.
587 foreach (qw (script script_version script_parameters runtime_constraints))
591 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
593 foreach (split (/\n/, $Job->{knobs}))
595 Log (undef, "knob " . $_);
# --- Main scheduling loop: run one "level" of tasks per round ---
# $main::success stays undef while more rounds remain; the reaper and
# failure checks below set it to a defined value to end the job.
600 $main::success = undef;
606 my $thisround_succeeded = 0;
607 my $thisround_failed = 0;
608 my $thisround_failed_multiple = 0;
# Process tasks lowest level first; ties broken by queue order.
610 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
611 or $a <=> $b } @jobstep_todo;
612 my $level = $jobstep[$jobstep_todo[0]]->{level};
613 Log (undef, "start level $level");
618 my @freeslot = (0..$#slot);
621 my $progress_is_dirty = 1;
622 my $progress_stats_updated = 0;
624 update_progress_stats();
# Dispatch loop: for each queued task at this level, claim a free
# slot, fork a child, and have the child exec the task via srun.
629 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
631 my $id = $jobstep_todo[$todo_ptr];
632 my $Jobstep = $jobstep[$id];
633 if ($Jobstep->{level} != $level)
# Non-blocking pipe carries the child's stderr back to the parent.
638 pipe $reader{$id}, "writer" or croak ($!);
639 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
640 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
642 my $childslot = $freeslot[0];
643 my $childnode = $slot[$childslot]->{node};
644 my $childslotname = join (".",
645 $slot[$childslot]->{node}->{name},
646 $slot[$childslot]->{cpu});
647 my $childpid = fork();
# Child branch: reset inherited signal handlers, redirect stdout and
# stderr into the pipe, and build the task environment.
650 $SIG{'INT'} = 'DEFAULT';
651 $SIG{'QUIT'} = 'DEFAULT';
652 $SIG{'TERM'} = 'DEFAULT';
654 foreach (values (%reader))
658 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
659 open(STDOUT,">&writer");
660 open(STDERR,">&writer");
665 delete $ENV{"GNUPGHOME"};
666 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
667 $ENV{"TASK_QSEQUENCE"} = $id;
668 $ENV{"TASK_SEQUENCE"} = $level;
669 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Expose script parameters as JOB_PARAMETER_<NAME> env vars.
670 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
671 $param =~ tr/a-z/A-Z/;
672 $ENV{"JOB_PARAMETER_$param"} = $value;
674 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
675 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
676 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
677 $ENV{"HOME"} = $ENV{"TASK_WORK"};
678 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
679 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
680 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
681 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun pins the task to its assigned node, one task per step.
687 "--nodelist=".$childnode->{name},
688 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
689 "--job-name=$job_id.$id.$$",
691 my $build_script_to_send = "";
# Shell prologue: recreate the task work dir from scratch, then cd
# into CRUNCH_TMP before mounting Keep and launching the script.
693 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
694 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
695 ."&& cd $ENV{CRUNCH_TMP} ";
698 $build_script_to_send = $build_script;
702 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker path: wrap the task in crunchstat + docker run, remapping
# TASK_WORK and TASK_KEEPMOUNT to in-container paths.
705 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
706 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
707 # Dynamically configure the container to use the host system as its
708 # DNS server. Get the host's global addresses from the ip command,
709 # and turn them into docker --dns options using gawk.
711 q{$(ip -o address show scope global |
712 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
713 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
714 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
715 $command .= "--env=\QHOME=/home/crunch\E ";
716 while (my ($env_key, $env_val) = each %ENV)
718 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
719 if ($env_key eq "TASK_WORK") {
720 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
722 elsif ($env_key eq "TASK_KEEPMOUNT") {
723 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
726 $command .= "--env=\Q$env_key=$env_val\E ";
730 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
731 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
732 $command .= "\Q$docker_hash\E ";
733 $command .= "stdbuf --output=0 --error=0 ";
734 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-docker path: crunchstat + the script straight from CRUNCH_SRC.
737 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
738 $command .= "stdbuf --output=0 --error=0 ";
739 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
742 my @execargs = ('bash', '-c', $command);
743 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
744 # exec() failed, we assume nothing happened.
745 Log(undef, "srun() failed on build script");
749 if (!defined $childpid)
# Parent branch: record the child in %proc and mark the slot busy.
756 $proc{$childpid} = { jobstep => $id,
759 jobstepname => "$job_id.$id.$childpid",
761 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
762 $slot[$childslot]->{pid} = $childpid;
764 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
765 Log ($id, "child $childpid started on $childslotname");
766 $Jobstep->{starttime} = time;
767 $Jobstep->{node} = $childnode->{name};
768 $Jobstep->{slotindex} = $childslot;
769 delete $Jobstep->{stderr};
770 delete $Jobstep->{finishtime};
772 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
773 $Jobstep->{'arvados_task'}->save;
775 splice @jobstep_todo, $todo_ptr, 1;
778 $progress_is_dirty = 1;
# Inner wait loop: service signals, reap children, refresh progress.
782 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
784 last THISROUND if $main::please_freeze;
785 if ($main::please_info)
787 $main::please_info = 0;
791 update_progress_stats();
798 check_refresh_wanted();
800 update_progress_stats();
801 select (undef, undef, undef, 0.1);
803 elsif (time - $progress_stats_updated >= 30)
805 update_progress_stats();
# Abort the round when repeated failures dominate (thresholds: 8
# multi-failures with no successes, or 16 exceeding successes).
807 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
808 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
810 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
811 .($thisround_failed+$thisround_succeeded)
812 .") -- giving up on this round";
813 Log (undef, $message);
817 # move slots from freeslot to holdslot (or back to freeslot) if necessary
818 for (my $i=$#freeslot; $i>=0; $i--) {
819 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
820 push @holdslot, (splice @freeslot, $i, 1);
823 for (my $i=$#holdslot; $i>=0; $i--) {
824 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
825 push @freeslot, (splice @holdslot, $i, 1);
829 # give up if no nodes are succeeding
830 if (!grep { $_->{node}->{losing_streak} == 0 &&
831 $_->{node}->{hold_count} < 4 } @slot) {
832 my $message = "Every node has failed -- giving up on this round";
833 Log (undef, $message);
# Round cleanup: release held slots and reset losing streaks.
840 push @freeslot, splice @holdslot;
841 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
# Drain: wait for the remaining children of this round to finish.
844 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
847 if ($main::please_continue) {
848 $main::please_continue = 0;
851 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
855 check_refresh_wanted();
857 update_progress_stats();
858 select (undef, undef, undef, 0.1);
859 killem (keys %proc) if $main::please_freeze;
863 update_progress_stats();
864 freeze_if_want_freeze();
# Decide whether to stop or run another level (ONELEVEL label is
# outside this excerpt).
867 if (!defined $main::success)
870 $thisround_succeeded == 0 &&
871 ($thisround_failed == 0 || $thisround_failed > 4))
873 my $message = "stop because $thisround_failed tasks failed and none succeeded";
874 Log (undef, $message);
883 goto ONELEVEL if !defined $main::success;
# --- Finalization: collate task outputs into one collection ---
886 release_allocation();
888 my $collated_output = &collate_output();
890 if (!$collated_output) {
891 Log(undef, "output undef");
# Re-fetch the collated manifest text and register it as the job's
# output collection; record its portable data hash on the job.
895 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
896 or die "failed to get collated manifest: $!";
897 my $orig_manifest_text = '';
898 while (my $manifest_line = <$orig_manifest>) {
899 $orig_manifest_text .= $manifest_line;
901 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
902 'manifest_text' => $orig_manifest_text,
904 Log(undef, "output uuid " . $output->{uuid});
905 Log(undef, "output hash " . $output->{portable_data_hash});
906 $Job->update_attributes('output' => $output->{portable_data_hash});
909 Log (undef, "Failed to register output manifest: $@");
913 Log (undef, "finish");
918 if ($collated_output && $main::success) {
919 $final_state = 'Complete';
921 $final_state = 'Failed';
923 $Job->update_attributes('state' => $final_state);
# Exit status mirrors the final job state for the dispatcher.
925 exit (($final_state eq 'Complete') ? 0 : 1);
# Push current todo/done/running task counts to the Job record and the
# log. No-op (after refreshing the timestamp) unless something changed
# since the last call ($progress_is_dirty flag).
929 sub update_progress_stats
931 $progress_stats_updated = time;
932 return if !$progress_is_dirty;
# "running" is inferred from slot accounting: total minus free minus held.
933 my ($todo, $done, $running) = (scalar @jobstep_todo,
934 scalar @jobstep_done,
935 scalar @slot - scalar @freeslot - scalar @holdslot);
936 $Job->{'tasks_summary'} ||= {};
937 $Job->{'tasks_summary'}->{'todo'} = $todo;
938 $Job->{'tasks_summary'}->{'done'} = $done;
939 $Job->{'tasks_summary'}->{'running'} = $running;
940 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
941 Log (undef, "status: $done done, $running running, $todo todo");
942 $progress_is_dirty = 0;
# --- Reaper: collect one exited child, update task state, enqueue
# --- any new tasks it created. Returns 0 when no child has exited.
949 my $pid = waitpid (-1, WNOHANG);
950 return 0 if $pid <= 0;
952 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
954 . $slot[$proc{$pid}->{slot}]->{cpu});
955 my $jobstepid = $proc{$pid}->{jobstep};
956 my $elapsed = time - $proc{$pid}->{time};
957 my $Jobstep = $jobstep[$jobstepid];
959 my $childstatus = $?;
960 my $exitvalue = $childstatus >> 8;
961 my $exitinfo = "exit ".exit_status_s($childstatus);
# Reload the task record to see whether the task itself reported
# success/failure via the API.
962 $Jobstep->{'arvados_task'}->reload;
963 my $task_success = $Jobstep->{'arvados_task'}->{success};
965 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
967 if (!defined $task_success) {
968 # task did not indicate one way or the other --> fail
969 $Jobstep->{'arvados_task'}->{success} = 0;
970 $Jobstep->{'arvados_task'}->save;
# Failure path: exit code 111 (and node_fail detection from stderr)
# marks the failure as temporary, i.e. eligible for retry.
977 $temporary_fail ||= $Jobstep->{node_fail};
978 $temporary_fail ||= ($exitvalue == 111);
981 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
983 # Check for signs of a failed or misconfigured node
984 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
985 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
986 # Don't count this against jobstep failure thresholds if this
987 # node is already suspected faulty and srun exited quickly
988 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
990 Log ($jobstepid, "blaming failure on suspect node " .
991 $slot[$proc{$pid}->{slot}]->{node}->{name});
992 $temporary_fail ||= 1;
994 ban_node_by_slot($proc{$pid}->{slot});
997 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
998 ++$Jobstep->{'failures'},
999 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failure, or 3 strikes: give up on the whole job.
1002 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1003 # Give up on this task, and the whole job
1005 $main::please_freeze = 1;
1007 # Put this task back on the todo queue
1008 push @jobstep_todo, $jobstepid;
1009 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the node's losing streak and mark task done.
1013 ++$thisround_succeeded;
1014 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1015 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1016 push @jobstep_done, $jobstepid;
1017 Log ($jobstepid, "success in $elapsed seconds");
1019 $Jobstep->{exitcode} = $childstatus;
1020 $Jobstep->{finishtime} = time;
1021 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1022 $Jobstep->{'arvados_task'}->save;
1023 process_stderr ($jobstepid, $task_success);
1024 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the child's stderr pipe and return its slot to the pool.
1026 close $reader{$jobstepid};
1027 delete $reader{$jobstepid};
1028 delete $slot[$proc{$pid}->{slot}]->{pid};
1029 push @freeslot, $proc{$pid}->{slot};
# A successful task may have created follow-on tasks; page through
# the API list (ordered by qsequence) and enqueue each one.
1032 if ($task_success) {
1034 my $newtask_list = [];
1035 my $newtask_results;
1037 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1039 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1041 'order' => 'qsequence',
1042 'offset' => scalar(@$newtask_list),
1044 push(@$newtask_list, @{$newtask_results->{items}});
1045 } while (@{$newtask_results->{items}});
1046 foreach my $arvados_task (@$newtask_list) {
1048 'level' => $arvados_task->{'sequence'},
1050 'arvados_task' => $arvados_task
1052 push @jobstep, $jobstep;
1053 push @jobstep_todo, $#jobstep;
1057 $progress_is_dirty = 1;
# Re-read the Job record when the refresh-trigger file has been touched
# since our last refresh; pick up cancellation and state changes.
1061 sub check_refresh_wanted
1063 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the trigger file's mtime.
1064 if (@stat && $stat[9] > $latest_refresh) {
1065 $latest_refresh = scalar time;
1066 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1067 for my $attr ('cancelled_at',
1068 'cancelled_by_user_uuid',
1069 'cancelled_by_client_uuid',
1071 $Job->{$attr} = $Job2->{$attr};
# Any state other than Running means someone else changed the job;
# freeze and shut down gracefully.
1073 if ($Job->{'state'} ne "Running") {
1074 if ($Job->{'state'} eq "Cancelled") {
1075 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1077 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1080 $main::please_freeze = 1;
# --- squeue cross-check: kill procs slurm no longer knows about ---
1087 # return if the kill list was checked <4 seconds ago
1088 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1092 $squeue_kill_checked = time;
1094 # use killem() on procs whose killtime is reached
1097 if (exists $proc{$_}->{killtime}
1098 && $proc{$_}->{killtime} <= time)
1104 # return if the squeue was checked <60 seconds ago
1105 if (defined $squeue_checked && $squeue_checked > time - 60)
1109 $squeue_checked = time;
1113 # here is an opportunity to check for mysterious problems with local procs
1117 # get a list of steps still running
# Trailing "echo ok" sentinel distinguishes an empty squeue from a
# failed squeue invocation.
1118 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1120 if ($squeue[-1] ne "ok")
1126 # which of my jobsteps are running, according to squeue?
1130 if (/^(\d+)\.(\d+) (\S+)/)
1132 if ($1 eq $ENV{SLURM_JOBID})
1139 # which of my active child procs (>60s old) were not mentioned by squeue?
1140 foreach (keys %proc)
1142 if ($proc{$_}->{time} < time - 60
1143 && !exists $ok{$proc{$_}->{jobstepname}}
1144 && !exists $proc{$_}->{killtime})
1146 # kill this proc if it hasn't exited in 30 seconds
1147 $proc{$_}->{killtime} = time + 30;
# Cancel the slurm allocation when we are done with it.
1153 sub release_allocation
1157 Log (undef, "release job allocation");
1158 system "scancel $ENV{SLURM_JOBID}";
# --- Drain the children's stderr pipes into per-jobstep buffers ---
1166 foreach my $job (keys %reader)
# Non-blocking reads (pipes were set O_NONBLOCK at creation).
1169 while (0 < sysread ($reader{$job}, $buf, 8192))
1171 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1172 $jobstep[$job]->{stderr} .= $buf;
1173 preprocess_stderr ($job);
# Cap the buffer: drop the oldest 8 KiB once it exceeds 16 KiB.
1174 if (length ($jobstep[$job]->{stderr}) > 16384)
1176 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Log each complete stderr line and watch for srun/slurm error
# patterns that indicate allocation expiry or node failure.
1185 sub preprocess_stderr
1189 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Consume the line (including its newline) from the buffer.
1191 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1192 Log ($job, "stderr $line");
1193 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1195 $main::please_freeze = 1;
1197 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
# Mark the step as a node failure and stop scheduling on that slot.
1198 $jobstep[$job]->{node_fail} = 1;
1199 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr flush for a finished jobstep: log whatever remains.
1208 my $task_success = shift;
1209 preprocess_stderr ($job);
1212 Log ($job, "stderr $_");
1213 } split ("\n", $jobstep[$job]->{stderr});
# Fetch the content of a Keep block/collection via `arv-get <hash>`
# and return it as a single string.
1219 my ($keep, $child_out, $output_block);
1221 my $cmd = "arv-get \Q$hash\E";
1222 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# Read the pipe in 1 MiB chunks until EOF.
1226 my $bytes = sysread($keep, $buf, 1024 * 1024);
1227 if (!defined $bytes) {
1228 die "reading from arv-get: $!";
1229 } elsif ($bytes == 0) {
1230 # sysread returns 0 at the end of the pipe.
1233 # some bytes were read into buf.
1234 $output_block .= $buf;
1238 return $output_block;
# --- Combine each succeeded task's output into one manifest via
# --- `arv-put --raw`, returning the collated locator.
1243 Log (undef, "collate");
1245 my ($child_out, $child_in);
1246 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1247 '--retries', put_retry_count());
# Feed each successful task's output to arv-put. Outputs may be a
# plain block locator, or (other branches) a full manifest to fetch.
1251 next if (!exists $_->{'arvados_task'}->{'output'} ||
1252 !$_->{'arvados_task'}->{'success'});
1253 my $output = $_->{'arvados_task'}->{output};
1254 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1256 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1257 print $child_in $output;
# Single-task job: its output *is* the job output, no collation needed.
1259 elsif (@jobstep == 1)
1261 $joboutput = $output;
1264 elsif (defined (my $outblock = fetch_block ($output)))
1266 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1267 print $child_in $outblock;
1271 Log (undef, "XXX fetch_block($output) failed XXX");
# Read arv-put's resulting locator, with a 2-minute timeout.
1277 if (!defined $joboutput) {
1278 my $s = IO::Select->new($child_out);
1279 if ($s->can_read(120)) {
1280 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1282 # TODO: Ensure exit status == 0.
1284 Log (undef, "timed out reading from 'arv-put'");
1287 # TODO: kill $pid instead of waiting, now that we've decided to
1288 # ignore further output.
# --- Escalating kill: SIGINT, then SIGTERM, then SIGKILL, with a
# --- 4-second grace period between escalations per pid.
1299 my $sig = 2; # SIGINT first
1300 if (exists $proc{$_}->{"sent_$sig"} &&
1301 time - $proc{$_}->{"sent_$sig"} > 4)
1303 $sig = 15; # SIGTERM if SIGINT doesn't work
1305 if (exists $proc{$_}->{"sent_$sig"} &&
1306 time - $proc{$_}->{"sent_$sig"} > 4)
1308 $sig = 9; # SIGKILL if SIGTERM doesn't work
1310 if (!exists $proc{$_}->{"sent_$sig"})
1312 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1314 select (undef, undef, undef, 0.1);
1317 kill $sig, $_; # srun wants two SIGINT to really interrupt
1319 $proc{$_}->{"sent_$sig"} = time;
1320 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# (fhbits helper fragment) build a select() bit vector from filehandles.
1330 vec($bits,fileno($_),1) = 1;
1336 # Send log output to Keep via arv-put.
1338 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1339 # $log_pipe_pid is the pid of the arv-put subprocess.
1341 # The only functions that should access these variables directly are:
1343 # log_writer_start($logfilename)
1344 # Starts an arv-put pipe, reading data on stdin and writing it to
1345 # a $logfilename file in an output collection.
1347 # log_writer_send($txt)
1348 # Writes $txt to the output log collection.
1350 # log_writer_finish()
1351 # Closes the arv-put pipe and returns the output that it produces.
1353 # log_writer_is_active()
1354 # Returns a true value if there is currently a live arv-put
1355 # process, false otherwise.
1357 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
# log_writer_start($logfilename): start an arv-put subprocess that
# reads log text on stdin and stores it as $logfilename in a new
# collection, reporting the collection's portable data hash on stdout.
# Sets the file-scoped $log_pipe_{pid,in,out} handles used by the
# other log_writer_* functions.
1359 sub log_writer_start($)
1361 my $logfilename = shift;
1362 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1363 'arv-put', '--portable-data-hash',
1365 '--filename', $logfilename,
# log_writer_send($txt): write one chunk of log text down the arv-put
# pipe opened by log_writer_start().
1369 sub log_writer_send($)
1372 print $log_pipe_in $txt;
# log_writer_finish(): close the arv-put pipe, read back the output it
# prints (waiting up to 120s), reap the child, and return that output.
# Returns early (undef) when no log pipe is active.
1375 sub log_writer_finish()
1377 return unless $log_pipe_pid;
# Closing stdin sends EOF; arv-put then writes its result and exits.
1379 close($log_pipe_in);
1382 my $s = IO::Select->new($log_pipe_out);
1383 if ($s->can_read(120)) {
1384 sysread($log_pipe_out, $arv_put_output, 1024);
1385 chomp($arv_put_output);
1387 Log (undef, "timed out reading from 'arv-put'");
1390 waitpid($log_pipe_pid, 0);
# Reset state so log_writer_is_active() reports false afterwards.
1391 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
# $? here is arv-put's wait status from the waitpid above; log it when
# nonzero (guard condition elided in this listing).
1393 Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1396 return $arv_put_output;
# log_writer_is_active(): truthy (the arv-put pid) while a log pipe
# subprocess is live, false otherwise.
1399 sub log_writer_is_active() {
1400 return $log_pipe_pid;
1403 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and handled one line at a time (the
# loop body is elided in this listing -- presumably a recursive Log
# call per line; TODO confirm).
1405 if ($_[1] =~ /\n/) {
1406 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered so log lines appear promptly, then restore
# the previously selected handle.
1411 my $fh = select STDERR; $|=1; select $fh;
1412 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape every byte outside printable ASCII (space..~) as \NNN octal
# so each log entry stays on a single clean line.
1413 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Build a timestamp only when it will be used: writing to the Keep log
# collection, or STDERR is a terminal.
1416 if (log_writer_is_active() || -t STDERR) {
1417 my @gmtime = gmtime;
1418 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1419 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Terminals get the timestamp prefix; plain stderr streams do not.
1421 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# Mirror the (timestamped) line into the Keep log collection.
1423 if (log_writer_is_active()) {
1424 log_writer_send($datetime . " " . $message);
# Fatal-error handler fragment (croak-style): log the error with the
# caller's file/line, then checkpoint (freeze), collate any finished
# task output, and save the log collection before the job dies.
1431 my ($package, $file, $line) = caller;
1432 my $message = "@_ at $file line $line\n";
1433 Log (undef, $message);
1434 freeze() if @jobstep_todo;
1435 collate_output() if @jobstep_todo;
1437 save_meta() if log_writer_is_active();
# cleanup fragment: finalize the Job record -- a Cancelled job just
# gets finished_at stamped; any other state transitions to Failed.
1444 if ($Job->{'state'} eq 'Cancelled') {
1445 $Job->update_attributes('finished_at' => scalar gmtime);
1447 $Job->update_attributes('state' => 'Failed');
# save_meta fragment: finish the log collection and record its locator
# on the Job record.  Intermediate checkpoints are a no-op (the old
# Warehouse-era checkpoint metadata is no longer saved).
1454 my $justcheckpoint = shift; # false if this will be the last meta saved
1455 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1457 my $loglocator = log_writer_finish();
1458 Log (undef, "log manifest is $loglocator");
# Keep the in-memory copy in sync and persist 'log' via the API.
1459 $Job->{'log'} = $loglocator;
1460 $Job->update_attributes('log', $loglocator);
# freeze_if_want_freeze(@pids): when a signal handler has set
# $main::please_freeze (SIGQUIT/SIGINT per the POD), release the node
# allocation, kill the given srun pids plus everything already in
# %proc, and reap the dead children.
1464 sub freeze_if_want_freeze
1466 if ($main::please_freeze)
1468 release_allocation();
1471 # kill some srun procs before freeze+stop
# Register caller-supplied pids in %proc so killem() targets them too.
1472 map { $proc{$_} = {} } @_;
1475 killem (keys %proc);
# Brief pause, then reap all exited children and drop them from %proc.
1476 select (undef, undef, undef, 0.1);
1478 while (($died = waitpid (-1, WNOHANG)) > 0)
1480 delete $proc{$died};
# freeze()/thaw() are stubs: checkpoint/restore is not implemented in
# this version.  The substitution below is thaw's string unquoter,
# reversing "\n"-style escaping from the old frozen-state format.
1495 Log (undef, "Freeze not implemented");
1502 croak ("Thaw not implemented");
1518 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun fragment: run a command via SLURM's srun when available, or
# directly otherwise.  $srunargs are srun options, $execargs the
# command+args; $opts->{fork} requests run-and-wait instead of exec.
# $stdin (set on an elided line above) supplies data for the child's
# standard input.
1525 my $srunargs = shift;
1526 my $execargs = shift;
1527 my $opts = shift || {};
# Without SLURM, drop the srun wrapper and run the bare command.
1529 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Echo the command (quoting args containing spaces) when debugging.
1530 print STDERR (join (" ",
1531 map { / / ? "'$_'" : $_ }
1534 if $ENV{CRUNCH_DEBUG};
# Feed $stdin to the command: fork a writer child whose stdout becomes
# our STDIN ("-|" pipe-open), which prints $stdin and exits.
1536 if (defined $stdin) {
1537 my $child = open STDIN, "-|";
1538 defined $child or die "no fork: $!";
1540 print $stdin or die $!;
1541 close STDOUT or die $!;
# With {fork}, wait for the command and return its system() status;
# otherwise exec replaces this process -- reaching the die means exec
# itself failed (often due to an oversized environment, hence the warn).
1546 return system (@$args) if $opts->{fork};
1549 warn "ENV size is ".length(join(" ",%ENV));
1550 die "exec failed: $!: @$args";
1554 sub ban_node_by_slot {
1555 # Don't start any new jobsteps on this node for 60 seconds
# ($slotid comes from an elided "shift" line.)  hold_until is an
# absolute epoch time checked by the scheduler; hold_count tallies how
# often this node has been backed off.
1557 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1558 $slot[$slotid]->{node}->{hold_count}++;
1559 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Lock-file helper fragment: open $lockfile for writing and take an
# exclusive, non-blocking flock; croak with $error_message if another
# process already holds the lock.  The bareword handle L stays open so
# the lock is held for the life of this process.
1564 my ($lockfile, $error_message) = @_;
1565 open L, ">", $lockfile or croak("$lockfile: $!");
1566 if (!flock L, LOCK_EX|LOCK_NB) {
1567 croak("Can't lock $lockfile: $error_message\n");
1571 sub find_docker_image {
1572 # Given a Keep locator, check to see if it contains a Docker image.
1573 # If so, return its stream name and Docker hash.
1574 # If not, return undef for both values.
1575 my $locator = shift;
1576 my ($streamname, $filename);
# Fetch the collection record via the Arvados API and scan its
# manifest: a Docker image collection contains exactly one file named
# <64-hex-digit-hash>.tar.
1577 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1578 foreach my $line (split(/\n/, $image->{manifest_text})) {
1579 my @tokens = split(/\s+/, $line);
# First token of a manifest line is the stream name.
1581 $streamname = shift(@tokens);
# File tokens have the form "position:size:name".
1582 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1583 if (defined($filename)) {
1584 return (undef, undef); # More than one file in the Collection.
# Keep only the name part (third field; name may itself contain ':').
1586 $filename = (split(/:/, $filedata, 3))[2];
# Accept only a single file whose name is a 64-hex Docker image id
# plus ".tar"; return the stream name and the captured hash.
1591 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1592 return ($streamname, $1);
1594 return (undef, undef);
1598 sub put_retry_count {
1599 # Calculate a --retries argument for arv-put that will have it try
1600 # approximately as long as this Job has been running.
# $stoptime defaults to now; with no recorded start time, assume the
# job just started (1 second elapsed).
1601 my $stoptime = shift || time;
1602 my $starttime = $jobstep[0]->{starttime};
1603 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# Loop body elided in this listing -- presumably halves $timediff and
# counts doublings, making $retries ~ log2(elapsed seconds), matching
# arv-put's exponential backoff; TODO confirm against full source.
1605 while ($timediff >= 2) {
# Never ask for fewer than 3 retries.
1609 return ($retries > 3) ? $retries : 3;
# exit_status_s fragment: render a wait()-style status word ($?) as a
# human-readable string.
1613 # Given a $?, return a human-readable exit code string like "0" or
1614 # "1" or "0 with signal 1" or "1 with signal 11".
1615 my $exitcode = shift;
# High byte = exit code; low 7 bits = terminating signal; bit 0x80 =
# a core dump was produced.
1616 my $s = $exitcode >> 8;
1617 if ($exitcode & 0x7f) {
1618 $s .= " with signal " . ($exitcode & 0x7f);
1620 if ($exitcode & 0x80) {
1621 $s .= " with core dump";
# "checkout-and-build" installer: this trailing script runs on each
# compute node.  It unpacks the git tree shipped after __DATA__ into
# $CRUNCH_SRC, installs the Python SDK and any crunch_scripts install
# hooks, records the installed commit, then execs the real task
# command (@ARGV).  Many interior lines (braces, exec calls) are
# elided in this listing.
1629 # checkout-and-build
1632 use File::Path qw( make_path );
# All configuration arrives via environment variables set by crunch-job.
1634 my $destdir = $ENV{"CRUNCH_SRC"};
1635 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1636 my $repo = $ENV{"CRUNCH_SRC_URL"};
1637 my $task_work = $ENV{"TASK_WORK"};
1639 for my $dir ($destdir, $task_work) {
1642 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Serialize concurrent tasks unpacking into the same CRUNCH_SRC dir.
1646 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: $destdir.commit is a symlink naming the commit already
# unpacked here; if it matches, skip the build and exec immediately.
1648 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1651 die "Cannot exec `@ARGV`: $!";
# Slow path: invalidate the old commit marker and capture build output
# in $destdir.log (stderr merged into stdout).
1657 unlink "$destdir.commit";
1658 open STDOUT, ">", "$destdir.log";
1659 open STDERR, ">&STDOUT";
# Unpack the tar stream appended to this script after __DATA__.
1662 my @git_archive_data = <DATA>;
1663 if (@git_archive_data) {
1664 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1665 print TARX @git_archive_data;
1667 die "'tar -C $destdir -xf -' exited $?: $!";
1672 chomp ($pwd = `pwd`);
1673 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Install the Arvados Python SDK into a virtualenv under $install_dir.
1676 for my $src_path ("$destdir/arvados/sdk/python") {
1678 shell_or_die ("virtualenv", $install_dir);
1679 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run the job tree's install hook: crunch_scripts/install preferred,
# else autotests.sh (when no install.sh exists), else install.sh.
1683 if (-e "$destdir/crunch_scripts/install") {
1684 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1685 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1687 shell_or_die ("./tests/autotests.sh", $install_dir);
1688 } elsif (-e "./install.sh") {
1689 shell_or_die ("./install.sh", $install_dir);
# Atomically record the commit now present in $destdir: build the
# symlink under a temp name, then rename over the real marker.
1693 unlink "$destdir.commit.new";
1694 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1695 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1702 die "Cannot exec `@ARGV`: $!";
# shell_or_die fragment: echo the command to stderr when DEBUG is set,
# run it (elided system() call), and die with errno plus the packed
# wait status in hex if it fails.
1709 if ($ENV{"DEBUG"}) {
1710 print STDERR "@_\n";
1713 or die "@_ failed: $! exit 0x".sprintf("%x",$?);