2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path remove_tree );
88 use constant EX_TEMPFAIL => 75;
# Establish the working directories used by this process.  TMPDIR falls
# back to /tmp.  When the caller has not set CRUNCH_TMP it becomes
# $TMPDIR/crunch-job, with "-<real uid>" appended unless USER is "crunch"
# or the real uid is 0.
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived locations: JOB_WORK is CRUNCH_TMP/work and CRUNCH_INSTALL is
# CRUNCH_TMP/opt.  CRUNCH_WORK is kept as a deprecated alias of JOB_WORK.
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# The result of mkdir is not checked, so an existing directory is not
# treated as an error here.
107 mkdir ($ENV{"JOB_WORK"});
# Parse the command line.  The options visible here are --force-unlock,
# --git-dir, --job, --job-api-token, --no-clear-tmp and --resume-stash;
# the ones declared with "=s" take a string value.
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
# A token supplied with --job-api-token replaces ARVADOS_API_TOKEN in the
# environment.
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# True only when both SLURM_JOBID and SLURM_NODELIST exist in the
# environment.
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# CRUNCH_DEBUG is set to either 1 or 0.
# NOTE(review): the condition that selects between the two assignments
# is not visible next to them -- confirm before relying on either value.
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to the Arvados API (v1) and look up the user the current
# credentials belong to.
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
# A --job value consisting only of lowercase letters, digits and dashes
# is treated as the UUID of an existing job record.  Any other value is
# decoded as a JSON job specification further down.
151 if ($jobspec =~ /^[-a-z\d]+$/)
153 # $jobspec is an Arvados UUID, not a JSON job specification
154 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155 if (!$force_unlock) {
156 # If some other crunch-job process has grabbed this job (or we see
157 # other evidence that the job is already underway) we exit
158 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159 # mark the job as failed.
160 if ($Job->{'is_locked_by_uuid'}) {
161 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
164 if ($Job->{'state'} ne 'Queued') {
165 Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
# The 'success' flag is tested with defined().  Comparing with
# "ne undef" would compare against the empty string (and emit an
# "uninitialized value" warning), which is not a null test.
168 if (defined $Job->{'success'}) {
169 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
172 if ($Job->{'running'}) {
173 Log(undef, "Job 'running' flag is already set");
176 if ($Job->{'started_at'}) {
177 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# JSON job specification: decode it, require the script, script_version
# and script_parameters attributes, record the current user as lock
# holder and the current time as start time, then create the job record
# through the API.
184 $Job = JSON::decode_json($jobspec);
188 map { croak ("No $_ specified") unless $Job->{$_} }
189 qw(script script_version script_parameters);
192 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193 $Job->{'started_at'} = gmtime;
195 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
# $job_id always carries the UUID of the job record in use.
197 $job_id = $Job->{'uuid'};
# $keep_logfile is the file name later given to arv-put for the job log;
# $local_logfile is a temporary file that collects log lines meanwhile.
199 my $keep_logfile = $job_id . '.log.txt';
200 $local_logfile = File::Temp->new();
# max_tasks_per_node defaults to 0.  A value of 0 means the per-node CPU
# count is not capped (see the cap applied after the split below).
202 $Job->{'runtime_constraints'} ||= {};
203 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
204 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build @sinfo, a list of "<ncpus> <nodelist>" entries.
207 Log (undef, "check slurm allocation");
210 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local entry: count "processor" lines in /proc/cpuinfo, falling back to
# 1 when the count is 0.  (Presumably only used when not running under
# SLURM -- TODO confirm the guarding condition.)
214 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
215 push @sinfo, "$localcpus localhost";
217 if (exists $ENV{SLURM_NODELIST})
219 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# Each entry is split on whitespace into a CPU count and a node list;
# the CPU count is capped at max_tasks_per_node when that is non-zero.
223 my ($ncpus, $slurm_nodelist) = split;
224 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Consume the node list one comma-separated item at a time.  An item may
# end in a bracketed set of numbers and ranges, e.g. "name[1-3,5]",
# which is expanded by substituting each number for the bracket
# expression; items without brackets are used as they are.
227 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
230 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
233 foreach (split (",", $ranges))
246 push @nodelist, map {
248 $n =~ s/\[[-,\d]+\]/$_/;
255 push @nodelist, $nodelist;
# Every node gets a record, and one slot per CPU is added to @slot.
258 foreach my $nodename (@nodelist)
260 Log (undef, "node $nodename - $ncpus slots");
261 my $node = { name => $nodename,
265 foreach my $cpu (1..$ncpus)
267 push @slot, { node => $node,
271 push @node, @nodelist;
276 # Ensure that we get one jobstep running on each allocated node before
277 # we start overloading nodes with concurrent steps
279 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
284 # Claim this job, and make sure nobody else does
# The lock holder is a UUID string, so it is compared with "eq".  A
# numeric "==" numifies both operands; two different non-numeric UUIDs
# both become 0 and would compare equal, hiding a lock held by someone
# else.
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
287 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
# Once the lock is ours, mark the job as Running and reset its
# tasks_summary (starting with 'failed' => 0).
290 $Job->update_attributes('state' => 'Running',
291 'tasks_summary' => { 'failed' => 0,
# Install signal handlers.  Except for TERM, which calls croak, the
# handlers only set a flag in package main that is examined later:
# INT, QUIT and TSTP request a freeze, ALRM requests info, CONT requests
# a continue and HUP requests a refresh.
297 Log (undef, "start");
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
# All request flags start cleared.
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every NOCACHE*=value line of the job's newline-separated 'knobs'
# into the environment.  grep is used only for its side effect here; its
# result is discarded.
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
# Export the job UUID under both names.
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
# Bookkeeping for job steps: indexes still to run, indexes finished, and
# state used by the merge, squeue-check, output and refresh logic.
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
# A job record carrying 'thawedfromkey' is handed to thaw().
329 if (defined $Job->{thawedfromkey})
331 thaw ($Job->{thawedfromkey});
# Create an initial job task through the API and queue it as job step 0
# at level 0.  (Presumably the alternative to thawing -- TODO confirm.)
335 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336 'job_uuid' => $Job->{'uuid'},
341 push @jobstep, { 'level' => 0,
343 'arvados_task' => $first_task,
345 push @jobstep_todo, 0;
# Take the lock file CRUNCH_TMP/.lock.  The second argument is the text
# reported when the lock cannot be obtained (presumably fatal, going by
# the helper's name -- TODO confirm).
351 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# Read the build script from the DATA handle.
# NOTE(review): in scalar context this reads a single record; it only
# slurps the whole section if $/ has been undefined beforehand -- confirm.
358 $build_script = <DATA>;
# Comma-separated list of all allocated node names, as passed to srun.
360 my $nodelist = join(",", @node);
# Unless --no-clear-tmp was given, fork a child that runs a cleanup
# shell command on every node through srun: lazily unmount any "*keep"
# mounts below JOB_WORK, then remove JOB_WORK, CRUNCH_TMP/opt and
# CRUNCH_TMP/src*.
362 if (!defined $no_clear_tmp) {
363 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
364 Log (undef, "Clean work dirs");
366 my $cleanpid = fork();
369 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
370 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# The parent polls for the child every 0.1 s without blocking, giving
# freeze_if_want_freeze a chance to act on a pending freeze request.
375 last if $cleanpid == waitpid (-1, WNOHANG);
376 freeze_if_want_freeze ($cleanpid);
377 select (undef, undef, undef, 0.1);
379 Log (undef, "Cleanup command exited ".exit_status_s($?));
# Decide where the job's source code comes from.  With no --git-dir and
# a script_version that is an absolute path, that directory is used
# directly.  Otherwise the code is expected in CRUNCH_TMP/src and a git
# repository is located and classified as 'remote' or 'local'.
384 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
385 # If we're in user-land (i.e., not called from crunch-dispatch)
386 # script_version can be an absolute directory path, signifying we
387 # should work straight out of that directory instead of using a git
389 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
390 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
393 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
395 # Install requested code version
396 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
398 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
400 # If we're running under crunch-dispatch, it will have already
401 # pulled the appropriate source tree into its own repository, and
402 # given us that repo's path as $git_dir.
404 # If we're running a "local" job, we might have to fetch content
405 # from a remote repository.
407 # (Currently crunch-dispatch gives a local path with --git-dir, but
408 # we might as well accept URLs there too in case it changes its
# --git-dir takes precedence over the job's 'repository' attribute.
410 my $repo = $git_dir || $Job->{'repository'};
412 # Repository can be remote or local. If remote, we'll need to fetch it
413 # to a local dir before doing `git log` et al.
# Anything containing "://" or "@...:" counts as a clonable URL.
416 if ($repo =~ m{://|\@.*:}) {
417 # $repo is a git url we can clone, like git:// or https:// or
418 # file:/// or git@host:repo.git
419 $repo_location = 'remote';
# A value starting with "/" or with dots followed by "/" is a local path.
420 } elsif ($repo =~ m{^\.*/}) {
421 # $repo is a local path to a git index. We'll also resolve ../foo
422 # to ../foo/.git if the latter is a directory.
423 if (-d "$repo/.git") {
424 $repo = "$repo/.git";
426 $repo_location = 'local';
428 # $repo is none of the above. It must be the name of a hosted
# Look the name up through the repositories API; the fetch_url of the
# first match becomes the remote to fetch from.
430 my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
431 'filters' => [['name','=',$repo]]
433 my $n_found = scalar @{$arv_repo_list};
435 Log(undef, "Repository '$repo' -> "
436 . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
439 croak("Error: Found $n_found repositories with name '$repo'.");
441 $repo = $arv_repo_list->[0]->{'fetch_url'};
442 $repo_location = 'remote';
444 Log(undef, "Using $repo_location repository '$repo'");
445 $ENV{"CRUNCH_SRC_URL"} = $repo;
# Turn the job's script_version into a full 40-hex commit id, store that
# id on the job record when it differs, and capture `git archive` output
# for that commit in $git_archive.
447 # Resolve given script_version (we'll call that $treeish here) to a
448 # commit sha1 ($commit).
449 my $treeish = $Job->{'script_version'};
451 if ($repo_location eq 'remote') {
452 # We minimize excess object-fetching by re-using the same bare
453 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
454 # just keep adding remotes to it as needed.
455 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
456 my $gitcmd = "git --git-dir=\Q$local_repo\E";
458 # Set up our local repo for caching remote objects, making
460 if (!-d $local_repo) {
461 make_path($local_repo) or croak("Error: could not create $local_repo");
463 # This works (exits 0 and doesn't delete fetched objects) even
464 # if $local_repo is already initialized:
465 `$gitcmd init --bare`;
467 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
470 # If $treeish looks like a hash (or abbrev hash) we look it up in
471 # our local cache first, since that's cheaper. (We don't want to
472 # do that with tags/branches though -- those change over time, so
473 # they should always be resolved by the remote repo.)
474 if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
475 # Hide stderr because it's normal for this to fail:
476 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
478 $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
479 $sha1 =~ /^([0-9a-f]{40})$/s) {
481 Log(undef, "Commit $commit already present in $local_repo");
485 if (!defined $commit) {
486 # If $treeish isn't just a hash or abbrev hash, or isn't here
487 # yet, we need to fetch the remote to resolve it correctly.
489 # First, remove all local heads. This prevents a name that does
490 # not exist on the remote from resolving to (or colliding with)
491 # a previously fetched branch or tag (possibly from a different
493 remove_tree("$local_repo/refs/heads", {keep_root => 1});
495 Log(undef, "Fetching objects from $repo to $local_repo");
496 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
498 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
502 # Now that the data is all here, we will use our local repo for
503 # the rest of our git activities.
# Resolve the treeish in the selected repository.  Resolution fails
# unless rev-list exits 0 and prints a full 40-hex commit id.
507 my $gitcmd = "git --git-dir=\Q$repo\E";
508 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
509 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
510 croak("`$gitcmd rev-list` exited "
512 .", '$treeish' not found. Giving up.");
515 Log(undef, "Version $treeish is commit $commit");
517 if ($commit ne $Job->{'script_version'}) {
518 # Record the real commit id in the database, frozentokey, logs,
519 # etc. -- instead of an abbreviation or a branch name which can
520 # become ambiguous or point to a different commit in the future.
521 if (!$Job->update_attributes('script_version' => $commit)) {
522 croak("Error: failed to update job's script_version attribute");
# Export the resolved commit and capture the source tree as an archive.
526 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
527 $git_archive = `$gitcmd archive ''\Q$commit\E`;
529 croak("Error: $gitcmd archive exited ".exit_status_s($?));
# Install the source tree on the workers.  Without a git archive the
# install phase is skipped and only logged.
533 if (!defined $git_archive) {
534 Log(undef, "Skip install phase (no git archive)");
536 Log(undef, "Warning: This probably means workers have no source tree!");
540 Log(undef, "Run install script on all workers");
# The remote command creates CRUNCH_INSTALL, changes to CRUNCH_TMP and
# runs `perl -`, i.e. a Perl program read from standard input.
542 my @srunargs = ("srun",
543 "--nodelist=$nodelist",
544 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
545 my @execargs = ("sh", "-c",
546 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
548 # Note: this section is almost certainly unnecessary if we're
549 # running tasks in docker containers.
# A forked child hands the build script followed by the archive to
# srun() as its fourth argument (presumably the remote command's stdin,
# given the `perl -` above -- TODO confirm against srun()).
550 my $installpid = fork();
551 if ($installpid == 0)
553 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# The parent polls for the child every 0.1 s, honouring freeze requests.
558 last if $installpid == waitpid (-1, WNOHANG);
559 freeze_if_want_freeze ($installpid);
560 select (undef, undef, undef, 0.1);
562 Log (undef, "Install script exited ".exit_status_s($?));
567 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
568 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# When the job names a Docker image locator, find_docker_image() yields
# the image's stream name and hash.  A shell snippet run on every node
# through srun then loads the image with `arv-get ... | docker load`
# unless `docker images` already lists that exact hash.
571 # If this job requires a Docker image, install that.
572 my $docker_bin = "/usr/bin/docker.io";
573 my ($docker_locator, $docker_stream, $docker_hash);
574 if ($docker_locator = $Job->{docker_image_locator}) {
575 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
578 croak("No Docker image hash found from locator $docker_locator");
579 $docker_stream =~ s/^\.//;
581 my $docker_install_script = qq{
582 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
583 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
586 my $docker_pid = fork();
587 if ($docker_pid == 0)
589 srun (["srun", "--nodelist=" . join(',', @node)],
590 ["/bin/sh", "-ec", $docker_install_script]);
595 last if $docker_pid == waitpid (-1, WNOHANG);
596 freeze_if_want_freeze ($docker_pid);
597 select (undef, undef, undef, 0.1);
601 croak("Installing Docker image from $docker_locator exited "
606 foreach (qw (script script_version script_parameters runtime_constraints))
610 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
612 foreach (split (/\n/, $Job->{knobs}))
614 Log (undef, "knob " . $_);
# Start of a scheduling round.  $main::success stays undefined until the
# job's outcome is known.
619 $main::success = undef;
# Per-round counters used by the failure thresholds further down.
625 my $thisround_succeeded = 0;
626 my $thisround_failed = 0;
627 my $thisround_failed_multiple = 0;
# Order the pending steps by level, then by step index, and work on the
# lowest level present.
629 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
630 or $a <=> $b } @jobstep_todo;
631 my $level = $jobstep[$jobstep_todo[0]]->{level};
632 Log (undef, "start level $level");
# Every slot starts out free.
637 my @freeslot = (0..$#slot);
640 my $progress_is_dirty = 1;
641 my $progress_stats_updated = 0;
643 update_progress_stats();
# Walk the todo list and start each step that belongs to this level.
648 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
650 my $id = $jobstep_todo[$todo_ptr];
651 my $Jobstep = $jobstep[$id];
652 if ($Jobstep->{level} != $level)
# Each step gets a pipe.  The read end is kept per step id and switched
# to non-blocking mode; the write end is the bareword handle "writer".
657 pipe $reader{$id}, "writer" or croak ($!);
658 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
659 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Use the first free slot; its name is "<node name>.<cpu number>".
661 my $childslot = $freeslot[0];
662 my $childnode = $slot[$childslot]->{node};
663 my $childslotname = join (".",
664 $slot[$childslot]->{node}->{name},
665 $slot[$childslot]->{cpu});
666 my $childpid = fork();
# Child side of the fork: restore default signal handling, point STDOUT
# and STDERR at the pipe's write end, export the task environment, build
# the shell command for the task and hand it to srun().
669 $SIG{'INT'} = 'DEFAULT';
670 $SIG{'QUIT'} = 'DEFAULT';
671 $SIG{'TERM'} = 'DEFAULT';
673 foreach (values (%reader))
# NOTE(review): F_SETFL changes file status flags, whereas close-on-exec
# is a descriptor flag normally changed with F_SETFD -- confirm this
# call does what the trailing comment says.
677 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
678 open(STDOUT,">&writer");
679 open(STDERR,">&writer");
# Task environment: task identity, the job script name, one
# JOB_PARAMETER_<NAME> variable per script parameter (ASCII lowercase
# letters in the name upper-cased), slot details and per-task paths
# below JOB_WORK.
684 delete $ENV{"GNUPGHOME"};
685 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
686 $ENV{"TASK_QSEQUENCE"} = $id;
687 $ENV{"TASK_SEQUENCE"} = $level;
688 $ENV{"JOB_SCRIPT"} = $Job->{script};
689 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
690 $param =~ tr/a-z/A-Z/;
691 $ENV{"JOB_PARAMETER_$param"} = $value;
693 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
694 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
695 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
696 $ENV{"HOME"} = $ENV{"TASK_WORK"};
697 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
698 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
699 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
700 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
706 "--nodelist=".$childnode->{name},
707 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
708 "--job-name=$job_id.$id.$$",
710 my $build_script_to_send = "";
712 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
713 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
714 ."&& cd $ENV{CRUNCH_TMP} ";
717 $build_script_to_send = $build_script;
# The task runs under arv-mount with TASK_KEEPMOUNT as the mount point.
721 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker variant: crunchstat wraps `docker run`.  The container gets the
# source tree and the Keep mount as read-only volumes and a copy of
# every ARVADOS_*, JOB_* and TASK_* variable, with TASK_WORK and
# TASK_KEEPMOUNT rewritten to in-container paths.
724 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
725 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
726 # Dynamically configure the container to use the host system as its
727 # DNS server. Get the host's global addresses from the ip command,
728 # and turn them into docker --dns options using gawk.
730 q{$(ip -o address show scope global |
731 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
732 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
733 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
734 $command .= "--env=\QHOME=/home/crunch\E ";
735 while (my ($env_key, $env_val) = each %ENV)
737 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
738 if ($env_key eq "TASK_WORK") {
739 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
741 elsif ($env_key eq "TASK_KEEPMOUNT") {
742 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
745 $command .= "--env=\Q$env_key=$env_val\E ";
749 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
750 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
751 $command .= "\Q$docker_hash\E ";
752 $command .= "stdbuf --output=0 --error=0 ";
753 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-Docker variant: crunchstat and stdbuf wrap the script taken
# straight from CRUNCH_SRC/crunch_scripts.
756 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
757 $command .= "stdbuf --output=0 --error=0 ";
758 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
761 my @execargs = ('bash', '-c', $command);
762 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
763 # exec() failed, we assume nothing happened.
764 Log(undef, "srun() failed on build script");
# Parent side of the fork: record the new child, then wait and throttle
# while watching the failure thresholds of this round.
768 if (!defined $childpid)
# %proc is keyed by child pid, and the slot remembers the pid using it.
# A slot that already has a pid at this point is an internal error.
775 $proc{$childpid} = { jobstep => $id,
778 jobstepname => "$job_id.$id.$childpid",
780 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
781 $slot[$childslot]->{pid} = $childpid;
783 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
784 Log ($id, "child $childpid started on $childslotname");
# Reset per-attempt state on the step and save the start time to the
# task record, formatted as a UTC timestamp.
785 $Jobstep->{starttime} = time;
786 $Jobstep->{node} = $childnode->{name};
787 $Jobstep->{slotindex} = $childslot;
788 delete $Jobstep->{stderr};
789 delete $Jobstep->{finishtime};
791 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
792 $Jobstep->{'arvados_task'}->save;
# The step is now running, so remove it from the todo list.
794 splice @jobstep_todo, $todo_ptr, 1;
797 $progress_is_dirty = 1;
801 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
803 last THISROUND if $main::please_freeze;
804 if ($main::please_info)
806 $main::please_info = 0;
810 update_progress_stats();
817 check_refresh_wanted();
819 update_progress_stats();
820 select (undef, undef, undef, 0.1);
# Without other activity, progress statistics are refreshed once at
# least 30 seconds have passed since the last update.
822 elsif (time - $progress_stats_updated >= 30)
824 update_progress_stats();
# Abandon the round when repeated failures dominate: at least 8 repeat
# failures with no success, or at least 16 that outnumber the successes.
826 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
827 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
829 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
830 .($thisround_failed+$thisround_succeeded)
831 .") -- giving up on this round";
832 Log (undef, $message);
836 # move slots from freeslot to holdslot (or back to freeslot) if necessary
837 for (my $i=$#freeslot; $i>=0; $i--) {
838 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
839 push @holdslot, (splice @freeslot, $i, 1);
842 for (my $i=$#holdslot; $i>=0; $i--) {
843 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
844 push @freeslot, (splice @holdslot, $i, 1);
848 # give up if no nodes are succeeding
849 if (!grep { $_->{node}->{losing_streak} == 0 &&
850 $_->{node}->{hold_count} < 4 } @slot) {
851 my $message = "Every node has failed -- giving up on this round";
852 Log (undef, $message);
# Return all held slots to the free list and clear the losing streak of
# every node that has a free slot.
859 push @freeslot, splice @holdslot;
860 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
# Drain: wait for the remaining children, still serving continue, info,
# refresh and freeze requests on every pass.
863 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
866 if ($main::please_continue) {
867 $main::please_continue = 0;
870 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
874 check_refresh_wanted();
876 update_progress_stats();
877 select (undef, undef, undef, 0.1);
878 killem (keys %proc) if $main::please_freeze;
882 update_progress_stats();
883 freeze_if_want_freeze();
886 if (!defined $main::success)
889 $thisround_succeeded == 0 &&
890 ($thisround_failed == 0 || $thisround_failed > 4))
892 my $message = "stop because $thisround_failed tasks failed and none succeeded";
893 Log (undef, $message);
# Start another level while the job's outcome is still undecided.
902 goto ONELEVEL if !defined $main::success;
# Wrap up the job: release the node allocation, collate the task
# outputs, register the result as a collection, store the final state
# and exit with 0 for 'Complete' or 1 otherwise.
905 release_allocation();
907 my $collated_output = &collate_output();
909 if (!$collated_output) {
910 Log(undef, "output undef");
# Read the collated manifest back through `arv-get` (list-form pipe
# open, so no shell is involved) and create a collection from its text.
914 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
915 or die "failed to get collated manifest: $!";
916 my $orig_manifest_text = '';
917 while (my $manifest_line = <$orig_manifest>) {
918 $orig_manifest_text .= $manifest_line;
920 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
921 'manifest_text' => $orig_manifest_text,
923 Log(undef, "output uuid " . $output->{uuid});
924 Log(undef, "output hash " . $output->{portable_data_hash});
# The job's 'output' attribute is the collection's portable data hash.
925 $Job->update_attributes('output' => $output->{portable_data_hash});
# Failures while registering are logged from $@ (presumably the steps
# above run inside an eval -- TODO confirm).
928 Log (undef, "Failed to register output manifest: $@");
932 Log (undef, "finish");
# 'Complete' requires both a collated output and a true $main::success.
937 if ($collated_output && $main::success) {
938 $final_state = 'Complete';
940 $final_state = 'Failed';
942 $Job->update_attributes('state' => $final_state);
944 exit (($final_state eq 'Complete') ? 0 : 1);
# Publish the current todo/done/running task counts on the job record
# and in the log.  The timestamp of the last call is always recorded,
# but the counts are only recomputed and sent when $progress_is_dirty is
# set; the flag is cleared afterwards.
948 sub update_progress_stats
950 $progress_stats_updated = time;
951 return if !$progress_is_dirty;
# "running" is the number of slots that are neither free nor on hold.
952 my ($todo, $done, $running) = (scalar @jobstep_todo,
953 scalar @jobstep_done,
954 scalar @slot - scalar @freeslot - scalar @holdslot);
955 $Job->{'tasks_summary'} ||= {};
956 $Job->{'tasks_summary'}->{'todo'} = $todo;
957 $Job->{'tasks_summary'}->{'done'} = $done;
958 $Job->{'tasks_summary'}->{'running'} = $running;
959 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
960 Log (undef, "status: $done done, $running running, $todo todo");
961 $progress_is_dirty = 0;
# Reap at most one exited child without blocking and update the task,
# node and slot bookkeeping for it.  Returns 0 when no child has exited.
968 my $pid = waitpid (-1, WNOHANG);
969 return 0 if $pid <= 0;
971 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
973 . $slot[$proc{$pid}->{slot}]->{cpu});
974 my $jobstepid = $proc{$pid}->{jobstep};
975 my $elapsed = time - $proc{$pid}->{time};
976 my $Jobstep = $jobstep[$jobstepid];
# $? holds the wait status of the reaped child; its high byte is the
# exit code.
978 my $childstatus = $?;
979 my $exitvalue = $childstatus >> 8;
980 my $exitinfo = "exit ".exit_status_s($childstatus);
# Success is read from the reloaded task record, not from the exit code.
981 $Jobstep->{'arvados_task'}->reload;
982 my $task_success = $Jobstep->{'arvados_task'}->{success};
984 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
986 if (!defined $task_success) {
987 # task did not indicate one way or the other --> fail
988 $Jobstep->{'arvados_task'}->{success} = 0;
989 $Jobstep->{'arvados_task'}->save;
# A failure counts as temporary when the step was flagged with node_fail
# or the child exited with code 111.
996 $temporary_fail ||= $Jobstep->{node_fail};
997 $temporary_fail ||= ($exitvalue == 111);
1000 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1002 # Check for signs of a failed or misconfigured node
1003 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1004 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1005 # Don't count this against jobstep failure thresholds if this
1006 # node is already suspected faulty and srun exited quickly
1007 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1009 Log ($jobstepid, "blaming failure on suspect node " .
1010 $slot[$proc{$pid}->{slot}]->{node}->{name});
1011 $temporary_fail ||= 1;
1013 ban_node_by_slot($proc{$pid}->{slot});
1016 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1017 ++$Jobstep->{'failures'},
1018 $temporary_fail ? 'temporary ' : 'permanent',
# A permanent failure, or a third failure of the same step, requests a
# freeze of the whole job.
1021 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1022 # Give up on this task, and the whole job
1024 $main::please_freeze = 1;
1026 # Put this task back on the todo queue
1027 push @jobstep_todo, $jobstepid;
1028 $Job->{'tasks_summary'}->{'failed'}++;
# Success: clear the node's losing streak and hold, and mark the step
# as done.
1032 ++$thisround_succeeded;
1033 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1034 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1035 push @jobstep_done, $jobstepid;
1036 Log ($jobstepid, "success in $elapsed seconds");
# Record exit status and finish time, then flush the step's remaining
# stderr to the log.
1038 $Jobstep->{exitcode} = $childstatus;
1039 $Jobstep->{finishtime} = time;
1040 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1041 $Jobstep->{'arvados_task'}->save;
1042 process_stderr ($jobstepid, $task_success);
1043 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Close the step's pipe and hand its slot back to the free list.
1045 close $reader{$jobstepid};
1046 delete $reader{$jobstepid};
1047 delete $slot[$proc{$pid}->{slot}]->{pid};
1048 push @freeslot, $proc{$pid}->{slot};
# After a successful task, page through the job tasks it created
# (ordered by qsequence, using the number fetched so far as the offset)
# and queue each of them as a new job step.
1051 if ($task_success) {
1053 my $newtask_list = [];
1054 my $newtask_results;
1056 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1058 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1060 'order' => 'qsequence',
1061 'offset' => scalar(@$newtask_list),
1063 push(@$newtask_list, @{$newtask_results->{items}});
1064 } while (@{$newtask_results->{items}});
1065 foreach my $arvados_task (@$newtask_list) {
1067 'level' => $arvados_task->{'sequence'},
1069 'arvados_task' => $arvados_task
1071 push @jobstep, $jobstep;
1072 push @jobstep_todo, $#jobstep;
1076 $progress_is_dirty = 1;
# Re-read the cancellation-related attributes of the job record when
# the file named by CRUNCH_REFRESH_TRIGGER has a modification time newer
# than the last refresh, and request a freeze if the job is no longer in
# state "Running".  Nothing happens when the trigger file cannot be
# stat'ed.
1080 sub check_refresh_wanted
# Element 9 of the stat list is the modification time.
1082 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1083 if (@stat && $stat[9] > $latest_refresh) {
1084 $latest_refresh = scalar time;
# Fetch by the job record's own UUID.  $jobspec cannot be used here: it
# holds a JSON document, not a UUID, when the job was specified on the
# command line.
1085 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $Job->{'uuid'});
1086 for my $attr ('cancelled_at',
1087 'cancelled_by_user_uuid',
1088 'cancelled_by_client_uuid',
1090 $Job->{$attr} = $Job2->{$attr};
1092 if ($Job->{'state'} ne "Running") {
1093 if ($Job->{'state'} eq "Cancelled") {
1094 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1096 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
# Any state other than "Running" stops the job.
1099 $main::please_freeze = 1;
# Periodic health check of child processes against the SLURM step list.
# Two rate limits apply: the kill list is examined at most every
# 4 seconds and squeue is queried at most every 60 seconds.
1106 # return if the kill list was checked <4 seconds ago
1107 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1111 $squeue_kill_checked = time;
1113 # use killem() on procs whose killtime is reached
1116 if (exists $proc{$_}->{killtime}
1117 && $proc{$_}->{killtime} <= time)
1123 # return if the squeue was checked <60 seconds ago
1124 if (defined $squeue_checked && $squeue_checked > time - 60)
1128 $squeue_checked = time;
1132 # here is an opportunity to check for mysterious problems with local procs
1136 # get a list of steps still running
# The appended "echo ok" adds a sentinel line when squeue succeeded, so
# a missing sentinel marks the listing as unusable.
1137 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1139 if ($squeue[-1] ne "ok")
1145 # which of my jobsteps are running, according to squeue?
# Lines look like "<jobid>.<stepid> <name>"; only those of this SLURM
# job are considered.
1149 if (/^(\d+)\.(\d+) (\S+)/)
1151 if ($1 eq $ENV{SLURM_JOBID})
1158 # which of my active child procs (>60s old) were not mentioned by squeue?
1159 foreach (keys %proc)
1161 if ($proc{$_}->{time} < time - 60
1162 && !exists $ok{$proc{$_}->{jobstepname}}
1163 && !exists $proc{$_}->{killtime})
1165 # kill this proc if it hasn't exited in 30 seconds
1166 $proc{$_}->{killtime} = time + 30;
# Give the node allocation back by running scancel on SLURM_JOBID, after
# logging the fact.  (Presumably only done when running under SLURM --
# TODO confirm the guarding condition.)
1172 sub release_allocation
1176 Log (undef, "release job allocation");
1177 system "scancel $ENV{SLURM_JOBID}";
# Drain whatever is currently available on each job step's pipe, in
# chunks of up to 8192 bytes.  The data is echoed to STDERR when
# CRUNCH_DEBUG is set, appended to the step's stderr buffer, and
# complete lines are passed to preprocess_stderr.
1185 foreach my $job (keys %reader)
1188 while (0 < sysread ($reader{$job}, $buf, 8192))
1190 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1191 $jobstep[$job]->{stderr} .= $buf;
1192 preprocess_stderr ($job);
# Bound the buffer: once it exceeds 16384 bytes, drop the oldest 8192.
1193 if (length ($jobstep[$job]->{stderr}) > 16384)
1195 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Take complete lines off the front of a job step's stderr buffer, log
# each one, and react to known srun error messages.
1204 sub preprocess_stderr
1208 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line and its newline from the buffer.
1210 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1211 Log ($job, "stderr $line");
# An expired or unconfirmed allocation for SLURM_JOB_ID requests a
# freeze of the whole job.
1212 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1214 $main::please_freeze = 1;
# Node failures and job-step creation failures mark the step with
# node_fail and ban the node the step was using.
1216 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1217 $jobstep[$job]->{node_fail} = 1;
1218 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final stderr handling for a finished job step: process any complete
# lines first, then log what is left in the buffer line by line.
1227 my $task_success = shift;
1228 preprocess_stderr ($job);
1231 Log ($job, "stderr $_");
1232 } split ("\n", $jobstep[$job]->{stderr});
# Fetch the content for $hash by reading the output of `arv-get` and
# return it as one string.  Dies if the command cannot be started or if
# reading fails.
1238 my ($keep, $child_out, $output_block);
# The hash is shell-quoted with \Q...\E before being placed on the
# command line.
1240 my $cmd = "arv-get \Q$hash\E";
1241 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# Read in chunks of up to 1 MiB until end of input.
1245 my $bytes = sysread($keep, $buf, 1024 * 1024);
1246 if (!defined $bytes) {
1247 die "reading from arv-get: $!";
1248 } elsif ($bytes == 0) {
1249 # sysread returns 0 at the end of the pipe.
1252 # some bytes were read into buf.
1253 $output_block .= $buf;
1257 return $output_block;
# Combine the outputs of all successful tasks into one job output by
# streaming them into `arv-put --raw`.
1262 Log (undef, "collate");
1264 my ($child_out, $child_in);
1265 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1266 '--retries', put_retry_count());
# Tasks without an output, or that did not succeed, are skipped.
1270 next if (!exists $_->{'arvados_task'}->{'output'} ||
1271 !$_->{'arvados_task'}->{'success'});
1272 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare locator (32 hex digits plus optional "+..."
# hints) is passed through to arv-put as it is.
1273 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1275 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1276 print $child_in $output;
# With a single job step, its locator is the job output directly.
1278 elsif (@jobstep == 1)
1280 $joboutput = $output;
# Otherwise the locator is dereferenced with fetch_block and the
# fetched text is passed on.
1283 elsif (defined (my $outblock = fetch_block ($output)))
1285 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1286 print $child_in $outblock;
1290 Log (undef, "XXX fetch_block($output) failed XXX");
# Unless the job output was already determined above, wait up to 120
# seconds for arv-put to answer and read its reply.
1296 if (!defined $joboutput) {
1297 my $s = IO::Select->new($child_out);
1298 if ($s->can_read(120)) {
1299 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1301 # TODO: Ensure exit status == 0.
1303 Log (undef, "timed out reading from 'arv-put'");
1306 # TODO: kill $pid instead of waiting, now that we've decided to
1307 # ignore further output.
# Escalating termination of one child ($_ is its pid): start with SIGINT
# (2), move on to SIGTERM (15) once SIGINT was sent more than 4 seconds
# ago, and to SIGKILL (9) once SIGTERM was sent more than 4 seconds ago.
# Each signal level is only sent once per child.
1318 my $sig = 2; # SIGINT first
1319 if (exists $proc{$_}->{"sent_$sig"} &&
1320 time - $proc{$_}->{"sent_$sig"} > 4)
1322 $sig = 15; # SIGTERM if SIGINT doesn't work
1324 if (exists $proc{$_}->{"sent_$sig"} &&
1325 time - $proc{$_}->{"sent_$sig"} > 4)
1327 $sig = 9; # SIGKILL if SIGTERM doesn't work
1329 if (!exists $proc{$_}->{"sent_$sig"})
1331 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1333 select (undef, undef, undef, 0.1);
1336 kill $sig, $_; # srun wants two SIGINT to really interrupt
# Remember when this signal was sent and how long the child had been
# running by then.
1338 $proc{$_}->{"sent_$sig"} = time;
1339 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the bit for this handle's file descriptor in the bit vector $bits.
1349 vec($bits,fileno($_),1) = 1;
# Log: write one log entry to STDERR and, when one is open, to the
# local log file.
#   $_[0]  job step id the message belongs to (callers pass undef for
#          messages that are not tied to a step)
#   $_[1]  message text
1355 sub Log # ($jobstep_id, $logmessage)
# A message containing newlines is processed one line at a time.
1357 if ($_[1] =~ /\n/) {
1358 for my $line (split (/\n/, $_[1])) {
# Turn on autoflush for STDERR, then restore the selected handle.
1363 my $fh = select STDERR; $|=1; select $fh;
# Entry layout: job id, this process's pid, job step id, message.
1364 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Replace every character outside the range space..~ (octal 176) with a
# backslash followed by its three-digit octal code.
1365 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A timestamp is built only when it will be printed: for the local log
# file, or when STDERR is a terminal.
1368 if ($local_logfile || -t STDERR) {
1369 my @gmtime = gmtime;
# UTC, formatted as YYYY-MM-DD_hh:mm:ss.
1370 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1371 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# STDERR gets the timestamp prefix only when it is a terminal.
1373 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# The local log file always gets the timestamp prefix.
1375 if ($local_logfile) {
1376 print $local_logfile $datetime . " " . $message;
# Fatal-error path: log the message together with the caller's file and
# line, then run the follow-up steps that still apply.
1383 my ($package, $file, $line) = caller;
1384 my $message = "@_ at $file line $line\n";
1385 Log (undef, $message);
# freeze and collate_output run only while job steps remain in
# @jobstep_todo.
1386 freeze() if @jobstep_todo;
1387 collate_output() if @jobstep_todo;
# The log is uploaded only when a local log file is open.
1389 save_meta() if $local_logfile;
# A job whose state is already Cancelled only gets a finished_at
# timestamp (the current time as formatted by scalar gmtime).
1396 if ($Job->{'state'} eq 'Cancelled') {
1397 $Job->update_attributes('finished_at' => scalar gmtime);
# NOTE(review): presumably the else branch of the test above (the
# connecting line is not visible here) -- the job is marked Failed.
1399 $Job->update_attributes('state' => 'Failed');
# save_meta: upload the local log file with arv-put and record the
# locator it prints on the Job record.  Does nothing when called with a
# true $justcheckpoint argument.  Dies if arv-put exits non-zero.
1406 my $justcheckpoint = shift; # false if this will be the last meta saved
1407 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
# Flush buffered log output to the temp file before arv-put reads it.
1409 $local_logfile->flush;
1410 my $retry_count = put_retry_count();
# The command is run by backticks, so the stored file name and the temp
# file path are both backslash-escaped.  The leading '' keeps the
# --filename argument present even if $keep_logfile is empty.
1411 my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1412 "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1413 my $loglocator = `$cmd`;
# Any non-zero wait status is fatal.
1414 die "system $cmd exited ".exit_status_s($?) if $?;
1417 $local_logfile = undef; # the temp file is automatically deleted
1418 Log (undef, "log manifest is $loglocator");
# Update the in-memory record and the stored Job record.
1419 $Job->{'log'} = $loglocator;
1420 $Job->update_attributes('log', $loglocator);
# freeze_if_want_freeze(@pids)
# Acts only when $main::please_freeze is set: releases the allocation,
# signals every known child process and reaps those that have exited.
1424 sub freeze_if_want_freeze
1426 if ($main::please_freeze)
1428 release_allocation();
1431 # kill some srun procs before freeze+stop
# Pids passed as arguments are entered in %proc with an empty record
# (replacing any existing record) so killem signals them as well.
1432 map { $proc{$_} = {} } @_;
1435 killem (keys %proc);
# Pause 0.1 second, then collect every child that has exited without
# blocking on the ones still running.
1436 select (undef, undef, undef, 0.1);
1438 while (($died = waitpid (-1, WNOHANG)) > 0)
1440 delete $proc{$died};
# Freezing is a stub: it only logs that it is not implemented.
1455 Log (undef, "Freeze not implemented");
# Thawing is not supported; attempting it is reported through croak.
1462 croak ("Thaw not implemented");
# Undo backslash escaping in $s: "\n" becomes a newline, and any other
# backslash-escaped character becomes the character itself.
1478 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command, optionally under srun.
#   $srunargs  array ref: the srun command and its options
#   $execargs  array ref: the command to run
#   $opts      optional hash ref; {fork} selects system() over the
#              non-returning path
# $stdin is read below but its assignment is not visible in this
# excerpt -- NOTE(review): confirm where it comes from.
1485 my $srunargs = shift;
1486 my $execargs = shift;
1487 my $opts = shift || {};
# The srun prefix is used only when slurm is available.
1489 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# With CRUNCH_DEBUG set, echo the command to STDERR; arguments that
# contain a space are wrapped in single quotes.
1490 print STDERR (join (" ",
1491 map { / / ? "'$_'" : $_ }
1494 if $ENV{CRUNCH_DEBUG};
# To supply stdin, fork with open "-|": the parent's STDIN becomes a
# pipe fed by the child's STDOUT.
1496 if (defined $stdin) {
1497 my $child = open STDIN, "-|";
1498 defined $child or die "no fork: $!";
# NOTE(review): presumably these two lines run in the forked child,
# which writes $stdin to the pipe -- the branch test is not visible.
1500 print $stdin or die $!;
1501 close STDOUT or die $!;
# With {fork}, run the command as a child and return system()'s result.
1506 return system (@$args) if $opts->{fork};
# NOTE(review): presumably reached only when an exec (not visible here)
# fails; the environment size is reported as a diagnostic.
1509 warn "ENV size is ".length(join(" ",%ENV));
1510 die "exec failed: $!: @$args";
# ban_node_by_slot($slotid [, $hold_seconds])
#
# Back off the node that owns slot $slotid: don't start any new jobsteps
# on it for $hold_seconds seconds.
#
#   $slotid        index into @slot
#   $hold_seconds  optional length of the hold in seconds; defaults to
#                  60, the value that was previously hard-coded in the
#                  arithmetic, the log text and the comment
#
# Sets the node's hold_until to now + $hold_seconds, increments its
# hold_count and logs the back-off.  Returns whatever Log returns.
sub ban_node_by_slot {
  my $slotid = shift;
  my $hold_seconds = shift;
  $hold_seconds = 60 unless defined $hold_seconds;

  # Create the node record if the slot has none yet, as the nested
  # assignments it replaces would have done.
  my $node = ($slot[$slotid]->{node} ||= {});
  $node->{hold_until} = $hold_seconds + time;
  $node->{hold_count}++;
  Log (undef, "backing off node " . $node->{name} . " for $hold_seconds seconds");
}
# Take an exclusive lock on $lockfile without waiting; croak with
# $error_message if another process already holds it.
1524 my ($lockfile, $error_message) = @_;
# ">" creates the file if needed and truncates it.
1525 open L, ">", $lockfile or croak("$lockfile: $!");
# LOCK_NB makes flock fail immediately instead of blocking.
# NOTE(review): handle L is not closed in the lines shown; presumably
# it stays open so the lock is kept -- confirm.
1526 if (!flock L, LOCK_EX|LOCK_NB) {
1527 croak("Can't lock $lockfile: $error_message\n");
1531 sub find_docker_image {
1532 # Given a Keep locator, check to see if it contains a Docker image.
1533 # If so, return its stream name and Docker hash.
1534 # If not, return undef for both values.
1535 my $locator = shift;
1536 my ($streamname, $filename);
# Look the collection up through the API client; nothing is scanned if
# the lookup returns a false value.
1537 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
# Walk the manifest one line at a time.  The first whitespace-separated
# token of a line is taken as the stream name.
1538 foreach my $line (split(/\n/, $image->{manifest_text})) {
1539 my @tokens = split(/\s+/, $line);
1541 $streamname = shift(@tokens);
# File tokens are the ones that start with two colon-terminated numbers.
1542 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1543 if (defined($filename)) {
1544 return (undef, undef); # More than one file in the Collection.
# The file name is everything after the second colon (limit 3 keeps
# colons inside the name intact).
1546 $filename = (split(/:/, $filedata, 3))[2];
# The single file must be named <64 hex digits>.tar; the hex part is
# returned as the Docker hash.
1551 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1552 return ($streamname, $1);
1554 return (undef, undef);
1558 sub put_retry_count {
1559 # Calculate a --retries argument for arv-put that will have it try
1560 # approximately as long as this Job has been running.
# Optional argument: the time to measure up to; defaults to now (a
# false value such as 0 also selects the default).
1561 my $stoptime = shift || time;
# Elapsed time is measured from the first job step's start time; if
# that is not set, the elapsed time is taken to be 1.
1562 my $starttime = $jobstep[0]->{starttime};
1563 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# NOTE(review): the loop body is not visible in this excerpt;
# presumably it counts how often $timediff can be reduced before it
# drops below 2 -- confirm.
1565 while ($timediff >= 2) {
# Never return fewer than 3 retries.
1569 return ($retries > 3) ? $retries : 3;
# exit_status_s($exitcode)
#
# Given a $?, return a human-readable exit code string like "0" or
# "1" or "0 with signal 1" or "1 with signal 11".  " with core dump"
# is appended when the core-dump bit (0x80) is set.
#
#   $exitcode  a wait status as found in $? after system(), backticks
#              or waitpid: exit code in the high byte, signal number in
#              the low 7 bits.  May also be -1, which Perl uses when the
#              command could not be run at all.
#
# Returns the description as a string.  A negative status is reported as
# "-1 (failed to execute)".
sub exit_status_s {
  my $exitcode = shift;

  # A negative status carries no exit code or signal.  It must be caught
  # before the bit operations below: ">>" and "&" treat their operands
  # as unsigned, so -1 would otherwise be rendered as a huge exit code
  # "with signal 127 with core dump".
  if (defined $exitcode && $exitcode < 0) {
    return "-1 (failed to execute)";
  }

  my $s = $exitcode >> 8;
  if ($exitcode & 0x7f) {
    $s .= " with signal " . ($exitcode & 0x7f);
  }
  if ($exitcode & 0x80) {
    $s .= " with core dump";
  }
  return $s;
}
1589 # checkout-and-build
# Unpack and build the job's source tree once per commit, then hand
# control to the command given in @ARGV.  All inputs come from the
# environment and from the DATA section.
1592 use File::Path qw( make_path );
# CRUNCH_SRC: where the source tree is installed.
# CRUNCH_SRC_COMMIT: the commit expected in that tree.
1594 my $destdir = $ENV{"CRUNCH_SRC"};
1595 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1596 my $repo = $ENV{"CRUNCH_SRC_URL"};
1597 my $task_work = $ENV{"TASK_WORK"};
# Both directories must exist before going any further.
1599 for my $dir ($destdir, $task_work) {
1602 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Lock file next to the source tree.  NOTE(review): the locking call
# itself is not visible in this excerpt.
1606 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# "$destdir.commit" is a symlink whose target text is the commit that
# was last installed.  A match plus an existing tree means there is
# nothing to build.
1608 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
# NOTE(review): presumably reached only when an exec of @ARGV (not
# visible here) fails.
1611 die "Cannot exec `@ARGV`: $!";
# Rebuilding: drop the old marker first, and send all further output of
# this script to "$destdir.log".
1617 unlink "$destdir.commit";
1618 open STDOUT, ">", "$destdir.log";
1619 open STDERR, ">&STDOUT";
# The DATA section, when non-empty, is fed to tar for extraction into
# $destdir.
1622 my @git_archive_data = <DATA>;
1623 if (@git_archive_data) {
1624 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1625 print TARX @git_archive_data;
1627 die "'tar -C $destdir -xf -' exited $?: $!";
# Install prefix: CRUNCH_INSTALL if set, otherwise "opt" under the
# current directory.
1632 chomp ($pwd = `pwd`);
1633 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the bundled Python SDK into a virtualenv at $install_dir.
# NOTE(review): any guard around these two commands is not visible.
1636 for my $src_path ("$destdir/arvados/sdk/python") {
1638 shell_or_die ("virtualenv", $install_dir);
1639 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run the first install hook that applies: the tree's
# crunch_scripts/install, else ./tests/autotests.sh when there is no
# ./install.sh, else ./install.sh.
1643 if (-e "$destdir/crunch_scripts/install") {
1644 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1645 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1647 shell_or_die ("./tests/autotests.sh", $install_dir);
1648 } elsif (-e "./install.sh") {
1649 shell_or_die ("./install.sh", $install_dir);
# Record the installed commit: create the marker symlink under a
# temporary name, then rename it over "$destdir.commit".
1653 unlink "$destdir.commit.new";
1654 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1655 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# NOTE(review): presumably reached only when the final exec of @ARGV
# (not visible here) fails.
1662 die "Cannot exec `@ARGV`: $!";
# Run a command given as an argument list; die if it does not succeed.
# With DEBUG set in the environment, the command is echoed to STDERR.
1669 if ($ENV{"DEBUG"}) {
1670 print STDERR "@_\n";
# The failure message carries the OS error text and the wait status in
# hexadecimal.
1673 or die "@_ failed: $! exit 0x".sprintf("%x",$?);