2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path remove_tree );
88 use constant EX_TEMPFAIL => 75;
# --- Temporary/working directory setup ---
# Establish CRUNCH_TMP (made uid-unique when not running as user "crunch" or
# root), create it, then derive JOB_WORK / CRUNCH_INSTALL exported to tasks.
# NOTE(review): this listing is sampled; closing braces fall on elided lines.
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# JOB_WORK is the per-job scratch root; CRUNCH_WORK is a deprecated alias.
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
# --- Command-line options and environment detection ---
# Options map to the switches documented in the POD above (--force-unlock,
# --git-dir, --job, --job-api-token, --no-clear-tmp, --resume-stash).
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
# A token given on the command line overrides the inherited environment.
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# We consider ourselves inside a SLURM allocation only when both vars exist.
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# CRUNCH_DEBUG toggled here; the controlling condition is on elided lines.
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
# --- API client setup and job acquisition ---
# Either fetch+lock an existing job by UUID, or create a new one from a JSON
# spec given on the command line (see POD). Then start the Keep log writer.
142 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $User = $arv->{'users'}->{'current'}->execute;
150 if ($jobspec =~ /^[-a-z\d]+$/)
152 # $jobspec is an Arvados UUID, not a JSON job specification
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154 if (!$force_unlock) {
155 # Claim this job, and make sure nobody else does
157 # lock() sets is_locked_by_uuid and changes state to Running.
158 $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
# EX_TEMPFAIL (75) tells the dispatcher this is a retryable condition.
161 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
# Otherwise: $jobspec is an inline JSON job record.
168 $Job = JSON::decode_json($jobspec);
172 map { croak ("No $_ specified") unless $Job->{$_} }
173 qw(script script_version script_parameters);
176 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
177 $Job->{'started_at'} = gmtime;
179 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
181 $job_id = $Job->{'uuid'};
# Log collection file is named after the job uuid.
183 my $keep_logfile = $job_id . '.log.txt';
184 log_writer_start($keep_logfile);
# max_tasks_per_node: 0 means "no limit" (used below to cap $ncpus).
186 $Job->{'runtime_constraints'} ||= {};
187 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
188 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# --- Node and slot enumeration ---
# Build @sinfo lines of the form "<ncpus> <nodelist>", either from the local
# /proc/cpuinfo (non-SLURM) or from sinfo over $SLURM_NODELIST, then expand
# bracketed ranges like "node[1-3,5]" into individual node names and create
# one slot per (node, cpu) pair.
191 Log (undef, "check slurm allocation");
194 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
198 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
199 push @sinfo, "$localcpus localhost";
201 if (exists $ENV{SLURM_NODELIST})
203 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
207 my ($ncpus, $slurm_nodelist) = split;
# Honor the job's max_tasks_per_node constraint, if any.
208 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel one comma-separated entry (possibly with a [..] range) per iteration.
211 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
214 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
217 foreach (split (",", $ranges))
230 push @nodelist, map {
# Substitute each expanded number back into the bracketed name template.
232 $n =~ s/\[[-,\d]+\]/$_/;
239 push @nodelist, $nodelist;
242 foreach my $nodename (@nodelist)
244 Log (undef, "node $nodename - $ncpus slots");
245 my $node = { name => $nodename,
249 foreach my $cpu (1..$ncpus)
251 push @slot, { node => $node,
255 push @node, @nodelist;
260 # Ensure that we get one jobstep running on each allocated node before
261 # we start overloading nodes with concurrent steps
263 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# --- Job start: reset task counters, install signal handlers, init state ---
# Signal semantics (also documented in the POD): INT/QUIT/TSTP request a
# freeze-and-stop, ALRM requests an info dump, CONT resumes, HUP refreshes
# node allocation and job attributes.
266 $Job->update_attributes(
267 'tasks_summary' => { 'failed' => 0,
272 Log (undef, "start");
273 $SIG{'INT'} = sub { $main::please_freeze = 1; };
274 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
275 $SIG{'TERM'} = \&croak;
276 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
277 $SIG{'ALRM'} = sub { $main::please_info = 1; };
278 $SIG{'CONT'} = sub { $main::please_continue = 1; };
279 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
281 $main::please_freeze = 0;
282 $main::please_info = 0;
283 $main::please_continue = 0;
284 $main::please_refresh = 0;
285 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE* knobs from the job record straight into the environment.
287 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
288 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
289 $ENV{"JOB_UUID"} = $job_id;
# Scheduling state: indices into @jobstep, plus refresh/squeue bookkeeping.
293 my @jobstep_todo = ();
294 my @jobstep_done = ();
295 my @jobstep_tomerge = ();
296 my $jobstep_tomerge_level = 0;
298 my $squeue_kill_checked;
299 my $output_in_keep = 0;
300 my $latest_refresh = scalar time;
# --- Resume-or-start: thaw a checkpoint, or create the level-0 task ---
# Also take the per-host lock and read the worker build script from DATA.
304 if (defined $Job->{thawedfromkey})
306 thaw ($Job->{thawedfromkey});
# Fresh start: create the initial job_task (sequence/level 0).
310 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
311 'job_uuid' => $Job->{'uuid'},
316 push @jobstep, { 'level' => 0,
318 'arvados_task' => $first_task,
320 push @jobstep_todo, 0;
# Only one crunch-job may use this CRUNCH_TMP at a time.
326 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# Build script shipped to workers comes from this file's __DATA__ section.
333 $build_script = <DATA>;
335 my $nodelist = join(",", @node);
# --- Clean worker scratch space (skipped with --no-clear-tmp) ---
# Forks a child that sruns a bash snippet on every node: unmount any stale
# *keep FUSE mounts under $JOB_WORK, then remove work/opt/src trees.
337 if (!defined $no_clear_tmp) {
338 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
339 Log (undef, "Clean work dirs");
341 my $cleanpid = fork();
344 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
345 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Parent: poll for the cleanup child, honoring freeze requests meanwhile.
350 last if $cleanpid == waitpid (-1, WNOHANG);
351 freeze_if_want_freeze ($cleanpid);
352 select (undef, undef, undef, 0.1);
354 Log (undef, "Cleanup command exited ".exit_status_s($?));
# --- Determine where the job's source code lives ---
# Three outcomes: (1) script_version is an absolute path -> run straight from
# that directory; otherwise classify $repo as 'remote' (URL), 'local' (./ or
# ../ path), or a hosted repository name looked up via the Arvados API.
359 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
360 # If we're in user-land (i.e., not called from crunch-dispatch)
361 # script_version can be an absolute directory path, signifying we
362 # should work straight out of that directory instead of using a git
364 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
365 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
368 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370 # Install requested code version
371 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
373 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
375 # If we're running under crunch-dispatch, it will have already
376 # pulled the appropriate source tree into its own repository, and
377 # given us that repo's path as $git_dir.
379 # If we're running a "local" job, we might have to fetch content
380 # from a remote repository.
382 # (Currently crunch-dispatch gives a local path with --git-dir, but
383 # we might as well accept URLs there too in case it changes its
385 my $repo = $git_dir || $Job->{'repository'};
387 # Repository can be remote or local. If remote, we'll need to fetch it
388 # to a local dir before doing `git log` et al.
391 if ($repo =~ m{://|^[^/]*:}) {
392 # $repo is a git url we can clone, like git:// or https:// or
393 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
394 # not recognized here because distinguishing that from a local
395 # path is too fragile. If you really need something strange here,
396 # use the ssh:// form.
397 $repo_location = 'remote';
398 } elsif ($repo =~ m{^\.*/}) {
399 # $repo is a local path to a git index. We'll also resolve ../foo
400 # to ../foo/.git if the latter is a directory. To help
401 # disambiguate local paths from named hosted repositories, this
402 # form must be given as ./ or ../ if it's a relative path.
403 if (-d "$repo/.git") {
404 $repo = "$repo/.git";
406 $repo_location = 'local';
408 # $repo is none of the above. It must be the name of a hosted
# Look up the named repository; exactly one match is required (croak below).
410 my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
411 'filters' => [['name','=',$repo]]
413 my $n_found = scalar @{$arv_repo_list};
415 Log(undef, "Repository '$repo' -> "
416 . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
419 croak("Error: Found $n_found repositories with name '$repo'.");
421 $repo = $arv_repo_list->[0]->{'fetch_url'};
422 $repo_location = 'remote';
424 Log(undef, "Using $repo_location repository '$repo'");
425 $ENV{"CRUNCH_SRC_URL"} = $repo;
# --- Resolve script_version to a full 40-char commit sha1, then archive ---
# Remote repos are cached in a shared bare repo at $CRUNCH_TMP/.git; hashes
# are tried locally first, names (tags/branches) always force a fetch.
427 # Resolve given script_version (we'll call that $treeish here) to a
428 # commit sha1 ($commit).
429 my $treeish = $Job->{'script_version'};
431 if ($repo_location eq 'remote') {
432 # We minimize excess object-fetching by re-using the same bare
433 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
434 # just keep adding remotes to it as needed.
435 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
436 my $gitcmd = "git --git-dir=\Q$local_repo\E";
438 # Set up our local repo for caching remote objects, making
440 if (!-d $local_repo) {
441 make_path($local_repo) or croak("Error: could not create $local_repo");
443 # This works (exits 0 and doesn't delete fetched objects) even
444 # if $local_repo is already initialized:
445 `$gitcmd init --bare`;
447 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
450 # If $treeish looks like a hash (or abbrev hash) we look it up in
451 # our local cache first, since that's cheaper. (We don't want to
452 # do that with tags/branches though -- those change over time, so
453 # they should always be resolved by the remote repo.)
454 if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
455 # Hide stderr because it's normal for this to fail:
456 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
458 $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
459 $sha1 =~ /^([0-9a-f]{40})$/s) {
461 Log(undef, "Commit $commit already present in $local_repo")
465 if (!defined $commit) {
466 # If $treeish isn't just a hash or abbrev hash, or isn't here
467 # yet, we need to fetch the remote to resolve it correctly.
469 # First, remove all local heads. This prevents a name that does
470 # not exist on the remote from resolving to (or colliding with)
471 # a previously fetched branch or tag (possibly from a different
473 remove_tree("$local_repo/refs/heads", {keep_root => 1});
475 Log(undef, "Fetching objects from $repo to $local_repo");
476 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
478 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
482 # Now that the data is all here, we will use our local repo for
483 # the rest of our git activities.
# Local (or now-cached) repo: resolve $treeish directly with rev-list.
487 my $gitcmd = "git --git-dir=\Q$repo\E";
488 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
489 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
490 croak("`$gitcmd rev-list` exited "
492 .", '$treeish' not found. Giving up.");
495 Log(undef, "Version $treeish is commit $commit");
497 if ($commit ne $Job->{'script_version'}) {
498 # Record the real commit id in the database, frozentokey, logs,
499 # etc. -- instead of an abbreviation or a branch name which can
500 # become ambiguous or point to a different commit in the future.
501 if (!$Job->update_attributes('script_version' => $commit)) {
502 croak("Error: failed to update job's script_version attribute");
# Tarball of the resolved commit, shipped to workers in the install phase.
506 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
507 $git_archive = `$gitcmd archive ''\Q$commit\E`;
509 croak("Error: $gitcmd archive exited ".exit_status_s($?));
# --- Install phase: unpack source onto every worker ---
# Pipes $build_script (perl, from __DATA__) concatenated with the git archive
# into "perl -" on each node via srun; the build script does the unpacking.
513 if (!defined $git_archive) {
514 Log(undef, "Skip install phase (no git archive)");
516 Log(undef, "Warning: This probably means workers have no source tree!");
520 Log(undef, "Run install script on all workers");
522 my @srunargs = ("srun",
523 "--nodelist=$nodelist",
524 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
525 my @execargs = ("sh", "-c",
526 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
528 # Note: this section is almost certainly unnecessary if we're
529 # running tasks in docker containers.
530 my $installpid = fork();
531 if ($installpid == 0)
533 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Parent: poll the install child, honoring freeze requests meanwhile.
538 last if $installpid == waitpid (-1, WNOHANG);
539 freeze_if_want_freeze ($installpid);
540 select (undef, undef, undef, 0.1);
542 Log (undef, "Install script exited ".exit_status_s($?));
547 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
548 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# --- Docker image install (when the job specifies docker_image_locator) ---
# On each node: load the image from Keep via arv-get unless already present.
# Afterwards, log the job's main attributes and knobs for the record.
551 # If this job requires a Docker image, install that.
552 my $docker_bin = "/usr/bin/docker.io";
553 my ($docker_locator, $docker_stream, $docker_hash);
554 if ($docker_locator = $Job->{docker_image_locator}) {
555 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
558 croak("No Docker image hash found from locator $docker_locator");
# Strip the leading "." from the manifest stream name for the Keep path.
560 $docker_stream =~ s/^\.//;
561 my $docker_install_script = qq{
562 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
563 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
566 my $docker_pid = fork();
567 if ($docker_pid == 0)
569 srun (["srun", "--nodelist=" . join(',', @node)],
570 ["/bin/sh", "-ec", $docker_install_script]);
# Parent: poll the docker-install child, honoring freeze requests.
575 last if $docker_pid == waitpid (-1, WNOHANG);
576 freeze_if_want_freeze ($docker_pid);
577 select (undef, undef, undef, 0.1);
581 croak("Installing Docker image from $docker_locator exited "
# Record job attributes (JSON-encoded when they are references) and knobs.
586 foreach (qw (script script_version script_parameters runtime_constraints))
590 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
592 foreach (split (/\n/, $Job->{knobs}))
594 Log (undef, "knob " . $_);
# --- Per-round (per-level) scheduling setup ---
# Tasks are run level by level: sort todo by level then index, take the
# lowest level, and mark every slot free.
599 $main::success = undef;
605 my $thisround_succeeded = 0;
606 my $thisround_failed = 0;
607 my $thisround_failed_multiple = 0;
609 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
610 or $a <=> $b } @jobstep_todo;
611 my $level = $jobstep[$jobstep_todo[0]]->{level};
612 Log (undef, "start level $level");
617 my @freeslot = (0..$#slot);
620 my $progress_is_dirty = 1;
621 my $progress_stats_updated = 0;
623 update_progress_stats();
# --- Task launch loop: fork one child per todo jobstep at this level ---
# Each child gets a non-blocking pipe back to the parent for stderr capture,
# a free slot (node+cpu), and a TASK_* environment before exec'ing srun.
628 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
630 my $id = $jobstep_todo[$todo_ptr];
631 my $Jobstep = $jobstep[$id];
# Only steps of the current level are launched this round.
632 if ($Jobstep->{level} != $level)
637 pipe $reader{$id}, "writer" or croak ($!);
638 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
639 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
641 my $childslot = $freeslot[0];
642 my $childnode = $slot[$childslot]->{node};
643 my $childslotname = join (".",
644 $slot[$childslot]->{node}->{name},
645 $slot[$childslot]->{cpu});
646 my $childpid = fork();
# ----- child process from here: reset signals, redirect stdio to the pipe.
649 $SIG{'INT'} = 'DEFAULT';
650 $SIG{'QUIT'} = 'DEFAULT';
651 $SIG{'TERM'} = 'DEFAULT';
653 foreach (values (%reader))
657 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
658 open(STDOUT,">&writer");
659 open(STDERR,">&writer");
# Task environment handed to the script (TASK_*, JOB_*).
664 delete $ENV{"GNUPGHOME"};
665 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
666 $ENV{"TASK_QSEQUENCE"} = $id;
667 $ENV{"TASK_SEQUENCE"} = $level;
668 $ENV{"JOB_SCRIPT"} = $Job->{script};
669 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
670 $param =~ tr/a-z/A-Z/;
671 $ENV{"JOB_PARAMETER_$param"} = $value;
673 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
674 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
675 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
676 $ENV{"HOME"} = $ENV{"TASK_WORK"};
677 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
678 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
679 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
680 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun is pinned to this task's node, one task/one cpu.
686 "--nodelist=".$childnode->{name},
687 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
688 "--job-name=$job_id.$id.$$",
# --- Build the shell command the child will exec via srun ---
# Recreates TASK_WORK, mounts Keep with arv-mount, then runs the script
# either inside a docker container (with crunchstat accounting and the
# TASK_* environment forwarded via --env) or directly on the host.
690 my $build_script_to_send = "";
692 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
693 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
694 ."&& cd $ENV{CRUNCH_TMP} ";
697 $build_script_to_send = $build_script;
701 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker branch: crunchstat watches the container via its cid file.
704 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
705 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
706 # Dynamically configure the container to use the host system as its
707 # DNS server. Get the host's global addresses from the ip command,
708 # and turn them into docker --dns options using gawk.
710 q{$(ip -o address show scope global |
711 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
712 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
713 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
714 $command .= "--env=\QHOME=/home/crunch\E ";
# Forward ARVADOS_/JOB_/TASK_ env vars, rewriting container-local paths.
715 while (my ($env_key, $env_val) = each %ENV)
717 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
718 if ($env_key eq "TASK_WORK") {
719 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
721 elsif ($env_key eq "TASK_KEEPMOUNT") {
722 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
725 $command .= "--env=\Q$env_key=$env_val\E ";
729 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
730 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
731 $command .= "\Q$docker_hash\E ";
732 $command .= "stdbuf --output=0 --error=0 ";
733 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-docker branch: run the script straight from CRUNCH_SRC on the host.
736 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
737 $command .= "stdbuf --output=0 --error=0 ";
738 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
741 my @execargs = ('bash', '-c', $command);
742 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
743 # exec() failed, we assume nothing happened.
744 Log(undef, "srun() failed on build script");
# --- Parent-side bookkeeping after a successful fork ---
# Record the child in %proc, claim the slot, stamp the jobstep, mark the
# task started in the API, and pop it off the todo list.
748 if (!defined $childpid)
755 $proc{$childpid} = { jobstep => $id,
758 jobstepname => "$job_id.$id.$childpid",
# A slot must never be double-booked; this is an internal invariant.
760 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
761 $slot[$childslot]->{pid} = $childpid;
763 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
764 Log ($id, "child $childpid started on $childslotname");
765 $Jobstep->{starttime} = time;
766 $Jobstep->{node} = $childnode->{name};
767 $Jobstep->{slotindex} = $childslot;
768 delete $Jobstep->{stderr};
769 delete $Jobstep->{finishtime};
771 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
772 $Jobstep->{'arvados_task'}->save;
774 splice @jobstep_todo, $todo_ptr, 1;
777 $progress_is_dirty = 1;
# --- Wait for children, manage node holds, decide round outcome ---
# Inner loop: while slots are busy (or todo is exhausted), service signals,
# reap children, refresh job state, and bail if failure rates look hopeless.
# NOTE(review): enclosing loop/conditional headers fall on elided lines.
781 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
783 last THISROUND if $main::please_freeze;
784 if ($main::please_info)
786 $main::please_info = 0;
790 update_progress_stats();
797 check_refresh_wanted();
799 update_progress_stats();
800 select (undef, undef, undef, 0.1);
802 elsif (time - $progress_stats_updated >= 30)
804 update_progress_stats();
# Abort the round when repeat-failures dominate: >=8 with no successes,
# or >=16 and more repeat-failures than successes.
806 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
807 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
809 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
810 .($thisround_failed+$thisround_succeeded)
811 .") -- giving up on this round";
812 Log (undef, $message);
816 # move slots from freeslot to holdslot (or back to freeslot) if necessary
817 for (my $i=$#freeslot; $i>=0; $i--) {
818 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
819 push @holdslot, (splice @freeslot, $i, 1);
822 for (my $i=$#holdslot; $i>=0; $i--) {
823 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
824 push @freeslot, (splice @holdslot, $i, 1);
828 # give up if no nodes are succeeding
829 if (!grep { $_->{node}->{losing_streak} == 0 &&
830 $_->{node}->{hold_count} < 4 } @slot) {
831 my $message = "Every node has failed -- giving up on this round";
832 Log (undef, $message);
# Round over: release holds and clear losing streaks for the next round.
839 push @freeslot, splice @holdslot;
840 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
843 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
846 if ($main::please_continue) {
847 $main::please_continue = 0;
850 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
854 check_refresh_wanted();
856 update_progress_stats();
857 select (undef, undef, undef, 0.1);
858 killem (keys %proc) if $main::please_freeze;
862 update_progress_stats();
863 freeze_if_want_freeze();
# Decide whether to continue: give up when nothing succeeded and failures
# are absent (nothing ran) or excessive; otherwise loop back to ONELEVEL.
866 if (!defined $main::success)
869 $thisround_succeeded == 0 &&
870 ($thisround_failed == 0 || $thisround_failed > 4))
872 my $message = "stop because $thisround_failed tasks failed and none succeeded";
873 Log (undef, $message);
882 goto ONELEVEL if !defined $main::success;
# --- Finalization: collate task outputs, register the collection, exit ---
# The collated manifest is re-fetched via arv-get and saved as a collection;
# the job's output attribute gets the collection's portable_data_hash.
885 release_allocation();
887 my $collated_output = &collate_output();
889 if (!$collated_output) {
890 Log(undef, "output undef");
894 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
895 or die "failed to get collated manifest: $!";
896 my $orig_manifest_text = '';
897 while (my $manifest_line = <$orig_manifest>) {
898 $orig_manifest_text .= $manifest_line;
900 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
901 'manifest_text' => $orig_manifest_text,
903 Log(undef, "output uuid " . $output->{uuid});
904 Log(undef, "output hash " . $output->{portable_data_hash});
905 $Job->update_attributes('output' => $output->{portable_data_hash});
# $@ is set when the eval around the registration above failed.
908 Log (undef, "Failed to register output manifest: $@");
912 Log (undef, "finish");
# Complete only when we have output AND every round succeeded.
917 if ($collated_output && $main::success) {
918 $final_state = 'Complete';
920 $final_state = 'Failed';
922 $Job->update_attributes('state' => $final_state);
# Exit 0 on Complete, 1 otherwise (dispatcher reads this).
924 exit (($final_state eq 'Complete') ? 0 : 1);
# update_progress_stats: push todo/done/running counts into the job's
# tasks_summary via the API and log a one-line status. No-op unless
# $progress_is_dirty; always refreshes $progress_stats_updated timestamp.
928 sub update_progress_stats
930 $progress_stats_updated = time;
931 return if !$progress_is_dirty;
# running = slots that are neither free nor on hold.
932 my ($todo, $done, $running) = (scalar @jobstep_todo,
933 scalar @jobstep_done,
934 scalar @slot - scalar @freeslot - scalar @holdslot);
935 $Job->{'tasks_summary'} ||= {};
936 $Job->{'tasks_summary'}->{'todo'} = $todo;
937 $Job->{'tasks_summary'}->{'done'} = $done;
938 $Job->{'tasks_summary'}->{'running'} = $running;
939 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
940 Log (undef, "status: $done done, $running running, $todo todo");
941 $progress_is_dirty = 0;
# --- Child reaper (sub header on an elided line; presumably reapchildren,
# TODO confirm) ---
# Non-blocking waitpid; on reaping a task child: classify the result as
# success / temporary failure / permanent failure, update node health
# (losing_streak, bans), requeue or fail the task, free the slot, and pull
# in any new tasks the finished task created.
948 my $pid = waitpid (-1, WNOHANG);
949 return 0 if $pid <= 0;
951 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
953 . $slot[$proc{$pid}->{slot}]->{cpu});
954 my $jobstepid = $proc{$pid}->{jobstep};
955 my $elapsed = time - $proc{$pid}->{time};
956 my $Jobstep = $jobstep[$jobstepid];
958 my $childstatus = $?;
959 my $exitvalue = $childstatus >> 8;
960 my $exitinfo = "exit ".exit_status_s($childstatus);
# Re-read the task record: the task itself reports success via the API.
961 $Jobstep->{'arvados_task'}->reload;
962 my $task_success = $Jobstep->{'arvados_task'}->{success};
964 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
966 if (!defined $task_success) {
967 # task did not indicate one way or the other --> fail
968 $Jobstep->{'arvados_task'}->{success} = 0;
969 $Jobstep->{'arvados_task'}->save;
# Failure path: exit code 111 and node_fail both count as temporary.
976 $temporary_fail ||= $Jobstep->{node_fail};
977 $temporary_fail ||= ($exitvalue == 111);
980 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
982 # Check for signs of a failed or misconfigured node
983 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
984 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
985 # Don't count this against jobstep failure thresholds if this
986 # node is already suspected faulty and srun exited quickly
987 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
989 Log ($jobstepid, "blaming failure on suspect node " .
990 $slot[$proc{$pid}->{slot}]->{node}->{name});
991 $temporary_fail ||= 1;
993 ban_node_by_slot($proc{$pid}->{slot});
996 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
997 ++$Jobstep->{'failures'},
998 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failure, or 3rd temporary failure: give up on the whole job.
1001 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1002 # Give up on this task, and the whole job
1004 $main::please_freeze = 1;
1006 # Put this task back on the todo queue
1007 push @jobstep_todo, $jobstepid;
1008 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: node is healthy again.
1012 ++$thisround_succeeded;
1013 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1014 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1015 push @jobstep_done, $jobstepid;
1016 Log ($jobstepid, "success in $elapsed seconds");
1018 $Jobstep->{exitcode} = $childstatus;
1019 $Jobstep->{finishtime} = time;
1020 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1021 $Jobstep->{'arvados_task'}->save;
1022 process_stderr ($jobstepid, $task_success);
1023 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the pipe and the slot.
1025 close $reader{$jobstepid};
1026 delete $reader{$jobstepid};
1027 delete $slot[$proc{$pid}->{slot}]->{pid};
1028 push @freeslot, $proc{$pid}->{slot};
# On success, page through tasks created by this task and enqueue them.
1031 if ($task_success) {
1033 my $newtask_list = [];
1034 my $newtask_results;
1036 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1038 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1040 'order' => 'qsequence',
1041 'offset' => scalar(@$newtask_list),
1043 push(@$newtask_list, @{$newtask_results->{items}});
1044 } while (@{$newtask_results->{items}});
1045 foreach my $arvados_task (@$newtask_list) {
1047 'level' => $arvados_task->{'sequence'},
1049 'arvados_task' => $arvados_task
1051 push @jobstep, $jobstep;
1052 push @jobstep_todo, $#jobstep;
1056 $progress_is_dirty = 1;
# check_refresh_wanted: if the CRUNCH_REFRESH_TRIGGER file has been touched
# since our last refresh, re-fetch the job record, copy cancellation-related
# attributes, and request a freeze if the job is no longer Running.
1060 sub check_refresh_wanted
1062 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the trigger file's mtime.
1063 if (@stat && $stat[9] > $latest_refresh) {
1064 $latest_refresh = scalar time;
1065 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1066 for my $attr ('cancelled_at',
1067 'cancelled_by_user_uuid',
1068 'cancelled_by_client_uuid',
1070 $Job->{$attr} = $Job2->{$attr};
1072 if ($Job->{'state'} ne "Running") {
1073 if ($Job->{'state'} eq "Cancelled") {
1074 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1076 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1079 $main::please_freeze = 1;
# --- squeue watchdog (sub header on an elided line; presumably check_squeue,
# TODO confirm) ---
# Rate-limited in two tiers: every 4s, killem() any procs whose killtime has
# arrived; every 60s, compare squeue's live step list against %proc and
# schedule a kill for any child squeue no longer knows about.
1086 # return if the kill list was checked <4 seconds ago
1087 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1091 $squeue_kill_checked = time;
1093 # use killem() on procs whose killtime is reached
1096 if (exists $proc{$_}->{killtime}
1097 && $proc{$_}->{killtime} <= time)
1103 # return if the squeue was checked <60 seconds ago
1104 if (defined $squeue_checked && $squeue_checked > time - 60)
1108 $squeue_checked = time;
1112 # here is an opportunity to check for mysterious problems with local procs
1116 # get a list of steps still running
# "echo ok" sentinel distinguishes an empty queue from a failed squeue run.
1117 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1119 if ($squeue[-1] ne "ok")
1125 # which of my jobsteps are running, according to squeue?
1129 if (/^(\d+)\.(\d+) (\S+)/)
1131 if ($1 eq $ENV{SLURM_JOBID})
1138 # which of my active child procs (>60s old) were not mentioned by squeue?
1139 foreach (keys %proc)
1141 if ($proc{$_}->{time} < time - 60
1142 && !exists $ok{$proc{$_}->{jobstepname}}
1143 && !exists $proc{$_}->{killtime})
1145 # kill this proc if it hasn't exited in 30 seconds
1146 $proc{$_}->{killtime} = time + 30;
# release_allocation: cancel our own SLURM allocation with scancel.
# (The have_slurm guard presumably sits on an elided line -- TODO confirm.)
1152 sub release_allocation
1156 Log (undef, "release job allocation");
1157 system "scancel $ENV{SLURM_JOBID}";
# --- Pipe reader (sub header on an elided line; presumably readfrompipes,
# TODO confirm) ---
# Drain each child's non-blocking stderr pipe into its jobstep buffer, run
# line-oriented preprocessing, and trim the buffer when it exceeds 16 KiB.
1165 foreach my $job (keys %reader)
1168 while (0 < sysread ($reader{$job}, $buf, 8192))
1170 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1171 $jobstep[$job]->{stderr} .= $buf;
1172 preprocess_stderr ($job);
# Keep only the most recent data: drop the oldest 8 KiB past the cap.
1173 if (length ($jobstep[$job]->{stderr}) > 16384)
1175 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# preprocess_stderr: consume complete lines from a jobstep's stderr buffer,
# log each one, and react to known srun error patterns: expired/unconfirmed
# allocation -> freeze the job; node failure -> mark node_fail and ban the
# node's slot.
1184 sub preprocess_stderr
1188 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (plus its newline) from the front of the buffer.
1190 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1191 Log ($job, "stderr $line");
1192 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1194 $main::please_freeze = 1;
1196 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1197 $jobstep[$job]->{node_fail} = 1;
1198 ban_node_by_slot($jobstep[$job]->{slotindex});
# --- (sub header on an elided line; presumably process_stderr -- TODO
# confirm) Final stderr flush for a finished jobstep: preprocess, then log
# any remaining buffered lines.
1207 my $task_success = shift;
1208 preprocess_stderr ($job);
1211 Log ($job, "stderr $_");
1212 } split ("\n", $jobstep[$job]->{stderr});
# --- Keep block fetcher (sub header on an elided line; presumably
# fetch_block -- TODO confirm) ---
# Pipe "arv-get <hash>" and accumulate its stdout in 1 MiB sysreads;
# returns the concatenated block contents.
1218 my ($keep, $child_out, $output_block);
1220 my $cmd = "arv-get \Q$hash\E";
1221 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1225 my $bytes = sysread($keep, $buf, 1024 * 1024);
1226 if (!defined $bytes) {
1227 die "reading from arv-get: $!";
1228 } elsif ($bytes == 0) {
1229 # sysread returns 0 at the end of the pipe.
1232 # some bytes were read into buf.
1233 $output_block .= $buf;
1237 return $output_block;
# --- Output collation (sub header on an elided line; presumably
# collate_output -- TODO confirm) ---
# Feed each successful task's output into "arv-put --raw": bare manifest
# text is piped straight through; a lone block locator from a single-task
# job becomes the job output directly; other locators are dereferenced via
# fetch_block first. Then read the resulting manifest locator back with a
# 120s timeout.
1242 Log (undef, "collate");
1244 my ($child_out, $child_in);
1245 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1246 '--retries', put_retry_count());
1250 next if (!exists $_->{'arvados_task'}->{'output'} ||
1251 !$_->{'arvados_task'}->{'success'});
1252 my $output = $_->{'arvados_task'}->{output};
# Not a bare block locator -> treat as manifest text and pipe it through.
1253 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1255 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1256 print $child_in $output;
1258 elsif (@jobstep == 1)
1260 $joboutput = $output;
1263 elsif (defined (my $outblock = fetch_block ($output)))
1265 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1266 print $child_in $outblock;
1270 Log (undef, "XXX fetch_block($output) failed XXX");
1276 if (!defined $joboutput) {
1277 my $s = IO::Select->new($child_out);
1278 if ($s->can_read(120)) {
1279 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1281 # TODO: Ensure exit status == 0.
1283 Log (undef, "timed out reading from 'arv-put'");
1286 # TODO: kill $pid instead of waiting, now that we've decided to
1287 # ignore further output.
# --- Process killer (sub header on an elided line; presumably killem --
# TODO confirm) ---
# Escalating signals per pid: SIGINT, then SIGTERM after 4s, then SIGKILL
# after another 4s; each signal is sent twice because srun requires a
# double SIGINT to interrupt.
1298 my $sig = 2; # SIGINT first
1299 if (exists $proc{$_}->{"sent_$sig"} &&
1300 time - $proc{$_}->{"sent_$sig"} > 4)
1302 $sig = 15; # SIGTERM if SIGINT doesn't work
1304 if (exists $proc{$_}->{"sent_$sig"} &&
1305 time - $proc{$_}->{"sent_$sig"} > 4)
1307 $sig = 9; # SIGKILL if SIGTERM doesn't work
1309 if (!exists $proc{$_}->{"sent_$sig"})
1311 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1313 select (undef, undef, undef, 0.1);
1316 kill $sig, $_; # srun wants two SIGINT to really interrupt
1318 $proc{$_}->{"sent_$sig"} = time;
1319 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# (The following sampled line belongs to a later helper -- presumably
# fhbits, building a select() bit vector from filehandles; TODO confirm.)
1329 vec($bits,fileno($_),1) = 1;
1335 # Send log output to Keep via arv-put.
1337 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1338 # $log_pipe_pid is the pid of the arv-put subprocess.
1340 # The only functions that should access these variables directly are:
1342 # log_writer_start($logfilename)
1343 # Starts an arv-put pipe, reading data on stdin and writing it to
1344 # a $logfilename file in an output collection.
1346 # log_writer_send($txt)
1347 # Writes $txt to the output log collection.
1349 # log_writer_finish()
1350 # Closes the arv-put pipe and returns the output that it produces.
1352 # log_writer_is_active()
1353 # Returns a true value if there is currently a live arv-put
1354 # process, false otherwise.
1356 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
# Start an arv-put child that reads log data on stdin and stores it as
# $logfilename in a collection, printing the collection's portable data
# hash when done.  The pid and pipe handles are kept in the file-scoped
# $log_pipe_* variables declared above.
1358 sub log_writer_start($)
1360 my $logfilename = shift;
1361 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1362 'arv-put', '--portable-data-hash',
1364 '--filename', $logfilename,
# Write one chunk of log text to the arv-put pipe opened by
# log_writer_start().  ($txt unpacking line is elided from this
# excerpt.)
1368 sub log_writer_send($)
1371 print $log_pipe_in $txt;
sub log_writer_finish()
{
  # Finish writing the log collection: close the arv-put child's stdin,
  # wait up to 120 seconds for it to print its output (the collection's
  # portable data hash), then reap the child and reset the file-scoped
  # $log_pipe_* state so log_writer_is_active() returns false.
  #
  # Returns the arv-put output, or undef on timeout (or if no writer
  # was active).
  return unless $log_pipe_pid;

  close($log_pipe_in);
  my $arv_put_output;

  my $s = IO::Select->new($log_pipe_out);
  if ($s->can_read(120)) {
    sysread($log_pipe_out, $arv_put_output, 1024);
    chomp($arv_put_output);
  } else {
    Log (undef, "timed out reading from 'arv-put'");
  }

  waitpid($log_pipe_pid, 0);
  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
  if ($?) {
    # Fix: Log() takes ($jobstep_id, $logmessage); the original passed
    # the message as the first argument, producing a malformed log line
    # (message in the jobstep-id slot, undef in the message slot).
    Log (undef, "log_writer_finish: arv-put exited " . exit_status_s($?));
  }

  return $arv_put_output;
}
# True (the child's pid) while an arv-put log writer is running; undef
# after log_writer_finish() or before log_writer_start().
1398 sub log_writer_is_active() {
1399 return $log_pipe_pid;
1402 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and logged one line at a time
# (presumably via a per-line recursive call, elided from this excerpt).
1404 if ($_[1] =~ /\n/) {
1405 for my $line (split (/\n/, $_[1])) {
# Unbuffer STDERR, then restore the previously selected filehandle.
1410 my $fh = select STDERR; $|=1; select $fh;
# Record format: "<job id> <pid> <jobstep id> <message>".
1411 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape everything outside printable ASCII (space..~) as \NNN octal so
# the log stays one record per line.
1412 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# A timestamp is only needed when writing to the log collection or to
# an interactive terminal.
1415 if (log_writer_is_active() || -t STDERR) {
1416 my @gmtime = gmtime;
1417 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1418 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1420 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# Also send the timestamped line to the arv-put log collection, if one
# is being written.
1422 if (log_writer_is_active()) {
1423 log_writer_send($datetime . " " . $message);
# Fatal-error handler (sub opening elided from this excerpt): log the
# message with the caller's file/line, then checkpoint, collate, and
# save the log collection before the process dies.
1430 my ($package, $file, $line) = caller;
1431 my $message = "@_ at $file line $line\n";
1432 Log (undef, $message);
# Freezing/collating is only worthwhile if there was still work queued.
1433 freeze() if @jobstep_todo;
1434 collate_output() if @jobstep_todo;
1436 save_meta() if log_writer_is_active();
# Final Job state bookkeeping (enclosing sub's opening elided from this
# excerpt): a cancelled job just gets finished_at stamped; any other job
# reaching this point is marked Failed.
1443 if ($Job->{'state'} eq 'Cancelled') {
1444 $Job->update_attributes('finished_at' => scalar gmtime);
1446 $Job->update_attributes('state' => 'Failed');
# save_meta (sub opening elided from this excerpt): finalize the log
# collection and record its manifest locator on the Job record.
1453 my $justcheckpoint = shift; # false if this will be the last meta saved
1454 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1456 my $loglocator = log_writer_finish();
1457 Log (undef, "log manifest is $loglocator");
# Update both the local cache and the API server's copy of the record.
1458 $Job->{'log'} = $loglocator;
1459 $Job->update_attributes('log', $loglocator);
# If a freeze was requested by signal ($main::please_freeze), release
# the node allocation, kill the given srun procs, and reap exited
# children before freezing and stopping.
1463 sub freeze_if_want_freeze
1465 if ($main::please_freeze)
1467 release_allocation();
1470 # kill some srun procs before freeze+stop
# Reset each listed pid's bookkeeping so killem() starts from SIGINT.
1471 map { $proc{$_} = {} } @_;
1474 killem (keys %proc);
1475 select (undef, undef, undef, 0.1);
# Reap every child that has already exited, non-blocking.
1477 while (($died = waitpid (-1, WNOHANG)) > 0)
1479 delete $proc{$died};
# Fragments of the freeze/thaw checkpoint machinery (each sub's
# surrounding lines are elided from this excerpt).
1494 Log (undef, "Freeze not implemented");
1501 croak ("Thaw not implemented");
# freezeunquote: turn "\n" back into a newline and drop the backslash
# from any other escaped character.
1517 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun wrapper (sub opening elided from this excerpt): run $execargs,
# prefixed with $srunargs when running under SLURM; plain local
# execution otherwise.
1524 my $srunargs = shift;
1525 my $execargs = shift;
1526 my $opts = shift || {};
1528 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# In debug mode, echo the command line, quoting args containing spaces.
1529 print STDERR (join (" ",
1530 map { / / ? "'$_'" : $_ }
1533 if $ENV{CRUNCH_DEBUG};
# When stdin data was supplied ($stdin is read from an elided
# parameter), fork a child whose stdout feeds this process's STDIN.
1535 if (defined $stdin) {
1536 my $child = open STDIN, "-|";
1537 defined $child or die "no fork: $!";
1539 print $stdin or die $!;
1540 close STDOUT or die $!;
# With the fork option, run as a subprocess and return its status;
# otherwise fall through to exec (elided), so reaching the die below
# means exec itself failed.
1545 return system (@$args) if $opts->{fork};
# A huge environment is a common cause of exec failure (E2BIG).
1548 warn "ENV size is ".length(join(" ",%ENV));
1549 die "exec failed: $!: @$args";
1553 sub ban_node_by_slot {
1554 # Don't start any new jobsteps on this node for 60 seconds
# ($slotid comes from an argument-unpacking line elided from this
# excerpt.)
1556 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1557 $slot[$slotid]->{node}->{hold_count}++;
1558 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Take an exclusive, non-blocking flock on $lockfile, croaking with
# $error_message if another process already holds it.  The global
# filehandle L is deliberately left open so the lock stays held for the
# life of the process.
1563 my ($lockfile, $error_message) = @_;
1564 open L, ">", $lockfile or croak("$lockfile: $!");
1565 if (!flock L, LOCK_EX|LOCK_NB) {
1566 croak("Can't lock $lockfile: $error_message\n");
1570 sub find_docker_image {
1571 # Given a Keep locator, check to see if it contains a Docker image.
1572 # If so, return its stream name and Docker hash.
1573 # If not, return undef for both values.
1574 my $locator = shift;
1575 my ($streamname, $filename);
1576 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
# Walk the manifest: each line is a stream name followed by block
# locators and "pos:size:filename" file tokens.
1577 foreach my $line (split(/\n/, $image->{manifest_text})) {
1578 my @tokens = split(/\s+/, $line);
1580 $streamname = shift(@tokens);
1581 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
# A Docker image collection must contain exactly one file.
1582 if (defined($filename)) {
1583 return (undef, undef); # More than one file in the Collection.
1585 $filename = (split(/:/, $filedata, 3))[2];
# The single file must be named <64-hex-digit-hash>.tar for the
# collection to count as a Docker image; $1 is the image hash.
1590 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1591 return ($streamname, $1);
1593 return (undef, undef);
1597 sub put_retry_count {
1598 # Calculate a --retries argument for arv-put that will have it try
1599 # approximately as long as this Job has been running.
1600 my $stoptime = shift || time;
1601 my $starttime = $jobstep[0]->{starttime};
# Assume 1 second of runtime if the first jobstep never started.
1602 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# Roughly log2(elapsed seconds) retries (loop body elided from this
# excerpt), with a floor of 3.
1604 while ($timediff >= 2) {
1608 return ($retries > 3) ? $retries : 3;
# exit_status_s (sub opening elided from this excerpt):
1612 # Given a $?, return a human-readable exit code string like "0" or
1613 # "1" or "0 with signal 1" or "1 with signal 11".
1614 my $exitcode = shift;
# $? layout: high byte is the exit status, low 7 bits are the signal
# number, and bit 0x80 means the child dumped core.
1615 my $s = $exitcode >> 8;
1616 if ($exitcode & 0x7f) {
1617 $s .= " with signal " . ($exitcode & 0x7f);
1619 if ($exitcode & 0x80) {
1620 $s .= " with core dump";
# Embedded "checkout-and-build" helper script (run on each node): unpack
# the git archive of the source tree into CRUNCH_SRC, build the Python
# SDK and any crunch_scripts install hooks, then exec the real command
# in @ARGV.  (NOTE(review): several lines are elided from this excerpt.)
1628 # checkout-and-build
1631 use File::Path qw( make_path );
1633 my $destdir = $ENV{"CRUNCH_SRC"};
1634 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1635 my $repo = $ENV{"CRUNCH_SRC_URL"};
1636 my $task_work = $ENV{"TASK_WORK"};
1638 for my $dir ($destdir, $task_work) {
1641 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Serialize concurrent tasks building into the same $destdir.
1645 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: the desired commit is already checked out, so just exec
# the wrapped command.
1647 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1650 die "Cannot exec `@ARGV`: $!";
# Slow path: invalidate the commit marker and send build output to
# $destdir.log.
1656 unlink "$destdir.commit";
1657 open STDOUT, ">", "$destdir.log";
1658 open STDERR, ">&STDOUT";
# The git archive of the source tree is appended to this script's DATA
# section; untar it into $destdir.
1661 my @git_archive_data = <DATA>;
1662 if (@git_archive_data) {
1663 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1664 print TARX @git_archive_data;
1666 die "'tar -C $destdir -xf -' exited $?: $!";
1671 chomp ($pwd = `pwd`);
1672 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the Arvados Python SDK into a virtualenv under $install_dir.
1675 for my $src_path ("$destdir/arvados/sdk/python") {
1677 shell_or_die ("virtualenv", $install_dir);
1678 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run the crunch scripts' install hook, falling back to older layout
# conventions (autotests.sh, install.sh) when present.
1682 if (-e "$destdir/crunch_scripts/install") {
1683 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1684 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1686 shell_or_die ("./tests/autotests.sh", $install_dir);
1687 } elsif (-e "./install.sh") {
1688 shell_or_die ("./install.sh", $install_dir);
# Record the built commit atomically via symlink + rename, then exec
# the wrapped command.
1692 unlink "$destdir.commit.new";
1693 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1694 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1701 die "Cannot exec `@ARGV`: $!";
# shell_or_die (sub opening elided from this excerpt): run a command,
# echoing it to STDERR when DEBUG is set, and die with the failure
# status in hex if it exits nonzero.
1708 if ($ENV{"DEBUG"}) {
1709 print STDERR "@_\n";
1712 or die "@_ failed: $! exit 0x".sprintf("%x",$?);