2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
89 use Digest::MD5 qw(md5_hex);
95 use File::Path qw( make_path remove_tree );
97 use constant EX_TEMPFAIL => 75;
# Establish the scratch-directory layout used by the rest of the job:
# CRUNCH_TMP (per-uid unless running as crunch/root), JOB_WORK, and
# CRUNCH_INSTALL, all exported through %ENV for child processes.
99 $ENV{"TMPDIR"} ||= "/tmp";
100 unless (defined $ENV{"CRUNCH_TMP"}) {
101 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
102 if ($ENV{"USER"} ne "crunch" && $< != 0) {
103 # use a tmp dir unique for my uid
104 $ENV{"CRUNCH_TMP"} .= "-$<";
108 # Create the tmp directory if it does not exist
109 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
110 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived directories: JOB_WORK holds per-task work dirs; CRUNCH_INSTALL
# receives the installed source tree; CRUNCH_WORK kept for old callers.
113 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
114 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
115 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# mkdir return value deliberately ignored here (dir may already exist).
116 mkdir ($ENV{"JOB_WORK"});
# Parse command-line options (see POD above for their meanings) and
# propagate the job API token into the environment so the Arvados client
# and child processes authenticate as this job.
124 GetOptions('force-unlock' => \$force_unlock,
125 'git-dir=s' => \$git_dir,
126 'job=s' => \$jobspec,
127 'job-api-token=s' => \$job_api_token,
128 'no-clear-tmp' => \$no_clear_tmp,
129 'resume-stash=s' => \$resume_stash,
132 if (defined $job_api_token) {
133 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM mode is detected purely from the environment set by the scheduler.
136 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# CRUNCH_DEBUG toggles verbose stderr passthrough elsewhere in this script.
142 $main::ENV{CRUNCH_DEBUG} = 1;
146 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to Arvados and resolve $jobspec into a live $Job record:
# either fetch+lock an existing job (UUID form) or create a new one from
# an inline JSON spec.
151 my $arv = Arvados->new('apiVersion' => 'v1');
153 my $User = $arv->{'users'}->{'current'}->execute;
159 if ($jobspec =~ /^[-a-z\d]+$/)
161 # $jobspec is an Arvados UUID, not a JSON job specification
162 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
163 if (!$force_unlock) {
164 # Claim this job, and make sure nobody else does
166 # lock() sets is_locked_by_uuid and changes state to Running.
167 $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
# EX_TEMPFAIL (75) tells the dispatcher the lock failure is retryable.
170 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
# Inline-JSON path: build a job object locally, then create it via the API.
177 $Job = JSON::decode_json($jobspec);
181 map { croak ("No $_ specified") unless $Job->{$_} }
182 qw(script script_version script_parameters);
185 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186 $Job->{'started_at'} = gmtime;
187 $Job->{'state'} = 'Running';
189 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191 $job_id = $Job->{'uuid'};
# Begin streaming the job log to Keep (see log_writer_* subs below).
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
# max_tasks_per_node==0 means "no per-node concurrency cap".
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Discover the compute allocation and build @node / @slot: one slot per
# (node, cpu) pair. Falls back to localhost cpu count when not under SLURM.
201 Log (undef, "check slurm allocation");
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209 push @sinfo, "$localcpus localhost";
211 if (exists $ENV{SLURM_NODELIST})
213 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# Each sinfo line is "<ncpus> <nodelist>"; clamp ncpus to the job's cap.
217 my ($ncpus, $slurm_nodelist) = split;
218 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand SLURM compressed hostlists like "compute[1-3,5]" into individual
# node names. NOTE(review): parts of the expansion loop are elided here.
221 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
224 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
227 foreach (split (",", $ranges))
240 push @nodelist, map {
242 $n =~ s/\[[-,\d]+\]/$_/;
249 push @nodelist, $nodelist;
252 foreach my $nodename (@nodelist)
254 Log (undef, "node $nodename - $ncpus slots")
255 my $node = { name => $nodename,
259 foreach my $cpu (1..$ncpus)
261 push @slot, { node => $node,
265 push @node, @nodelist;
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# Reset the job's task counters, install signal handlers, and create the
# level-0 bootstrap task. Signal handlers only set $main::please_* flags;
# the main loop polls them (safe to do real work outside handler context).
276 $Job->update_attributes(
277 'tasks_summary' => { 'failed' => 0,
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Job "knobs" of the form NOCACHE*=value are exported into the environment.
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
# Scheduling state: tasks queued/done, plus merge bookkeeping.
303 my @jobstep_todo = ();
304 my @jobstep_done = ();
305 my @jobstep_tomerge = ();
306 my $jobstep_tomerge_level = 0;
308 my $squeue_kill_checked;
309 my $output_in_keep = 0;
310 my $latest_refresh = scalar time;
# Resume from a saved checkpoint if the job carries one.
314 if (defined $Job->{thawedfromkey})
316 thaw ($Job->{thawedfromkey});
# Otherwise seed the queue with a single level-0 task.
320 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
321 'job_uuid' => $Job->{'uuid'},
326 push @jobstep, { 'level' => 0,
328 'arvados_task' => $first_task,
330 push @jobstep_todo, 0;
# One crunch-job per CRUNCH_TMP: take an exclusive lock or bail out.
336 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# The worker-side install script is embedded after __DATA__ in this file.
343 $build_script = <DATA>;
345 my $nodelist = join(",", @node);
345 my $nodelist = join(",", @node);
# Unless --no-clear-tmp was given, wipe stale work/install/source dirs on
# every allocated node (via srun in a forked child), unmounting any
# leftover FUSE keep mounts first.
347 if (!defined $no_clear_tmp) {
348 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
349 Log (undef, "Clean work dirs");
351 my $cleanpid = fork();
354 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
355 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Parent: poll for the cleanup child while honoring freeze requests.
360 last if $cleanpid == waitpid (-1, WNOHANG);
361 freeze_if_want_freeze ($cleanpid);
362 select (undef, undef, undef, 0.1);
364 Log (undef, "Cleanup command exited ".exit_status_s($?));
# Resolve the job's source code. Three cases: script_version is an
# absolute path (work in place), $git_dir/repository is local, or it is
# remote (fetch into a shared bare cache repo under CRUNCH_TMP/.git).
# The end result is $commit (full sha1), CRUNCH_SRC* env vars, and
# $git_archive (a tar of the tree to ship to workers).
369 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
370 # If we're in user-land (i.e., not called from crunch-dispatch)
371 # script_version can be an absolute directory path, signifying we
372 # should work straight out of that directory instead of using a git
374 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
375 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
378 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
380 # Install requested code version
381 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
383 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
385 # If we're running under crunch-dispatch, it will have already
386 # pulled the appropriate source tree into its own repository, and
387 # given us that repo's path as $git_dir.
389 # If we're running a "local" job, we might have to fetch content
390 # from a remote repository.
392 # (Currently crunch-dispatch gives a local path with --git-dir, but
393 # we might as well accept URLs there too in case it changes its
395 my $repo = $git_dir || $Job->{'repository'};
397 # Repository can be remote or local. If remote, we'll need to fetch it
398 # to a local dir before doing `git log` et al.
401 if ($repo =~ m{://|^[^/]*:}) {
402 # $repo is a git url we can clone, like git:// or https:// or
403 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
404 # not recognized here because distinguishing that from a local
405 # path is too fragile. If you really need something strange here,
406 # use the ssh:// form.
407 $repo_location = 'remote';
408 } elsif ($repo =~ m{^\.*/}) {
409 # $repo is a local path to a git index. We'll also resolve ../foo
410 # to ../foo/.git if the latter is a directory. To help
411 # disambiguate local paths from named hosted repositories, this
412 # form must be given as ./ or ../ if it's a relative path.
413 if (-d "$repo/.git") {
414 $repo = "$repo/.git";
416 $repo_location = 'local';
418 # $repo is none of the above. It must be the name of a hosted
# Look the name up via the Arvados repositories API; exactly one match
# is required, and its fetch_url is then treated as a remote.
420 my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
421 'filters' => [['name','=',$repo]]
423 my @repos_found = @{$arv_repo_list->{'items'}};
424 my $n_found = $arv_repo_list->{'items_available'};
426 Log(undef, "Repository '$repo' -> "
427 . join(", ", map { $_->{'uuid'} } @repos_found));
430 croak("Error: Found $n_found repositories with name '$repo'.");
432 $repo = $repos_found[0]->{'fetch_url'};
433 $repo_location = 'remote';
435 Log(undef, "Using $repo_location repository '$repo'");
436 $ENV{"CRUNCH_SRC_URL"} = $repo;
438 # Resolve given script_version (we'll call that $treeish here) to a
439 # commit sha1 ($commit).
440 my $treeish = $Job->{'script_version'};
442 if ($repo_location eq 'remote') {
443 # We minimize excess object-fetching by re-using the same bare
444 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
445 # just keep adding remotes to it as needed.
446 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
447 my $gitcmd = "git --git-dir=\Q$local_repo\E";
449 # Set up our local repo for caching remote objects, making
451 if (!-d $local_repo) {
452 make_path($local_repo) or croak("Error: could not create $local_repo");
454 # This works (exits 0 and doesn't delete fetched objects) even
455 # if $local_repo is already initialized:
456 `$gitcmd init --bare`;
458 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
461 # If $treeish looks like a hash (or abbrev hash) we look it up in
462 # our local cache first, since that's cheaper. (We don't want to
463 # do that with tags/branches though -- those change over time, so
464 # they should always be resolved by the remote repo.)
465 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
466 # Hide stderr because it's normal for this to fail:
467 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
469 # Careful not to resolve a branch named abcdeff to commit 1234567:
470 $sha1 =~ /^$treeish/ &&
471 $sha1 =~ /^([0-9a-f]{40})$/s) {
473 Log(undef, "Commit $commit already present in $local_repo");
477 if (!defined $commit) {
478 # If $treeish isn't just a hash or abbrev hash, or isn't here
479 # yet, we need to fetch the remote to resolve it correctly.
481 # First, remove all local heads. This prevents a name that does
482 # not exist on the remote from resolving to (or colliding with)
483 # a previously fetched branch or tag (possibly from a different
485 remove_tree("$local_repo/refs/heads", {keep_root => 1});
487 Log(undef, "Fetching objects from $repo to $local_repo");
488 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
490 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
494 # Now that the data is all here, we will use our local repo for
495 # the rest of our git activities.
# Local-repo path: resolve $treeish directly against $repo.
499 my $gitcmd = "git --git-dir=\Q$repo\E";
500 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
501 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
502 croak("`$gitcmd rev-list` exited "
504 .", '$treeish' not found. Giving up.");
507 Log(undef, "Version $treeish is commit $commit");
509 if ($commit ne $Job->{'script_version'}) {
510 # Record the real commit id in the database, frozentokey, logs,
511 # etc. -- instead of an abbreviation or a branch name which can
512 # become ambiguous or point to a different commit in the future.
513 if (!$Job->update_attributes('script_version' => $commit)) {
514 croak("Error: failed to update job's script_version attribute");
# Archive the resolved tree; workers unpack it via the install script.
518 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
519 $git_archive = `$gitcmd archive ''\Q$commit\E`;
521 croak("Error: $gitcmd archive exited ".exit_status_s($?));
# Ship the source archive to every worker: srun runs "perl -" remotely,
# feeding it $build_script (the installer after __DATA__) concatenated
# with the git archive bytes on stdin.
525 if (!defined $git_archive) {
526 Log(undef, "Skip install phase (no git archive)");
528 Log(undef, "Warning: This probably means workers have no source tree!");
532 Log(undef, "Run install script on all workers");
534 my @srunargs = ("srun",
535 "--nodelist=$nodelist",
536 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
537 my @execargs = ("sh", "-c",
538 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
540 # Note: this section is almost certainly unnecessary if we're
541 # running tasks in docker containers.
542 my $installpid = fork();
543 if ($installpid == 0)
545 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Parent: wait for the install child, honoring freeze requests meanwhile.
550 last if $installpid == waitpid (-1, WNOHANG);
551 freeze_if_want_freeze ($installpid);
552 select (undef, undef, undef, 0.1);
554 Log (undef, "Install script exited ".exit_status_s($?));
559 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
560 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
563 # If this job requires a Docker image, install that.
564 my $docker_bin = "/usr/bin/docker.io";
565 my ($docker_locator, $docker_stream, $docker_hash);
566 if ($docker_locator = $Job->{docker_image_locator}) {
567 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
570 croak("No Docker image hash found from locator $docker_locator");
# Strip the leading "." from the stream name so the arv-get path is valid.
572 $docker_stream =~ s/^\.//;
# Shell script run on each node: load the image from Keep only if it is
# not already present in the local docker image cache.
573 my $docker_install_script = qq{
574 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
575 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
578 my $docker_pid = fork();
579 if ($docker_pid == 0)
581 srun (["srun", "--nodelist=" . join(',', @node)],
582 ["/bin/sh", "-ec", $docker_install_script]);
# Parent: poll the docker-install child, honoring freeze requests.
587 last if $docker_pid == waitpid (-1, WNOHANG);
588 freeze_if_want_freeze ($docker_pid);
589 select (undef, undef, undef, 0.1);
593 croak("Installing Docker image from $docker_locator exited "
# Record the job's defining attributes and knobs in the log for posterity.
598 foreach (qw (script script_version script_parameters runtime_constraints))
602 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
604 foreach (split (/\n/, $Job->{knobs}))
606 Log (undef, "knob " . $_);
# Main scheduling loop (one iteration per "level" of tasks). For each
# queued task at the current level: grab a free slot, fork, and in the
# child build + exec an srun command line that runs the job script
# (inside docker when $docker_hash is set), with stdout/stderr piped
# back to the parent through %reader.
611 $main::success = undef;
617 my $thisround_succeeded = 0;
618 my $thisround_failed = 0;
619 my $thisround_failed_multiple = 0;
# Process tasks lowest level first; ties broken by queue order.
621 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
622 or $a <=> $b } @jobstep_todo;
623 my $level = $jobstep[$jobstep_todo[0]]->{level};
624 Log (undef, "start level $level");
629 my @freeslot = (0..$#slot);
632 my $progress_is_dirty = 1;
633 my $progress_stats_updated = 0;
635 update_progress_stats();
640 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
642 my $id = $jobstep_todo[$todo_ptr];
643 my $Jobstep = $jobstep[$id];
# Only launch tasks belonging to the current level in this round.
644 if ($Jobstep->{level} != $level)
# Non-blocking pipe: the parent drains task stderr without stalling.
649 pipe $reader{$id}, "writer" or croak ($!);
650 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
651 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
653 my $childslot = $freeslot[0];
654 my $childnode = $slot[$childslot]->{node};
655 my $childslotname = join (".",
656 $slot[$childslot]->{node}->{name},
657 $slot[$childslot]->{cpu});
658 my $childpid = fork();
# ---- child process: reset signals, redirect output into the pipe ----
661 $SIG{'INT'} = 'DEFAULT';
662 $SIG{'QUIT'} = 'DEFAULT';
663 $SIG{'TERM'} = 'DEFAULT';
665 foreach (values (%reader))
669 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
670 open(STDOUT,">&writer");
671 open(STDERR,">&writer");
# Export per-task environment consumed by the script and by crunchstat.
676 delete $ENV{"GNUPGHOME"};
677 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
678 $ENV{"TASK_QSEQUENCE"} = $id;
679 $ENV{"TASK_SEQUENCE"} = $level;
680 $ENV{"JOB_SCRIPT"} = $Job->{script};
681 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
682 $param =~ tr/a-z/A-Z/;
683 $ENV{"JOB_PARAMETER_$param"} = $value;
685 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
686 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
687 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
688 $ENV{"HOME"} = $ENV{"TASK_WORK"};
689 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
690 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
691 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
692 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun pins the task to its assigned node, one task per invocation.
698 "--nodelist=".$childnode->{name},
699 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
700 "--job-name=$job_id.$id.$$",
702 my $build_script_to_send = "";
# Shell preamble: recreate the task work dir fresh, then cd into place.
704 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
705 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
706 ."&& cd $ENV{CRUNCH_TMP} ";
709 $build_script_to_send = $build_script;
# arv-mount exposes Keep at TASK_KEEPMOUNT for the duration of the task.
713 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker branch: wrap the task in crunchstat + docker run.
716 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
717 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
718 # Dynamically configure the container to use the host system as its
719 # DNS server. Get the host's global addresses from the ip command,
720 # and turn them into docker --dns options using gawk.
722 q{$(ip -o address show scope global |
723 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
724 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
725 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
726 $command .= "--env=\QHOME=/home/crunch\E ";
# Forward ARVADOS_/JOB_/TASK_ env vars into the container, rewriting the
# two path-valued ones to their in-container locations.
727 while (my ($env_key, $env_val) = each %ENV)
729 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
730 if ($env_key eq "TASK_WORK") {
731 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
733 elsif ($env_key eq "TASK_KEEPMOUNT") {
734 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
737 $command .= "--env=\Q$env_key=$env_val\E ";
741 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
742 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
743 $command .= "\Q$docker_hash\E ";
744 $command .= "stdbuf --output=0 --error=0 ";
745 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-docker branch: crunchstat + the script straight from CRUNCH_SRC.
748 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
749 $command .= "stdbuf --output=0 --error=0 ";
750 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
753 my @execargs = ('bash', '-c', $command);
754 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
755 # exec() failed, we assume nothing happened.
756 Log(undef, "srun() failed on build script");
# ---- parent process: record bookkeeping for the launched child ----
760 if (!defined $childpid)
767 $proc{$childpid} = { jobstep => $id,
770 jobstepname => "$job_id.$id.$childpid",
772 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
773 $slot[$childslot]->{pid} = $childpid;
775 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
776 Log ($id, "child $childpid started on $childslotname");
777 $Jobstep->{starttime} = time;
778 $Jobstep->{node} = $childnode->{name};
779 $Jobstep->{slotindex} = $childslot;
780 delete $Jobstep->{stderr};
781 delete $Jobstep->{finishtime};
783 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
784 $Jobstep->{'arvados_task'}->save;
786 splice @jobstep_todo, $todo_ptr, 1;
789 $progress_is_dirty = 1;
# While tasks are running: reap children, react to signals, track
# progress, and apply failure heuristics (abort the round if failures
# dominate, quarantine misbehaving nodes via hold_until).
793 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
795 last THISROUND if $main::please_freeze;
796 if ($main::please_info)
798 $main::please_info = 0;
802 update_progress_stats();
809 check_refresh_wanted();
811 update_progress_stats();
812 select (undef, undef, undef, 0.1);
# Periodic status line even when nothing changed recently.
814 elsif (time - $progress_stats_updated >= 30)
816 update_progress_stats();
# Abort the round when repeated failures pile up with no successes to
# offset them (thresholds: 8 with zero successes, or 16 and more than
# the success count).
818 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
819 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
821 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
822 .($thisround_failed+$thisround_succeeded)
823 .") -- giving up on this round";
824 Log (undef, $message);
828 # move slots from freeslot to holdslot (or back to freeslot) if necessary
829 for (my $i=$#freeslot; $i>=0; $i--) {
830 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
831 push @holdslot, (splice @freeslot, $i, 1);
834 for (my $i=$#holdslot; $i>=0; $i--) {
835 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
836 push @freeslot, (splice @holdslot, $i, 1);
840 # give up if no nodes are succeeding
841 if (!grep { $_->{node}->{losing_streak} == 0 &&
842 $_->{node}->{hold_count} < 4 } @slot) {
843 my $message = "Every node has failed -- giving up on this round";
844 Log (undef, $message);
# End of round: release held slots and reset per-node losing streaks.
851 push @freeslot, splice @holdslot;
852 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
855 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
858 if ($main::please_continue) {
859 $main::please_continue = 0;
# SIGALRM info request: checkpoint and save metadata without stopping.
862 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
866 check_refresh_wanted();
868 update_progress_stats();
869 select (undef, undef, undef, 0.1);
870 killem (keys %proc) if $main::please_freeze;
874 update_progress_stats();
875 freeze_if_want_freeze();
# Decide whether the whole job should stop after this round.
878 if (!defined $main::success)
881 $thisround_succeeded == 0 &&
882 ($thisround_failed == 0 || $thisround_failed > 4))
884 my $message = "stop because $thisround_failed tasks failed and none succeeded";
885 Log (undef, $message);
# Loop back for the next level until success/failure is decided.
894 goto ONELEVEL if !defined $main::success;
# Job finished: release the SLURM allocation, collate task outputs into a
# single manifest, register it as a collection, and set the final job
# state (exit 0 only on 'Complete').
897 release_allocation();
899 my $collated_output = &collate_output();
901 if (!$collated_output) {
902 Log(undef, "output undef");
# Re-fetch the collated manifest text from Keep via arv-get so the
# collection record stores full manifest text, not just a locator.
906 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
907 or die "failed to get collated manifest: $!";
908 my $orig_manifest_text = '';
909 while (my $manifest_line = <$orig_manifest>) {
910 $orig_manifest_text .= $manifest_line;
912 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
913 'manifest_text' => $orig_manifest_text,
915 Log(undef, "output uuid " . $output->{uuid});
916 Log(undef, "output hash " . $output->{portable_data_hash});
917 $Job->update_attributes('output' => $output->{portable_data_hash});
920 Log (undef, "Failed to register output manifest: $@");
924 Log (undef, "finish");
929 if ($collated_output && $main::success) {
930 $final_state = 'Complete';
932 $final_state = 'Failed';
934 $Job->update_attributes('state' => $final_state);
936 exit (($final_state eq 'Complete') ? 0 : 1);
# Push current todo/done/running task counts into the job's
# tasks_summary (API + log line). No-op unless $progress_is_dirty; always
# refreshes $progress_stats_updated so the 30s periodic timer resets.
940 sub update_progress_stats
942 $progress_stats_updated = time;
943 return if !$progress_is_dirty;
# "running" is inferred from slot occupancy: total minus free minus held.
944 my ($todo, $done, $running) = (scalar @jobstep_todo,
945 scalar @jobstep_done,
946 scalar @slot - scalar @freeslot - scalar @holdslot);
947 $Job->{'tasks_summary'} ||= {};
948 $Job->{'tasks_summary'}->{'todo'} = $todo;
949 $Job->{'tasks_summary'}->{'done'} = $done;
950 $Job->{'tasks_summary'}->{'running'} = $running;
951 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
952 Log (undef, "status: $done done, $running running, $todo todo");
953 $progress_is_dirty = 0;
# Reap one finished task child (non-blocking). Classifies the result as
# success / temporary failure / permanent failure, updates node-health
# counters, requeues temporary failures, frees the slot, and enqueues any
# subtasks the finished task created.
960 my $pid = waitpid (-1, WNOHANG);
961 return 0 if $pid <= 0;
963 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
965 . $slot[$proc{$pid}->{slot}]->{cpu});
966 my $jobstepid = $proc{$pid}->{jobstep};
967 my $elapsed = time - $proc{$pid}->{time};
968 my $Jobstep = $jobstep[$jobstepid];
970 my $childstatus = $?;
971 my $exitvalue = $childstatus >> 8;
972 my $exitinfo = "exit ".exit_status_s($childstatus);
# The task records its own success flag via the API; reload to see it.
973 $Jobstep->{'arvados_task'}->reload;
974 my $task_success = $Jobstep->{'arvados_task'}->{success};
976 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
978 if (!defined $task_success) {
979 # task did not indicate one way or the other --> fail
980 $Jobstep->{'arvados_task'}->{success} = 0;
981 $Jobstep->{'arvados_task'}->save;
# Failure path. Exit code 111 and node_fail are treated as temporary
# (worth retrying); others are permanent.
988 $temporary_fail ||= $Jobstep->{node_fail};
989 $temporary_fail ||= ($exitvalue == 111);
992 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
994 # Check for signs of a failed or misconfigured node
995 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
996 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
997 # Don't count this against jobstep failure thresholds if this
998 # node is already suspected faulty and srun exited quickly
999 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1001 Log ($jobstepid, "blaming failure on suspect node " .
1002 $slot[$proc{$pid}->{slot}]->{node}->{name});
1003 $temporary_fail ||= 1;
1005 ban_node_by_slot($proc{$pid}->{slot});
1008 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1009 ++$Jobstep->{'failures'},
1010 $temporary_fail ? 'temporary ' : 'permanent',
# Three strikes (or any permanent failure) fails the whole job.
1013 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1014 # Give up on this task, and the whole job
1016 $main::please_freeze = 1;
1018 # Put this task back on the todo queue
1019 push @jobstep_todo, $jobstepid;
1020 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: reset the node's health counters.
1024 ++$thisround_succeeded;
1025 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1026 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1027 push @jobstep_done, $jobstepid;
1028 Log ($jobstepid, "success in $elapsed seconds");
1030 $Jobstep->{exitcode} = $childstatus;
1031 $Jobstep->{finishtime} = time;
1032 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1033 $Jobstep->{'arvados_task'}->save;
1034 process_stderr ($jobstepid, $task_success);
1035 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Tear down the stderr pipe and return the slot to the free pool.
1037 close $reader{$jobstepid};
1038 delete $reader{$jobstepid};
1039 delete $slot[$proc{$pid}->{slot}]->{pid};
1040 push @freeslot, $proc{$pid}->{slot};
# Successful tasks may have created subtasks; page through them via the
# API and append each to the jobstep queue.
1043 if ($task_success) {
1045 my $newtask_list = [];
1046 my $newtask_results;
1048 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1050 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1052 'order' => 'qsequence',
1053 'offset' => scalar(@$newtask_list),
1055 push(@$newtask_list, @{$newtask_results->{items}});
1056 } while (@{$newtask_results->{items}});
1057 foreach my $arvados_task (@$newtask_list) {
1059 'level' => $arvados_task->{'sequence'},
1061 'arvados_task' => $arvados_task
1063 push @jobstep, $jobstep;
1064 push @jobstep_todo, $#jobstep;
1068 $progress_is_dirty = 1;
# Re-read cancellation-related job attributes when the refresh trigger
# file (CRUNCH_REFRESH_TRIGGER) has an mtime newer than the last refresh.
# A job no longer in "Running" state requests a freeze of the main loop.
1072 sub check_refresh_wanted
1074 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the trigger file's mtime.
1075 if (@stat && $stat[9] > $latest_refresh) {
1076 $latest_refresh = scalar time;
1077 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1078 for my $attr ('cancelled_at',
1079 'cancelled_by_user_uuid',
1080 'cancelled_by_client_uuid',
1082 $Job->{$attr} = $Job2->{$attr};
1084 if ($Job->{'state'} ne "Running") {
1085 if ($Job->{'state'} eq "Cancelled") {
1086 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1088 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1091 $main::please_freeze = 1;
# Reconcile local child processes with SLURM's view (squeue). Rate
# limited: kill-list check at most every 4s, squeue poll at most every
# 60s. Procs older than 60s that squeue no longer reports get a killtime
# 30s in the future; killem() fires once that deadline passes.
1098 # return if the kill list was checked <4 seconds ago
1099 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1103 $squeue_kill_checked = time;
1105 # use killem() on procs whose killtime is reached
1108 if (exists $proc{$_}->{killtime}
1109 && $proc{$_}->{killtime} <= time)
1115 # return if the squeue was checked <60 seconds ago
1116 if (defined $squeue_checked && $squeue_checked > time - 60)
1120 $squeue_checked = time;
1124 # here is an opportunity to check for mysterious problems with local procs
1128 # get a list of steps still running
# "&& echo ok" sentinel distinguishes an empty queue from a failed squeue.
1129 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1131 if ($squeue[-1] ne "ok")
1137 # which of my jobsteps are running, according to squeue?
1141 if (/^(\d+)\.(\d+) (\S+)/)
1143 if ($1 eq $ENV{SLURM_JOBID})
1150 # which of my active child procs (>60s old) were not mentioned by squeue?
1151 foreach (keys %proc)
1153 if ($proc{$_}->{time} < time - 60
1154 && !exists $ok{$proc{$_}->{jobstepname}}
1155 && !exists $proc{$_}->{killtime})
1157 # kill this proc if it hasn't exited in 30 seconds
1158 $proc{$_}->{killtime} = time + 30;
# Hand the SLURM allocation back to the scheduler via scancel.
1164 sub release_allocation
1168 Log (undef, "release job allocation");
1169 system "scancel $ENV{SLURM_JOBID}";
# Drain each task's non-blocking stderr pipe into its jobstep record,
# handing complete lines to preprocess_stderr and capping the retained
# buffer at 16KB (oldest 8KB dropped).
1177 foreach my $job (keys %reader)
1180 while (0 < sysread ($reader{$job}, $buf, 8192))
1182 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1183 $jobstep[$job]->{stderr} .= $buf;
1184 preprocess_stderr ($job);
1185 if (length ($jobstep[$job]->{stderr}) > 16384)
1187 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, logging each and
# reacting to known srun error patterns: expired/unconfirmed allocation
# => freeze the job; node failure / step-creation failure => mark the
# step as node_fail and ban the node's slot.
1196 sub preprocess_stderr
1200 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (including its newline) from the buffer.
1202 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1203 Log ($job, "stderr $line");
1204 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1206 $main::please_freeze = 1;
1208 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1209 $jobstep[$job]->{node_fail} = 1;
1210 ban_node_by_slot($jobstep[$job]->{slotindex});
# process_stderr: final flush at task end — run the preprocessor, then
# log any remaining (newline-less) residue line by line.
1219 my $task_success = shift;
1220 preprocess_stderr ($job);
1223 Log ($job, "stderr $_");
1224 } split ("\n", $jobstep[$job]->{stderr});
# Fetch a data block from Keep by locator hash: stream `arv-get <hash>`
# through a pipe in 1MB sysread chunks and return the accumulated bytes.
1230 my ($keep, $child_out, $output_block);
1232 my $cmd = "arv-get \Q$hash\E";
1233 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1237 my $bytes = sysread($keep, $buf, 1024 * 1024);
1238 if (!defined $bytes) {
1239 die "reading from arv-get: $!";
1240 } elsif ($bytes == 0) {
1241 # sysread returns 0 at the end of the pipe.
1244 # some bytes were read into buf.
1245 $output_block .= $buf;
1249 return $output_block;
# Merge all successful tasks' outputs into one manifest by piping them
# through `arv-put --raw`. A task output may be a bare block locator
# (fed to arv-put directly), or — for single-task jobs — used verbatim,
# or a locator whose block must first be fetched from Keep.
1254 Log (undef, "collate");
1256 my ($child_out, $child_in);
1257 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1258 '--retries', put_retry_count());
1262 next if (!exists $_->{'arvados_task'}->{'output'} ||
1263 !$_->{'arvados_task'}->{'success'});
1264 my $output = $_->{'arvados_task'}->{output};
# Bare "md5+hints" locator: pass straight through to arv-put.
1265 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1267 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1268 print $child_in $output;
1270 elsif (@jobstep == 1)
1272 $joboutput = $output;
1275 elsif (defined (my $outblock = fetch_block ($output)))
1277 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1278 print $child_in $outblock;
1282 Log (undef, "XXX fetch_block($output) failed XXX");
# Read arv-put's result, with a 120s timeout to avoid hanging forever.
1288 if (!defined $joboutput) {
1289 my $s = IO::Select->new($child_out);
1290 if ($s->can_read(120)) {
1291 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1293 # TODO: Ensure exit status == 0.
1295 Log (undef, "timed out reading from 'arv-put'");
1298 # TODO: kill $pid instead of waiting, now that we've decided to
1299 # ignore further output.
# Escalating kill for task children: SIGINT, then (if a previous signal
# of that level was sent >4s ago) SIGTERM, then SIGKILL. Each level is
# sent only once per proc; srun needs the signal twice to interrupt.
1310 my $sig = 2; # SIGINT first
1311 if (exists $proc{$_}->{"sent_$sig"} &&
1312 time - $proc{$_}->{"sent_$sig"} > 4)
1314 $sig = 15; # SIGTERM if SIGINT doesn't work
1316 if (exists $proc{$_}->{"sent_$sig"} &&
1317 time - $proc{$_}->{"sent_$sig"} > 4)
1319 $sig = 9; # SIGKILL if SIGTERM doesn't work
1321 if (!exists $proc{$_}->{"sent_$sig"})
1323 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1325 select (undef, undef, undef, 0.1);
1328 kill $sig, $_; # srun wants two SIGINT to really interrupt
1330 $proc{$_}->{"sent_$sig"} = time;
1331 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# (select-vector setup for readers — presumably part of the adjacent
# fhbits/select helper; verify against the full source.)
1341 vec($bits,fileno($_),1) = 1;
1347 # Send log output to Keep via arv-put.
1349 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1350 # $log_pipe_pid is the pid of the arv-put subprocess.
1352 # The only functions that should access these variables directly are:
1354 # log_writer_start($logfilename)
1355 # Starts an arv-put pipe, reading data on stdin and writing it to
1356 # a $logfilename file in an output collection.
1358 # log_writer_send($txt)
1359 # Writes $txt to the output log collection.
1361 # log_writer_finish()
1362 # Closes the arv-put pipe and returns the output that it produces.
1364 # log_writer_is_active()
1365 # Returns a true value if there is currently a live arv-put
1366 # process, false otherwise.
1368 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
# log_writer_start($logfilename): spawn an arv-put child that reads log
# data on its stdin and stores it as $logfilename in a collection,
# reporting the portable data hash on its stdout.  NOTE(review): the
# trailing arguments and closing lines of this call are missing from
# this excerpt.
1370 sub log_writer_start($)
1372 my $logfilename = shift;
1373 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1374 'arv-put', '--portable-data-hash',
1376 '--filename', $logfilename,
# log_writer_send($txt): append $txt to the log collection by writing it
# to the arv-put pipe.  NOTE(review): the `my $txt = shift;` line and
# closing brace are missing from this excerpt.
1380 sub log_writer_send($)
1383 print $log_pipe_in $txt;
# log_writer_finish(): close the arv-put pipe, collect the locator it
# prints (waiting up to 120s), reap the child, reset the shared pipe
# state, and return the locator.  Returns early (undef) if no writer is
# active.  NOTE(review): some brace/assignment lines are missing from
# this excerpt.
1386 sub log_writer_finish()
1388 return unless $log_pipe_pid;
# Closing our write end signals EOF to arv-put so it finalizes the
# collection and prints the result.
1390 close($log_pipe_in);
1393 my $s = IO::Select->new($log_pipe_out);
1394 if ($s->can_read(120)) {
1395 sysread($log_pipe_out, $arv_put_output, 1024);
1396 chomp($arv_put_output);
1398 Log (undef, "timed out reading from 'arv-put'");
1401 waitpid($log_pipe_pid, 0);
1402 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
# Record a nonzero exit from arv-put in the job log.
1404 Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1407 return $arv_put_output;
# log_writer_is_active(): true while an arv-put log child is running
# (the pid doubles as the boolean; it is undef after log_writer_finish).
1410 sub log_writer_is_active() {
1411 return $log_pipe_pid;
# Log($jobstep_id, $logmessage): write one formatted log line to STDERR
# and, when a log writer is active, to the Keep log collection.
# Multi-line messages are split and logged one line at a time
# (NOTE(review): the recursive per-line call is among the lines missing
# from this excerpt).
1414 sub Log # ($jobstep_id, $logmessage)
1416 if ($_[1] =~ /\n/) {
1417 for my $line (split (/\n/, $_[1])) {
# Temporarily select STDERR to enable autoflush, then restore.
1422 my $fh = select STDERR; $|=1; select $fh;
1423 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape every byte outside printable ASCII (space..~) as \NNN octal so
# the log stream stays clean text.
1424 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Prepend a UTC timestamp when the line goes to the log collection or
# to an interactive terminal.
1427 if (log_writer_is_active() || -t STDERR) {
1428 my @gmtime = gmtime;
1429 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1430 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1432 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1434 if (log_writer_is_active()) {
1435 log_writer_send($datetime . " " . $message);
# Fatal-error path: log the message with the caller's file/line, then
# checkpoint and collate whatever work remains before dying.
# NOTE(review): the enclosing sub declaration (presumably a croak-style
# handler) and its final die/cleanup lines are missing from this
# excerpt.
1442 my ($package, $file, $line) = caller;
1443 my $message = "@_ at $file line $line\n";
1444 Log (undef, $message);
# Only bother freezing/collating if there are tasks still outstanding.
1445 freeze() if @jobstep_todo;
1446 collate_output() if @jobstep_todo;
# Final Job-record bookkeeping: a cancelled job just gets its
# finished_at stamped; anything else reaching this path is marked
# Failed.  NOTE(review): the enclosing sub declaration and surrounding
# guard lines are missing from this excerpt.
1456 if ($Job->{'state'} eq 'Cancelled') {
1457 $Job->update_attributes('finished_at' => scalar gmtime);
1459 $Job->update_attributes('state' => 'Failed');
# save_meta($justcheckpoint): finalize the log collection and record its
# locator on the Job.  No-op for mid-run checkpoints or when no log
# writer is active.  NOTE(review): the sub declaration line is missing
# from this excerpt.
1466 my $justcheckpoint = shift; # false if this will be the last meta saved
1467 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1468 return unless log_writer_is_active();
1470 my $loglocator = log_writer_finish();
1471 Log (undef, "log manifest is $loglocator");
# Update both the cached copy and the API server's Job record.
1472 $Job->{'log'} = $loglocator;
1473 $Job->update_attributes('log', $loglocator);
# freeze_if_want_freeze(@pids): if a freeze was requested (signal
# handler sets $main::please_freeze), release the node allocation, kill
# the given srun processes plus everything in %proc, and reap the dead
# children.  NOTE(review): several brace/cleanup lines are missing from
# this excerpt.
1477 sub freeze_if_want_freeze
1479 if ($main::please_freeze)
1481 release_allocation();
1484 # kill some srun procs before freeze+stop
# Register the extra pids in %proc (empty metadata) so killem sees them.
1485 map { $proc{$_} = {} } @_;
1488 killem (keys %proc);
1489 select (undef, undef, undef, 0.1);
# Reap every child that has already exited and drop it from %proc.
1491 while (($died = waitpid (-1, WNOHANG)) > 0)
1493 delete $proc{$died};
# Vestigial freeze/thaw support: freeze() just logs, thaw() dies.
# NOTE(review): the sub declarations themselves are missing from this
# excerpt; line 1531 appears to be leftover serialized-string unescaping
# (\n and backslash escapes) from the old thaw path.
1508 Log (undef, "Freeze not implemented");
1515 croak ("Thaw not implemented");
1531 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun($srunargs, $execargs, $opts, $stdin): run a command, prefixing it
# with the srun arguments only when SLURM is in use.  With
# $opts->{fork} the command runs via system() and we return its status;
# otherwise this (presumably, via a missing exec line) replaces the
# current process.  NOTE(review): the sub declaration, the fork branch
# for feeding $stdin, and the final exec are missing from this excerpt.
1538 my $srunargs = shift;
1539 my $execargs = shift;
1540 my $opts = shift || {};
# Outside SLURM, ignore the srun arguments and run the command directly.
1542 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Echo the (crudely shell-quoted) command when CRUNCH_DEBUG is set.
1543 print STDERR (join (" ",
1544 map { / / ? "'$_'" : $_ }
1547 if $ENV{CRUNCH_DEBUG};
# When stdin content is supplied, a child is forked to write it into
# the command's STDIN via a pipe.
1549 if (defined $stdin) {
1550 my $child = open STDIN, "-|";
1551 defined $child or die "no fork: $!";
1553 print $stdin or die $!;
1554 close STDOUT or die $!;
1559 return system (@$args) if $opts->{fork};
# Reached only if exec fails; ENV size is a common culprit for E2BIG.
1562 warn "ENV size is ".length(join(" ",%ENV));
1563 die "exec failed: $!: @$args";
# ban_node_by_slot($slotid): put the node behind the given slot on hold
# for 60 seconds after a failure, and count how often it has been held.
# NOTE(review): the `my $slotid = shift;` line and closing brace are
# missing from this excerpt.
1567 sub ban_node_by_slot {
1568 # Don't start any new jobsteps on this node for 60 seconds
1570 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1571 $slot[$slotid]->{node}->{hold_count}++;
1572 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Acquire an exclusive, non-blocking flock on $lockfile; croak with the
# supplied message if another process holds it.  NOTE(review): the sub
# declaration line is missing from this excerpt, and the bareword
# filehandle L is a global — flag for modernization to a lexical handle
# when the full sub can be edited.
1577 my ($lockfile, $error_message) = @_;
1578 open L, ">", $lockfile or croak("$lockfile: $!");
1579 if (!flock L, LOCK_EX|LOCK_NB) {
1580 croak("Can't lock $lockfile: $error_message\n");
1584 sub find_docker_image {
1585 # Given a Keep locator, check to see if it contains a Docker image.
1586 # If so, return its stream name and Docker hash.
1587 # If not, return undef for both values.
1588 my $locator = shift;
1589 my ($streamname, $filename);
# Look the collection up via the API client and scan its manifest: one
# stream, exactly one file, named <64-hex-digit-hash>.tar.
1590 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1591 foreach my $line (split(/\n/, $image->{manifest_text})) {
1592 my @tokens = split(/\s+/, $line);
# First manifest token is the stream name; file tokens look like
# position:size:filename.
1594 $streamname = shift(@tokens);
1595 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1596 if (defined($filename)) {
1597 return (undef, undef); # More than one file in the Collection.
# split with limit 3 keeps any colons inside the filename intact.
1599 $filename = (split(/:/, $filedata, 3))[2];
# Accept only a bare "<sha256>.tar" filename; the hex hash is returned
# as the Docker image id.
1604 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1605 return ($streamname, $1);
1607 return (undef, undef);
1611 sub put_retry_count {
1612 # Calculate a --retries argument for arv-put that will have it try
1613 # approximately as long as this Job has been running.
1614 my $stoptime = shift || time;
1615 my $starttime = $jobstep[0]->{starttime};
# Fall back to a tiny elapsed time if the first step never recorded one.
1616 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# NOTE(review): the loop body (presumably incrementing $retries while
# halving $timediff, i.e. retries ~ log2(elapsed)) is missing from this
# excerpt — confirm against the full file.
1618 while ($timediff >= 2) {
# Never return fewer than 3 retries.
1622 return ($retries > 3) ? $retries : 3;
# exit_status_s($?): render a wait status for humans.
# NOTE(review): the sub declaration, a closing brace, and the final
# `return $s;` are among the lines missing from this excerpt.
1626 # Given a $?, return a human-readable exit code string like "0" or
1627 # "1" or "0 with signal 1" or "1 with signal 11".
1628 my $exitcode = shift;
# High byte is the exit code; low 7 bits the signal; bit 0x80 core dump.
1629 my $s = $exitcode >> 8;
1630 if ($exitcode & 0x7f) {
1631 $s .= " with signal " . ($exitcode & 0x7f);
1633 if ($exitcode & 0x80) {
1634 $s .= " with core dump";
1642 # checkout-and-build
# Embedded installer program, run on each compute node: unpack the
# script source shipped in the DATA section into CRUNCH_SRC, build and
# install it, then exec the real command in @ARGV.  NOTE(review): many
# structural lines (fork/flock guards, chdir, closing braces) are
# missing from this excerpt.
1645 use File::Path qw( make_path );
1647 my $destdir = $ENV{"CRUNCH_SRC"};
1648 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1649 my $repo = $ENV{"CRUNCH_SRC_URL"};
1650 my $task_work = $ENV{"TASK_WORK"};
1652 for my $dir ($destdir, $task_work) {
1655 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Serialize concurrent installs on the same node with a lock file.
1659 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: $destdir.commit is a symlink naming the installed commit;
# if it already matches, skip the install and exec the command now.
1661 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1664 die "Cannot exec `@ARGV`: $!";
# Slow path: invalidate the commit marker and log the build output.
1670 unlink "$destdir.commit";
1671 open STDOUT, ">", "$destdir.log";
1672 open STDERR, ">&STDOUT";
# The git archive of the source tree is appended after __DATA__; untar
# it into $destdir if present.
1675 my @git_archive_data = <DATA>;
1676 if (@git_archive_data) {
1677 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1678 print TARX @git_archive_data;
1680 die "'tar -C $destdir -xf -' exited $?: $!";
1685 chomp ($pwd = `pwd`);
1686 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the bundled Python SDK into a virtualenv under $install_dir.
1689 for my $src_path ("$destdir/arvados/sdk/python") {
1691 shell_or_die ("virtualenv", $install_dir);
1692 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run whichever install hook the script repository provides.
1696 if (-e "$destdir/crunch_scripts/install") {
1697 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1698 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1700 shell_or_die ("./tests/autotests.sh", $install_dir);
1701 } elsif (-e "./install.sh") {
1702 shell_or_die ("./install.sh", $install_dir);
# Atomically publish the installed commit: write the marker symlink to
# a temp name, then rename over the real one.
1706 unlink "$destdir.commit.new";
1707 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1708 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1715 die "Cannot exec `@ARGV`: $!";
# shell_or_die(@cmd): echo the command when DEBUG is set, run it
# (NOTE(review): the sub declaration and system() line are missing from
# this excerpt), and die with the hex wait status on failure.
1722 if ($ENV{"DEBUG"}) {
1723 print STDERR "@_\n";
1726 or die "@_ failed: $! exit 0x".sprintf("%x",$?);