2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
# NOTE(review): this excerpt carries embedded original-file line numbers and has
# elided lines between them; code below is kept byte-identical to the sample.
# --- Module imports, exit-code constants, temporary-directory setup ---
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Digest::MD5 qw(md5_hex);
97 use File::Path qw( make_path remove_tree );
# TASK_TEMPFAIL marks a retryable task failure; EX_TEMPFAIL is the
# sysexits.h "temporary failure" code (75) used for job-level retry.
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid; non-root, non-crunch users get a per-uid suffix so
# multiple developers on one host don't collide.
105 if ($ENV{"USER"} ne "crunch" && $< != 0) {
106 # use a tmp dir unique for my uid
107 $ENV{"CRUNCH_TMP"} .= "-$<";
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
# --- Command-line option parsing (see POD above for option meanings) ---
128 GetOptions('force-unlock' => \$force_unlock,
129 'git-dir=s' => \$git_dir,
130 'job=s' => \$jobspec,
131 'job-api-token=s' => \$job_api_token,
132 'no-clear-tmp' => \$no_clear_tmp,
133 'resume-stash=s' => \$resume_stash,
# A token given on the command line overrides the environment for all
# subsequent Arvados API calls.
136 if (defined $job_api_token) {
137 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# Both SLURM variables must be present for us to treat this as a SLURM run.
140 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# CRUNCH_DEBUG is set/cleared here (the controlling condition is elided in
# this excerpt — presumably a debug flag; confirm against the full source).
146 $main::ENV{CRUNCH_DEBUG} = 1;
150 $main::ENV{CRUNCH_DEBUG} = 0;
# --- Connect to Arvados; fetch-and-lock an existing job, or create one ---
155 my $arv = Arvados->new('apiVersion' => 'v1');
163 my $User = api_call("users/current");
# A plain UUID means "run this existing job"; anything else is JSON.
165 if ($jobspec =~ /^[-a-z\d]+$/)
167 # $jobspec is an Arvados UUID, not a JSON job specification
168 $Job = api_call("jobs/get", uuid => $jobspec);
169 if (!$force_unlock) {
170 # Claim this job, and make sure nobody else does
171 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
# Lock contention is a temporary condition: exit EX_TEMPFAIL so the
# dispatcher can retry rather than fail the job permanently.
173 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
# Otherwise: build a brand-new job object from the JSON specification.
180 $Job = JSON::decode_json($jobspec);
184 map { croak ("No $_ specified") unless $Job->{$_} }
185 qw(script script_version script_parameters);
188 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
189 $Job->{'started_at'} = gmtime;
190 $Job->{'state'} = 'Running';
192 $Job = api_call("jobs/create", job => $Job);
194 $job_id = $Job->{'uuid'};
# Start streaming the job log to Keep under <uuid>.log.txt.
196 my $keep_logfile = $job_id . '.log.txt';
197 log_writer_start($keep_logfile);
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Record the installed arvados-cli gem version(s) in the startup log line;
# chomp removes the newline, chop the trailing ")" from `gem list` output.
203 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
205 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
206 chomp($gem_versions);
207 chop($gem_versions); # Closing parentheses
212 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
# --- Discover allocated nodes and build the node/slot tables ---
214 Log (undef, "check slurm allocation");
217 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Without SLURM, fall back to localhost with one slot per local CPU
# (grep -cw counts "processor" lines in /proc/cpuinfo; `|| 1` guards zero).
221 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
222 push @sinfo, "$localcpus localhost";
224 if (exists $ENV{SLURM_NODELIST})
226 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# Each @sinfo line is "<cpus> <nodelist>"; cap cpus at max_tasks_per_node.
230 my ($ncpus, $slurm_nodelist) = split;
231 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand compressed SLURM hostlist syntax, e.g. "node[1-3,5]" into
# individual hostnames node1,node2,node3,node5.
234 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
237 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
240 foreach (split (",", $ranges))
253 push @nodelist, map {
255 $n =~ s/\[[-,\d]+\]/$_/;
262 push @nodelist, $nodelist;
# One slot per CPU per node; tasks are later dispatched onto @slot entries.
265 foreach my $nodename (@nodelist)
267 Log (undef, "node $nodename - $ncpus slots");
268 my $node = { name => $nodename,
272 foreach my $cpu (1..$ncpus)
274 push @slot, { node => $node,
278 push @node, @nodelist;
283 # Ensure that we get one jobstep running on each allocated node before
284 # we start overloading nodes with concurrent steps
286 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
# Initialize the job's task-count summary on the API server.
289 $Job->update_attributes(
290 'tasks_summary' => { 'failed' => 0,
295 Log (undef, "start");
# Signal handlers only set flags that the main loop polls, so they are safe
# to run between any two statements. SIGTERM aborts via croak.
296 $SIG{'INT'} = sub { $main::please_freeze = 1; };
297 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
298 $SIG{'TERM'} = \&croak;
299 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
300 $SIG{'ALRM'} = sub { $main::please_info = 1; };
301 $SIG{'CONT'} = sub { $main::please_continue = 1; };
302 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
304 $main::please_freeze = 0;
305 $main::please_info = 0;
306 $main::please_continue = 0;
307 $main::please_refresh = 0;
308 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE* entries from the job's "knobs" text into the environment.
310 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
311 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
312 $ENV{"JOB_UUID"} = $job_id;
# Jobstep bookkeeping: indices into @jobstep awaiting dispatch / completed.
315 my @jobstep_todo = ();
316 my @jobstep_done = ();
317 my @jobstep_tomerge = ();
318 my $jobstep_tomerge_level = 0;
320 my $squeue_kill_checked;
321 my $latest_refresh = scalar time;
# --- Resume from a frozen checkpoint if one exists; otherwise create the
# --- initial (level 0) task for this job.
325 if (defined $Job->{thawedfromkey})
327 thaw ($Job->{thawedfromkey});
331 my $first_task = api_call("job_tasks/create", job_task => {
332 'job_uuid' => $Job->{'uuid'},
337 push @jobstep, { 'level' => 0,
339 'arvados_task' => $first_task,
341 push @jobstep_todo, 0;
# Without SLURM, take a lock file so two crunch-jobs can't share CRUNCH_TMP.
347 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# The build script is embedded after __DATA__ in this file.
350 my $build_script = handle_readall(\*DATA);
351 my $nodelist = join(",", @node);
352 my $git_tar_count = 0;
# --- Clean stale work dirs on all nodes, then install the Docker image ---
354 if (!defined $no_clear_tmp) {
355 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
356 Log (undef, "Clean work dirs");
358 my $cleanpid = fork();
361 # Find FUSE mounts that look like Keep mounts (the mount path has the
362 # word "keep") and unmount them. Then clean up work directories.
363 # TODO: When #5036 is done and widely deployed, we can get rid of the
364 # regular expression and just unmount everything with type fuse.keep.
365 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
366 ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
# Parent: poll for the cleanup child, honoring freeze requests meanwhile.
371 last if $cleanpid == waitpid (-1, WNOHANG);
372 freeze_if_want_freeze ($cleanpid);
373 select (undef, undef, undef, 0.1);
375 Log (undef, "Cleanup command exited ".exit_status_s($?));
378 # If this job requires a Docker image, install that.
379 my $docker_bin = "/usr/bin/docker.io";
380 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
381 if ($docker_locator = $Job->{docker_image_locator}) {
382 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
385 croak("No Docker image hash found from locator $docker_locator");
387 $docker_stream =~ s/^\.//;
# Only load the image from Keep on nodes that don't already have it
# (grep -qxF matches the exact hash in `docker images` output).
388 my $docker_install_script = qq{
389 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
390 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
393 my $docker_pid = fork();
394 if ($docker_pid == 0)
396 srun (["srun", "--nodelist=" . join(',', @node)],
397 ["/bin/sh", "-ec", $docker_install_script]);
402 last if $docker_pid == waitpid (-1, WNOHANG);
403 freeze_if_want_freeze ($docker_pid);
404 select (undef, undef, undef, 0.1);
408 croak("Installing Docker image from $docker_locator exited "
412 # Determine whether this version of Docker supports memory+swap limits.
413 srun(["srun", "--nodelist=" . $node[0]],
414 ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
416 $docker_limitmem = ($? == 0);
# --- Package optional SDK, then resolve script_version to a git commit ---
418 if ($Job->{arvados_sdk_version}) {
419 # The job also specifies an Arvados SDK version. Add the SDKs to the
420 # tar file for the build script to install.
421 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
422 $Job->{arvados_sdk_version}));
423 add_git_archive("git", "--git-dir=$git_dir", "archive",
424 "--prefix=.arvados.sdk/",
425 $Job->{arvados_sdk_version}, "sdk");
429 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
430 # If script_version looks like an absolute path, *and* the --git-dir
431 # argument was not given -- which implies we were not invoked by
432 # crunch-dispatch -- we will use the given path as a working
433 # directory instead of resolving script_version to a git commit (or
434 # doing anything else with git).
435 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
436 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
439 # Resolve the given script_version to a git commit sha1. Also, if
440 # the repository is remote, clone it into our local filesystem: this
441 # ensures "git archive" will work, and is necessary to reliably
442 # resolve a symbolic script_version like "master^".
443 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
445 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
447 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
449 # If we're running under crunch-dispatch, it will have already
450 # pulled the appropriate source tree into its own repository, and
451 # given us that repo's path as $git_dir.
453 # If we're running a "local" job, we might have to fetch content
454 # from a remote repository.
456 # (Currently crunch-dispatch gives a local path with --git-dir, but
457 # we might as well accept URLs there too in case it changes its
459 my $repo = $git_dir || $Job->{'repository'};
461 # Repository can be remote or local. If remote, we'll need to fetch it
462 # to a local dir before doing `git log` et al.
465 if ($repo =~ m{://|^[^/]*:}) {
466 # $repo is a git url we can clone, like git:// or https:// or
467 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
468 # not recognized here because distinguishing that from a local
469 # path is too fragile. If you really need something strange here,
470 # use the ssh:// form.
471 $repo_location = 'remote';
472 } elsif ($repo =~ m{^\.*/}) {
473 # $repo is a local path to a git index. We'll also resolve ../foo
474 # to ../foo/.git if the latter is a directory. To help
475 # disambiguate local paths from named hosted repositories, this
476 # form must be given as ./ or ../ if it's a relative path.
477 if (-d "$repo/.git") {
478 $repo = "$repo/.git";
480 $repo_location = 'local';
482 # $repo is none of the above. It must be the name of a hosted
# Look the name up via the API; exactly one match is required (the
# croak below fires for any other count — surrounding lines elided).
484 my $arv_repo_list = api_call("repositories/list",
485 'filters' => [['name','=',$repo]]);
486 my @repos_found = @{$arv_repo_list->{'items'}};
487 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
489 Log(undef, "Repository '$repo' -> "
490 . join(", ", map { $_->{'uuid'} } @repos_found));
493 croak("Error: Found $n_found repositories with name '$repo'.");
495 $repo = $repos_found[0]->{'fetch_url'};
496 $repo_location = 'remote';
498 Log(undef, "Using $repo_location repository '$repo'");
499 $ENV{"CRUNCH_SRC_URL"} = $repo;
501 # Resolve given script_version (we'll call that $treeish here) to a
502 # commit sha1 ($commit).
503 my $treeish = $Job->{'script_version'};
505 if ($repo_location eq 'remote') {
506 # We minimize excess object-fetching by re-using the same bare
507 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
508 # just keep adding remotes to it as needed.
509 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
510 my $gitcmd = "git --git-dir=\Q$local_repo\E";
512 # Set up our local repo for caching remote objects, making
514 if (!-d $local_repo) {
515 make_path($local_repo) or croak("Error: could not create $local_repo");
517 # This works (exits 0 and doesn't delete fetched objects) even
518 # if $local_repo is already initialized:
519 `$gitcmd init --bare`;
521 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
524 # If $treeish looks like a hash (or abbrev hash) we look it up in
525 # our local cache first, since that's cheaper. (We don't want to
526 # do that with tags/branches though -- those change over time, so
527 # they should always be resolved by the remote repo.)
528 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
529 # Hide stderr because it's normal for this to fail:
530 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
532 # Careful not to resolve a branch named abcdeff to commit 1234567:
533 $sha1 =~ /^$treeish/ &&
534 $sha1 =~ /^([0-9a-f]{40})$/s) {
536 Log(undef, "Commit $commit already present in $local_repo");
540 if (!defined $commit) {
541 # If $treeish isn't just a hash or abbrev hash, or isn't here
542 # yet, we need to fetch the remote to resolve it correctly.
544 # First, remove all local heads. This prevents a name that does
545 # not exist on the remote from resolving to (or colliding with)
546 # a previously fetched branch or tag (possibly from a different
548 remove_tree("$local_repo/refs/heads", {keep_root => 1});
550 Log(undef, "Fetching objects from $repo to $local_repo");
551 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
553 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
557 # Now that the data is all here, we will use our local repo for
558 # the rest of our git activities.
# Local-repo path: resolve $treeish directly against $repo.
562 my $gitcmd = "git --git-dir=\Q$repo\E";
563 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
564 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
565 croak("`$gitcmd rev-list` exited "
567 .", '$treeish' not found. Giving up.");
570 Log(undef, "Version $treeish is commit $commit");
572 if ($commit ne $Job->{'script_version'}) {
573 # Record the real commit id in the database, frozentokey, logs,
574 # etc. -- instead of an abbreviation or a branch name which can
575 # become ambiguous or point to a different commit in the future.
576 if (!$Job->update_attributes('script_version' => $commit)) {
577 croak("Error: failed to update job's script_version attribute");
581 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
582 add_git_archive("$gitcmd archive ''\Q$commit\E");
# --- Run the install (build) script on every worker, with retries ---
585 my $git_archive = combined_git_archive();
586 if (!defined $git_archive) {
587 Log(undef, "Skip install phase (no git archive)");
589 Log(undef, "Warning: This probably means workers have no source tree!");
594 my $install_script_tries_left = 3;
595 for (my $attempts = 0; $attempts < 3; $attempts++) {
596 Log(undef, "Run install script on all workers");
# srun pipes $build_script (+ the git archive) into "perl -" on each node.
598 my @srunargs = ("srun",
599 "--nodelist=$nodelist",
600 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
601 my @execargs = ("sh", "-c",
602 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
604 $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
# Capture the child's combined stdout/stderr through a nonblocking pipe.
605 my ($install_stderr_r, $install_stderr_w);
606 pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
607 set_nonblocking($install_stderr_r);
608 my $installpid = fork();
609 if ($installpid == 0)
611 close($install_stderr_r);
612 fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
613 open(STDOUT, ">&", $install_stderr_w);
614 open(STDERR, ">&", $install_stderr_w);
615 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
618 close($install_stderr_w);
619 # Tell freeze_if_want_freeze how to kill the child, otherwise the
620 # "waitpid(installpid)" loop won't get interrupted by a freeze:
621 $proc{$installpid} = {};
623 # Track whether anything appears on stderr other than slurm errors
624 # ("srun: ...") and the "starting: ..." message printed by the
625 # srun subroutine itself:
626 my $stderr_anything_from_script = 0;
627 my $match_our_own_errors = '^(srun: error: |starting: \[)';
628 while ($installpid != waitpid(-1, WNOHANG)) {
629 freeze_if_want_freeze ($installpid);
630 # Wait up to 0.1 seconds for something to appear on stderr, then
631 # do a non-blocking read.
632 my $bits = fhbits($install_stderr_r);
633 select ($bits, undef, $bits, 0.1);
634 if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
# Log each complete line as it arrives; partial lines stay in the buffer.
636 while ($stderr_buf =~ /^(.*?)\n/) {
638 substr $stderr_buf, 0, 1+length($line), "";
639 Log(undef, "stderr $line");
640 if ($line !~ /$match_our_own_errors/) {
641 $stderr_anything_from_script = 1;
646 delete $proc{$installpid};
647 $install_exited = $?;
648 close($install_stderr_r);
# Flush any trailing partial line left in the buffer.
649 if (length($stderr_buf) > 0) {
650 if ($stderr_buf !~ /$match_our_own_errors/) {
651 $stderr_anything_from_script = 1;
653 Log(undef, "stderr $stderr_buf")
656 Log (undef, "Install script exited ".exit_status_s($install_exited));
657 last if $install_exited == 0 || $main::please_freeze;
658 # If the install script fails but doesn't print an error message,
659 # the next thing anyone is likely to do is just run it again in
660 # case it was a transient problem like "slurm communication fails
661 # because the network isn't reliable enough". So we'll just do
662 # that ourselves (up to 3 attempts in total). OTOH, if there is an
663 # error message, the problem is more likely to have a real fix and
664 # we should fail the job so the fixing process can start, instead
665 # of doing 2 more attempts.
666 last if $stderr_anything_from_script;
# Clean up the temporary git-archive tar files used for installation.
669 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
670 unlink($tar_filename);
673 if ($install_exited != 0) {
# Log the effective job configuration and user-supplied "knobs".
678 foreach (qw (script script_version script_parameters runtime_constraints))
682 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
684 foreach (split (/\n/, $Job->{knobs}))
686 Log (undef, "knob " . $_);
# --- Per-round (per-level) setup: pick the level and the slots to use ---
691 $main::success = undef;
697 my $thisround_succeeded = 0;
698 my $thisround_failed = 0;
699 my $thisround_failed_multiple = 0;
# Run lower levels first; ties broken by task index for determinism.
701 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
702 or $a <=> $b } @jobstep_todo;
703 my $level = $jobstep[$jobstep_todo[0]]->{level};
705 my $initial_tasks_this_level = 0;
706 foreach my $id (@jobstep_todo) {
707 $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
710 # If the number of tasks scheduled at this level #T is smaller than the number
711 # of slots available #S, only use the first #T slots, or the first slot on
712 # each node, whichever number is greater.
714 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
715 # based on these numbers. Using fewer slots makes more resources available
716 # to each individual task, which should normally be a better strategy when
717 # there are fewer of them running with less parallelism.
719 # Note that this calculation is not redone if the initial tasks at
720 # this level queue more tasks at the same level. This may harm
721 # overall task throughput for that level.
723 if ($initial_tasks_this_level < @node) {
724 @freeslot = (0..$#node);
725 } elsif ($initial_tasks_this_level < @slot) {
726 @freeslot = (0..$initial_tasks_this_level - 1);
728 @freeslot = (0..$#slot);
730 my $round_num_freeslots = scalar(@freeslot);
# Record, per node, the highest slot number in use this round; used later
# as CRUNCH_NODE_SLOTS when dividing node RAM among concurrent tasks.
732 my %round_max_slots = ();
733 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
734 my $this_slot = $slot[$freeslot[$ii]];
735 my $node_name = $this_slot->{node}->{name};
736 $round_max_slots{$node_name} ||= $this_slot->{cpu};
737 last if (scalar(keys(%round_max_slots)) >= @node);
740 Log(undef, "start level $level with $round_num_freeslots slots");
743 my $progress_is_dirty = 1;
744 my $progress_stats_updated = 0;
746 update_progress_stats();
# --- Main dispatch loop: fork one child (srun task wrapper) per jobstep ---
750 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
752 my $id = $jobstep_todo[$todo_ptr];
753 my $Jobstep = $jobstep[$id];
# Only dispatch tasks belonging to the current level in this round.
754 if ($Jobstep->{level} != $level)
# Pipe for reading the child's stderr; "writer" is the child's end.
759 pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
760 set_nonblocking($reader{$id});
762 my $childslot = $freeslot[0];
763 my $childnode = $slot[$childslot]->{node};
764 my $childslotname = join (".",
765 $slot[$childslot]->{node}->{name},
766 $slot[$childslot]->{cpu});
768 my $childpid = fork();
# --- Child process: reset signals, redirect output, build the command ---
771 $SIG{'INT'} = 'DEFAULT';
772 $SIG{'QUIT'} = 'DEFAULT';
773 $SIG{'TERM'} = 'DEFAULT';
775 foreach (values (%reader))
779 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
780 open(STDOUT,">&writer");
781 open(STDERR,">&writer");
# Per-task environment consumed by crunch scripts and the build script.
786 delete $ENV{"GNUPGHOME"};
787 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
788 $ENV{"TASK_QSEQUENCE"} = $id;
789 $ENV{"TASK_SEQUENCE"} = $level;
790 $ENV{"JOB_SCRIPT"} = $Job->{script};
791 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
792 $param =~ tr/a-z/A-Z/;
793 $ENV{"JOB_PARAMETER_$param"} = $value;
795 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
796 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
797 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
798 $ENV{"HOME"} = $ENV{"TASK_WORK"};
799 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
800 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
801 $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
802 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun: exactly one task on one CPU on the chosen node.
808 "--nodelist=".$childnode->{name},
809 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
810 "--job-name=$job_id.$id.$$",
# Shell prologue: fresh TASK_WORK, compute per-slot memory/swap limits.
813 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
814 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
815 ."&& cd $ENV{CRUNCH_TMP} "
816 # These environment variables get used explicitly later in
817 # $command. No tool is expected to read these values directly.
818 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
819 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
820 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
821 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
822 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker path: run the task under crunchstat inside the job's container.
825 my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
826 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
827 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
828 # We only set memory limits if Docker lets us limit both memory and swap.
829 # Memory limits alone have been supported longer, but subprocesses tend
830 # to get SIGKILL if they exceed that without any swap limit set.
831 # See #5642 for additional background.
832 if ($docker_limitmem) {
833 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
836 # Dynamically configure the container to use the host system as its
837 # DNS server. Get the host's global addresses from the ip command,
838 # and turn them into docker --dns options using gawk.
840 q{$(ip -o address show scope global |
841 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
843 # The source tree and $destdir directory (which we have
844 # installed on the worker host) are available in the container,
845 # under the same path.
846 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
847 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
849 # Currently, we make arv-mount's mount point appear at /keep
850 # inside the container (instead of using the same path as the
851 # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
852 # crunch scripts and utilities must not rely on this. They must
853 # use $TASK_KEEPMOUNT.
854 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
855 $ENV{TASK_KEEPMOUNT} = "/keep";
857 # TASK_WORK is almost exactly like a docker data volume: it
858 # starts out empty, is writable, and persists until no
859 # containers use it any more. We don't use --volumes-from to
860 # share it with other containers: it is only accessible to this
861 # task, and it goes away when this task stops.
863 # However, a docker data volume is writable only by root unless
864 # the mount point already happens to exist in the container with
865 # different permissions. Therefore, we [1] assume /tmp already
866 # exists in the image and is writable by the crunch user; [2]
867 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
868 # writable if they are created by docker while setting up the
869 # other --volumes); and [3] create $TASK_WORK inside the
870 # container using $build_script.
871 $command .= "--volume=/tmp ";
872 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
873 $ENV{"HOME"} = $ENV{"TASK_WORK"};
874 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
876 # TODO: Share a single JOB_WORK volume across all task
877 # containers on a given worker node, and delete it when the job
878 # ends (and, in case that doesn't work, when the next job
881 # For now, use the same approach as TASK_WORK above.
882 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
# Forward all ARVADOS_/CRUNCH_/JOB_/TASK_ environment into the container.
884 while (my ($env_key, $env_val) = each %ENV)
886 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
887 $command .= "--env=\Q$env_key=$env_val\E ";
890 $command .= "--env=\QHOME=$ENV{HOME}\E ";
891 $command .= "\Q$docker_hash\E ";
892 $command .= "stdbuf --output=0 --error=0 ";
893 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# Non-Docker path: crunchstat + the script directly on the host.
896 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
897 $command .= "stdbuf --output=0 --error=0 ";
898 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
901 my @execargs = ('bash', '-c', $command);
902 srun (\@srunargs, \@execargs, undef, $build_script);
903 # exec() failed, we assume nothing happened.
904 die "srun() failed on build script\n";
# --- Parent process: record the child in the proc/slot tables ---
907 if (!defined $childpid)
914 $proc{$childpid} = { jobstep => $id,
917 jobstepname => "$job_id.$id.$childpid",
919 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
920 $slot[$childslot]->{pid} = $childpid;
922 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
923 Log ($id, "child $childpid started on $childslotname");
924 $Jobstep->{starttime} = time;
925 $Jobstep->{node} = $childnode->{name};
926 $Jobstep->{slotindex} = $childslot;
927 delete $Jobstep->{stderr};
928 delete $Jobstep->{finishtime};
930 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
931 $Jobstep->{'arvados_task'}->save;
933 splice @jobstep_todo, $todo_ptr, 1;
936 $progress_is_dirty = 1;
940 ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
942 last THISROUND if $main::please_freeze || defined($main::success);
# --- In-round housekeeping, end-of-round checks, and job finalization ---
943 if ($main::please_info)
945 $main::please_info = 0;
947 create_output_collection();
949 update_progress_stats();
956 check_refresh_wanted();
958 update_progress_stats();
959 select (undef, undef, undef, 0.1);
# Refresh progress at least every 30 seconds even when idle.
961 elsif (time - $progress_stats_updated >= 30)
963 update_progress_stats();
# Abort the round if repeated failures dominate (heuristic thresholds).
965 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
966 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
968 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
969 .($thisround_failed+$thisround_succeeded)
970 .") -- giving up on this round";
971 Log (undef, $message);
975 # move slots from freeslot to holdslot (or back to freeslot) if necessary
976 for (my $i=$#freeslot; $i>=0; $i--) {
977 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
978 push @holdslot, (splice @freeslot, $i, 1);
981 for (my $i=$#holdslot; $i>=0; $i--) {
982 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
983 push @freeslot, (splice @holdslot, $i, 1);
987 # give up if no nodes are succeeding
988 if (!grep { $_->{node}->{losing_streak} == 0 &&
989 $_->{node}->{hold_count} < 4 } @slot) {
990 my $message = "Every node has failed -- giving up on this round";
991 Log (undef, $message);
# Round over: release held slots and reset per-node losing streaks.
998 push @freeslot, splice @holdslot;
999 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1002 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1005 if ($main::please_continue) {
1006 $main::please_continue = 0;
1009 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1011 if (!reapchildren())
1013 check_refresh_wanted();
1015 update_progress_stats();
1016 select (undef, undef, undef, 0.1);
1017 killem (keys %proc) if $main::please_freeze;
1021 update_progress_stats();
1022 freeze_if_want_freeze();
# Decide whether to run another level (goto ONELEVEL) or stop.
1025 if (!defined $main::success)
1027 if (@jobstep_todo &&
1028 $thisround_succeeded == 0 &&
1029 ($thisround_failed == 0 || $thisround_failed > 4))
1031 my $message = "stop because $thisround_failed tasks failed and none succeeded";
1032 Log (undef, $message);
1041 goto ONELEVEL if !defined $main::success;
1044 release_allocation();
# Collate task outputs into the job's output collection and record it.
1046 my $collated_output = &create_output_collection();
1048 if (!$collated_output) {
1049 Log (undef, "Failed to write output collection");
1052 Log(undef, "job output $collated_output");
1053 $Job->update_attributes('output' => $collated_output);
1056 Log (undef, "finish");
# Final job state; process exit code 0 only on Complete.
1061 if ($collated_output && $main::success) {
1062 $final_state = 'Complete';
1064 $final_state = 'Failed';
1066 $Job->update_attributes('state' => $final_state);
1068 exit (($final_state eq 'Complete') ? 0 : 1);
# Push todo/done/running task counts to the job's tasks_summary attribute
# and log a one-line status. No-op unless $progress_is_dirty is set; always
# refreshes $progress_stats_updated so periodic callers back off.
1072 sub update_progress_stats
1074 $progress_stats_updated = time;
1075 return if !$progress_is_dirty;
# running = total slots minus free and held slots.
1076 my ($todo, $done, $running) = (scalar @jobstep_todo,
1077 scalar @jobstep_done,
1078 scalar @slot - scalar @freeslot - scalar @holdslot);
1079 $Job->{'tasks_summary'} ||= {};
1080 $Job->{'tasks_summary'}->{'todo'} = $todo;
1081 $Job->{'tasks_summary'}->{'done'} = $done;
1082 $Job->{'tasks_summary'}->{'running'} = $running;
1083 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1084 Log (undef, "status: $done done, $running running, $todo todo");
1085 $progress_is_dirty = 0;
# (Interior of the child-reaping sub — its header is elided from this view.)
# Reap one exited child non-blockingly; record the task's outcome, handle
# temporary vs. permanent failure, free the slot, and enqueue any tasks
# the finished task created.
1092 my $pid = waitpid (-1, WNOHANG);
1093 return 0 if $pid <= 0;
1095 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1097 . $slot[$proc{$pid}->{slot}]->{cpu});
1098 my $jobstepid = $proc{$pid}->{jobstep};
1099 my $elapsed = time - $proc{$pid}->{time};
1100 my $Jobstep = $jobstep[$jobstepid];
1102 my $childstatus = $?;
1103 my $exitvalue = $childstatus >> 8;
1104 my $exitinfo = "exit ".exit_status_s($childstatus);
# Re-read the task record: the task process reports success via the API.
1105 $Jobstep->{'arvados_task'}->reload;
1106 my $task_success = $Jobstep->{'arvados_task'}->{success};
1108 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1110 if (!defined $task_success) {
1111 # task did not indicate one way or the other --> fail
1112 $Jobstep->{'arvados_task'}->{success} = 0;
1113 $Jobstep->{'arvados_task'}->save;
# A failure is "temporary" (retryable) on node failure or TASK_TEMPFAIL exit.
1120 $temporary_fail ||= $Jobstep->{node_fail};
1121 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1123 ++$thisround_failed;
1124 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1126 # Check for signs of a failed or misconfigured node
# A node is suspect once its losing streak exceeds 2 + its CPU count.
1127 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1128 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1129 # Don't count this against jobstep failure thresholds if this
1130 # node is already suspected faulty and srun exited quickly
1131 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1133 Log ($jobstepid, "blaming failure on suspect node " .
1134 $slot[$proc{$pid}->{slot}]->{node}->{name});
1135 $temporary_fail ||= 1;
1137 ban_node_by_slot($proc{$pid}->{slot});
1140 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1141 ++$Jobstep->{'failures'},
1142 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failure, or a third temporary failure, gives up on the job.
1145 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1146 # Give up on this task, and the whole job
1149 # Put this task back on the todo queue
1150 push @jobstep_todo, $jobstepid;
1151 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: reset the node's failure bookkeeping.
1155 ++$thisround_succeeded;
1156 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1157 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1158 push @jobstep_done, $jobstepid;
1159 Log ($jobstepid, "success in $elapsed seconds");
1161 $Jobstep->{exitcode} = $childstatus;
1162 $Jobstep->{finishtime} = time;
1163 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1164 $Jobstep->{'arvados_task'}->save;
1165 process_stderr ($jobstepid, $task_success);
1166 Log ($jobstepid, sprintf("task output (%d bytes): %s",
1167 length($Jobstep->{'arvados_task'}->{output}),
1168 $Jobstep->{'arvados_task'}->{output}));
# Release the reader pipe and return the slot to the free pool.
1170 close $reader{$jobstepid};
1171 delete $reader{$jobstepid};
1172 delete $slot[$proc{$pid}->{slot}]->{pid};
1173 push @freeslot, $proc{$pid}->{slot};
# A successful task may have created child tasks: page through the API
# list (offset = items fetched so far) and enqueue each as a jobstep.
1176 if ($task_success) {
1178 my $newtask_list = [];
1179 my $newtask_results;
1181 $newtask_results = api_call(
1184 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1186 'order' => 'qsequence',
1187 'offset' => scalar(@$newtask_list),
1189 push(@$newtask_list, @{$newtask_results->{items}});
1190 } while (@{$newtask_results->{items}});
1191 foreach my $arvados_task (@$newtask_list) {
1193 'level' => $arvados_task->{'sequence'},
1195 'arvados_task' => $arvados_task
1197 push @jobstep, $jobstep;
1198 push @jobstep_todo, $#jobstep;
1202 $progress_is_dirty = 1;
# If the refresh-trigger file's mtime is newer than our last refresh,
# re-fetch the Job record and copy over cancellation-related attributes;
# request a freeze if the job is no longer Running.
1206 sub check_refresh_wanted
1208 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the trigger file's mtime.
1209 if (@stat && $stat[9] > $latest_refresh) {
1210 $latest_refresh = scalar time;
1211 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1212 for my $attr ('cancelled_at',
1213 'cancelled_by_user_uuid',
1214 'cancelled_by_client_uuid',
1216 $Job->{$attr} = $Job2->{$attr};
1218 if ($Job->{'state'} ne "Running") {
1219 if ($Job->{'state'} eq "Cancelled") {
1220 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1222 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1225 $main::please_freeze = 1;
# (Interior of the squeue-checking sub — its header is elided from this
# view.) Rate-limited reconciliation between our child procs and SLURM's
# squeue: kill procs whose kill deadline passed, and schedule a kill for
# procs >60s old that squeue no longer reports.
1232 # return if the kill list was checked <4 seconds ago
1233 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1237 $squeue_kill_checked = time;
1239 # use killem() on procs whose killtime is reached
1242 if (exists $proc{$_}->{killtime}
1243 && $proc{$_}->{killtime} <= time)
1249 # return if the squeue was checked <60 seconds ago
1250 if (defined $squeue_checked && $squeue_checked > time - 60)
1254 $squeue_checked = time;
1258 # here is an opportunity to check for mysterious problems with local procs
1262 # get a list of steps still running
# Trailing "echo ok" marks a successful squeue run; its absence below
# means the squeue command itself failed.
1263 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1265 if ($squeue[-1] ne "ok")
1271 # which of my jobsteps are running, according to squeue?
1275 if (/^(\d+)\.(\d+) (\S+)/)
1277 if ($1 eq $ENV{SLURM_JOBID})
1284 # which of my active child procs (>60s old) were not mentioned by squeue?
1285 foreach (keys %proc)
1287 if ($proc{$_}->{time} < time - 60
1288 && !exists $ok{$proc{$_}->{jobstepname}}
1289 && !exists $proc{$_}->{killtime})
1291 # kill this proc if it hasn't exited in 30 seconds
1292 $proc{$_}->{killtime} = time + 30;
# Cancel our SLURM allocation (if any) with scancel.
1298 sub release_allocation
1302 Log (undef, "release job allocation");
# NOTE(review): SLURM_JOBID is interpolated into a shell command; it is
# presumably a SLURM-supplied numeric id — confirm it is never attacker-
# controlled before reuse elsewhere.
1303 system "scancel $ENV{SLURM_JOBID}";
# (Interior of the pipe-draining sub — its header is elided from this
# view.) Drain each task's stderr pipe non-destructively: echo when
# CRUNCH_DEBUG is set, append to the per-jobstep buffer, log complete
# lines, and cap the buffer at 16 KiB by discarding the oldest 8 KiB.
1311 foreach my $job (keys %reader)
1314 while (0 < sysread ($reader{$job}, $buf, 8192))
1316 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1317 $jobstep[$job]->{stderr} .= $buf;
1318 preprocess_stderr ($job);
1319 if (length ($jobstep[$job]->{stderr}) > 16384)
1321 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, logging each.
# Recognizes srun error patterns: an expired/unconfirmable SLURM
# allocation triggers a freeze; node/jobstep/Keep errors mark the step
# as a node failure and ban the node.
1330 sub preprocess_stderr
1334 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (including its newline) from the buffer.
1336 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1337 Log ($job, "stderr $line");
1338 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1340 $main::please_freeze = 1;
1342 elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1343 $jobstep[$job]->{node_fail} = 1;
1344 ban_node_by_slot($jobstep[$job]->{slotindex});
# (Interiors of two subs, headers elided from this view.)
# First: process_stderr — flush and log whatever remains in a finished
# jobstep's stderr buffer.
1353 my $task_success = shift;
1354 preprocess_stderr ($job);
1357 Log ($job, "stderr $_");
1358 } split ("\n", $jobstep[$job]->{stderr});
# Second: fetch_block — read a Keep block via "arv-get $hash" and return
# its contents, or undef on any read/exit error.
1365 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1366 Log(undef, "fetch_block run error from arv-get $hash: $!");
1369 my $output_block = "";
1372 my $bytes = sysread($keep, $buf, 1024 * 1024);
1373 if (!defined $bytes) {
1374 Log(undef, "fetch_block read error from arv-get: $!");
1375 $output_block = undef;
1377 } elsif ($bytes == 0) {
1378 # sysread returns 0 at the end of the pipe.
1381 # some bytes were read into buf.
1382 $output_block .= $buf;
# Non-zero arv-get exit status also invalidates the block.
1387 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1388 $output_block = undef;
1390 return $output_block;
1393 # Create a collection by concatenating the output of all tasks (each
1394 # task's output is either a manifest fragment, a locator for a
1395 # manifest fragment stored in Keep, or nothing at all). Return the
1396 # portable_data_hash of the new collection.
1397 sub create_output_collection
1399 Log (undef, "collate");
# Pipe the combined manifest text into an inline Python helper that
# creates the collection via the Arvados API and prints its PDH.
1401 my ($child_out, $child_in);
1402 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1405 print (arvados.api("v1").collections().
1406 create(body={"manifest_text": sys.stdin.read()}).
1407 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1411 my $manifest_size = 0;
1415 my $output = $_->{'arvados_task'}->{output};
1416 next if (!defined($output));
# A 32-hex-digit locator (with optional +hints) means "fetch the
# fragment from Keep"; anything else is literal manifest text.
1418 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1419 $next_write = fetch_block($output);
1421 $next_write = $output;
1423 if (defined($next_write)) {
1424 if (!defined(syswrite($child_in, $next_write))) {
1425 # There's been an error writing. Stop the loop.
1426 # We'll log details about the exit code later.
1429 $manifest_size += length($next_write);
1432 my $uuid = $_->{'arvados_task'}->{'uuid'};
1433 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1438 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
# Wait up to 120s for the helper to print the PDH; on timeout, escalate
# signals (INT, TERM, KILL) until the child exits.
1441 my $s = IO::Select->new($child_out);
1442 if ($s->can_read(120)) {
1443 sysread($child_out, $joboutput, 1024 * 1024);
1446 Log(undef, "output collection creation exited " . exit_status_s($?));
1452 Log (undef, "timed out while creating output collection");
1453 foreach my $signal (2, 2, 2, 15, 15, 9) {
1454 kill($signal, $pid);
1455 last if waitpid($pid, WNOHANG) == -1;
# (Interior of the killem sub — its header is elided from this view.)
# Escalate signals to a child per call: SIGINT first, then SIGTERM after
# 4s without effect, then SIGKILL. srun needs two SIGINTs to interrupt,
# hence the double kill below.
1469 my $sig = 2; # SIGINT first
1470 if (exists $proc{$_}->{"sent_$sig"} &&
1471 time - $proc{$_}->{"sent_$sig"} > 4)
1473 $sig = 15; # SIGTERM if SIGINT doesn't work
1475 if (exists $proc{$_}->{"sent_$sig"} &&
1476 time - $proc{$_}->{"sent_$sig"} > 4)
1478 $sig = 9; # SIGKILL if SIGTERM doesn't work
1480 if (!exists $proc{$_}->{"sent_$sig"})
1482 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1484 select (undef, undef, undef, 0.1);
1487 kill $sig, $_; # srun wants two SIGINT to really interrupt
1489 $proc{$_}->{"sent_$sig"} = time;
1490 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# NOTE(review): the vec() line below belongs to a different (elided) sub
# that builds an fd bitmask for select().
1500 vec($bits,fileno($_),1) = 1;
1506 # Send log output to Keep via arv-put.
1508 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1509 # $log_pipe_pid is the pid of the arv-put subprocess.
1511 # The only functions that should access these variables directly are:
1513 # log_writer_start($logfilename)
1514 #   Starts an arv-put pipe, reading data on stdin and writing it to
1515 #   a $logfilename file in an output collection.
1517 # log_writer_send($txt)
1518 #   Writes $txt to the output log collection.
1520 # log_writer_finish()
1521 #   Closes the arv-put pipe and returns the output that it produces.
1523 # log_writer_is_active()
1524 #   Returns a true value if there is currently a live arv-put
1525 #   process, false otherwise.
1527 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1529 sub log_writer_start($)
1531 my $logfilename = shift;
# Spawn arv-put; it reads log text on stdin and (per --portable-data-hash)
# prints the collection's PDH when finished.
1532 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1534 '--portable-data-hash',
1535 '--project-uuid', $Job->{owner_uuid},
1537 '--name', $logfilename,
1538 '--filename', $logfilename,
1542 sub log_writer_send($)
1545 print $log_pipe_in $txt;
1548 sub log_writer_finish()
1550 return unless $log_pipe_pid;
1552 close($log_pipe_in);
# Give arv-put up to 120s to report its output locator.
1555 my $s = IO::Select->new($log_pipe_out);
1556 if ($s->can_read(120)) {
1557 sysread($log_pipe_out, $arv_put_output, 1024);
1558 chomp($arv_put_output);
1560 Log (undef, "timed out reading from 'arv-put'");
1563 waitpid($log_pipe_pid, 0);
1564 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1566 Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1569 return $arv_put_output;
1572 sub log_writer_is_active() {
1573 return $log_pipe_pid;
1576 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and logged one line at a time.
1578 if ($_[1] =~ /\n/) {
1579 for my $line (split (/\n/, $_[1])) {
# Force STDERR autoflush, then restore the previously selected handle.
1584 my $fh = select STDERR; $|=1; select $fh;
1585 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable characters as \NNN octal sequences.
1586 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Timestamp only when the log goes to Keep or an interactive terminal.
1589 if (log_writer_is_active() || -t STDERR) {
1590 my @gmtime = gmtime;
1591 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1592 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1594 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1596 if (log_writer_is_active()) {
1597 log_writer_send($datetime . " " . $message);
# (Interiors of the croak/cleanup/save_meta subs — headers elided from
# this view.)
# croak: log the fatal message with its caller location, then checkpoint
# (freeze + output collection) if tasks remain.
1604 my ($package, $file, $line) = caller;
1605 my $message = "@_ at $file line $line\n";
1606 Log (undef, $message);
1607 freeze() if @jobstep_todo;
1608 create_output_collection() if @jobstep_todo;
# cleanup: mark the Job finished (Cancelled keeps its state; otherwise
# it becomes Failed).
1618 if ($Job->{'state'} eq 'Cancelled') {
1619 $Job->update_attributes('finished_at' => scalar gmtime);
1621 $Job->update_attributes('state' => 'Failed');
# save_meta: finish the arv-put log pipe and attach the resulting log
# manifest locator to the Job. Checkpoint saves are skipped entirely.
1628 my $justcheckpoint = shift; # false if this will be the last meta saved
1629 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1630 return unless log_writer_is_active();
1632 my $loglocator = log_writer_finish();
1633 Log (undef, "log manifest is $loglocator");
1634 $Job->{'log'} = $loglocator;
1635 $Job->update_attributes('log', $loglocator);
# On a pending freeze request: release the SLURM allocation, kill any
# specified (or all) child procs, reap them, and save an output
# collection checkpoint. freeze/thaw themselves are unimplemented stubs.
1639 sub freeze_if_want_freeze
1641 if ($main::please_freeze)
1643 release_allocation();
1646 # kill some srun procs before freeze+stop
# Callers may pass specific pids; an empty entry marks them for killem.
1647 map { $proc{$_} = {} } @_;
1650 killem (keys %proc);
1651 select (undef, undef, undef, 0.1);
1653 while (($died = waitpid (-1, WNOHANG)) > 0)
1655 delete $proc{$died};
1660 create_output_collection();
1670 Log (undef, "Freeze not implemented");
1677 croak ("Thaw not implemented");
# (Interior of freezeunquote — header elided.) Reverse the freeze-time
# escaping: "\n" back to newline, other backslash pairs to the raw char.
1693 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# (Interior of the srun wrapper — its header is elided from this view.)
# Run a command, prefixing it with srun arguments only when running
# under SLURM; optionally feed it stdin, and either fork or exec.
1700 my $srunargs = shift;
1701 my $execargs = shift;
1702 my $opts = shift || {};
1704 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Log the command with any TOKEN=... values redacted.
1706 $Data::Dumper::Terse = 1;
1707 $Data::Dumper::Indent = 0;
1708 my $show_cmd = Dumper($args);
1709 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1710 $show_cmd =~ s/\n/ /g;
1711 warn "starting: $show_cmd\n";
# When $stdin is given, fork a writer child whose stdout becomes our
# STDIN, so the exec'd command reads $stdin.
1713 if (defined $stdin) {
1714 my $child = open STDIN, "-|";
1715 defined $child or die "no fork: $!";
1717 print $stdin or die $!;
1718 close STDOUT or die $!;
1723 return system (@$args) if $opts->{fork};
# Reaching here means exec failed; a too-large environment is a common
# cause, so report its size.
1726 warn "ENV size is ".length(join(" ",%ENV));
1727 die "exec failed: $!: @$args";
1731 sub ban_node_by_slot {
1732 # Don't start any new jobsteps on this node for 60 seconds
1734 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1735 $slot[$slotid]->{node}->{hold_count}++;
1736 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# (Interior of the lockfile helper — header elided.) Take an exclusive,
# non-blocking flock on $lockfile; croak with $error_message if another
# process already holds it.
1741 my ($lockfile, $error_message) = @_;
1742 open L, ">", $lockfile or croak("$lockfile: $!");
1743 if (!flock L, LOCK_EX|LOCK_NB) {
1744 croak("Can't lock $lockfile: $error_message\n");
1748 sub find_docker_image {
1749 # Given a Keep locator, check to see if it contains a Docker image.
1750 # If so, return its stream name and Docker hash.
1751 # If not, return undef for both values.
1752 my $locator = shift;
1753 my ($streamname, $filename);
1754 my $image = api_call("collections/get", uuid => $locator);
# Parse the manifest: first token per line is the stream name; file
# tokens look like "pos:size:name".
1756 foreach my $line (split(/\n/, $image->{manifest_text})) {
1757 my @tokens = split(/\s+/, $line);
1759 $streamname = shift(@tokens);
1760 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1761 if (defined($filename)) {
1762 return (undef, undef); # More than one file in the Collection.
1764 $filename = (split(/:/, $filedata, 3))[2];
# A valid image collection holds exactly one file named <64-hex>.tar;
# the hex part is the Docker image hash.
1769 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1770 return ($streamname, $1);
1772 return (undef, undef);
# (Interiors of retry_count and retry_op — headers elided from this view.)
1777 # Calculate the number of times an operation should be retried,
1778 # assuming exponential backoff, and that we're willing to retry as
1779 # long as tasks have been running. Enforce a minimum of 3 retries.
1780 my ($starttime, $endtime, $timediff, $retries);
1782 $starttime = $jobstep[0]->{starttime};
1783 $endtime = $jobstep[-1]->{finishtime};
1785 if (!defined($starttime)) {
1787 } elsif (!defined($endtime)) {
1788 $timediff = time - $starttime;
1790 $timediff = ($endtime - $starttime) - (time - $endtime);
# retries = floor(log2(elapsed seconds)), floored at the minimum of 3.
1792 if ($timediff > 0) {
1793 $retries = int(log($timediff) / log(2));
1795 $retries = 1; # Use the minimum.
1797 return ($retries > 3) ? $retries : 3;
1801 # Pass in two function references.
1802 # This method will be called with the remaining arguments.
1803 # If it dies, retry it with exponential backoff until it succeeds,
1804 # or until the current retry_count is exhausted. After each failure
1805 # that can be retried, the second function will be called with
1806 # the current try count (0-based), next try time, and error message.
1807 my $operation = shift;
1808 my $retry_callback = shift;
1809 my $retries = retry_count();
1810 foreach my $try_count (0..$retries) {
# Exponential backoff: wait 2^try seconds between attempts.
1811 my $next_try = time + (2 ** $try_count);
1812 my $result = eval { $operation->(@_); };
1815 } elsif ($try_count < $retries) {
1816 $retry_callback->($try_count, $next_try, $@);
1817 my $sleep_time = $next_try - time;
1818 sleep($sleep_time) if ($sleep_time > 0);
1821 # Ensure the error message ends in a newline, so Perl doesn't add
1822 # retry_op's line number to it.
# (Interiors of api_call and exit_status_s — headers elided from this view.)
1828 # Pass in a /-separated API method name, and arguments for it.
1829 # This function will call that method, retrying as needed until
1830 # the current retry_count is exhausted, with a log on the first failure.
1831 my $method_name = shift;
1832 my $log_api_retry = sub {
1833 my ($try_count, $next_try_at, $errmsg) = @_;
# Strip "at $0 line N." noise and collapse whitespace before logging.
1834 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1835 $errmsg =~ s/\s/ /g;
1836 $errmsg =~ s/\s+$//;
1838 if ($next_try_at < time) {
1839 $retry_msg = "Retrying.";
1841 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1842 $retry_msg = "Retrying at $next_try_fmt.";
1844 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
# Walk the "a/b/c" path through the API client object to find the method.
1847 foreach my $key (split(/\//, $method_name)) {
1848 $method = $method->{$key};
1850 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1854 # Given a $?, return a human-readable exit code string like "0" or
1855 # "1" or "0 with signal 1" or "1 with signal 11".
1856 my $exitcode = shift;
1857 my $s = $exitcode >> 8;
1858 if ($exitcode & 0x7f) {
1859 $s .= " with signal " . ($exitcode & 0x7f);
1861 if ($exitcode & 0x80) {
1862 $s .= " with core dump";
1867 sub handle_readall {
1868 # Pass in a glob reference to a file handle.
1869 # Read all its contents and return them as a string.
1870 my $fh_glob_ref = shift;
1872 return <$fh_glob_ref>;
1875 sub tar_filename_n {
# Path for the Nth saved git archive tarball under CRUNCH_TMP.
1877 return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1880 sub add_git_archive {
1881 # Pass in a git archive command as a string or list, a la system().
1882 # This method will save its output to be included in the archive sent to the
1886 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1887 croak("Failed to save git archive: $!");
# Run the git command with its stdout redirected into the tar file.
1889 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1891 waitpid($git_pid, 0);
1894 croak("Failed to save git archive: git exited " . exit_status_s($?));
1898 sub combined_git_archive {
1899 # Combine all saved tar archives into a single archive, then return its
1900 # contents in a string. Return undef if no archives have been saved.
1901 if ($git_tar_count < 1) {
# Append archives 2..N onto the first with "tar -A", then slurp it.
1904 my $base_tar_name = tar_filename_n(1);
1905 foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1906 my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1907 if ($tar_exit != 0) {
1908 croak("Error preparing build archive: tar -A exited " .
1909 exit_status_s($tar_exit));
1912 if (!open(GIT_TAR, "<", $base_tar_name)) {
1913 croak("Could not open build archive: $!");
1915 my $tar_contents = handle_readall(\*GIT_TAR);
1917 return $tar_contents;
1920 sub set_nonblocking {
# Set O_NONBLOCK on the given filehandle via fcntl.
1922 my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
1923 fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
1929 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1930 # server invokes this script on individual compute nodes, or localhost if we're
1931 # running a job locally.  It gets called in two modes:
1933 # * No arguments: Installation mode.  Read a tar archive from the DATA
1934 #   file handle; it includes the Crunch script's source code, and
1935 #   maybe SDKs as well.  Those should be installed in the proper
1936 #   locations.  This runs outside of any Docker container, so don't try to
1937 #   introspect Crunch's runtime environment.
1939 # * With arguments: Crunch script run mode.  This script should set up the
1940 #   environment, then run the command specified in the arguments.  This runs
1941 #   inside any Docker container.
1944 use File::Path qw( make_path remove_tree );
1945 use POSIX qw(getcwd);
1947 use constant TASK_TEMPFAIL => 111;
1949 # Map SDK subdirectories to the path environments they belong to.
1950 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
# Configuration comes entirely from the CRUNCH_*/JOB_WORK/TASK_WORK
# environment variables set by the dispatcher.
1952 my $destdir = $ENV{"CRUNCH_SRC"};
1953 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
1954 my $repo = $ENV{"CRUNCH_SRC_URL"};
1955 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1956 my $job_work = $ENV{"JOB_WORK"};
1957 my $task_work = $ENV{"TASK_WORK"};
# Keep copies of the original stdout/stderr so they can be restored after
# redirection during install.
1959 open(STDOUT_ORIG, ">&", STDOUT);
1960 open(STDERR_ORIG, ">&", STDERR);
1962 for my $dir ($destdir, $job_work, $task_work) {
1965 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Clear the per-task work dir contents but keep the directory itself.
1970 remove_tree($task_work, {keep_root => 1});
1973 ### Crunch script run mode
1975 # We want to do routine logging during task 0 only.  This gives the user
1976 # the information they need, but avoids repeating the information for every
1979 if ($ENV{TASK_SEQUENCE} eq "0") {
1982 printf STDERR_ORIG "[Crunch] $msg\n", @_;
# Build (once per job) a virtualenv with the Python SDK installed, if
# the SDK source is present and virtualenv is available.
1988 my $python_src = "$install_dir/python";
1989 my $venv_dir = "$job_work/.arvados.venv";
1990 my $venv_built = -e "$venv_dir/bin/activate";
1991 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1992 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1993 "--python=python2.7", $venv_dir);
# pip failure here is retryable (TASK_TEMPFAIL), not a permanent error.
1994 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1996 $Log->("Built Python SDK virtualenv");
1999 my $pip_bin = "pip";
2001 $Log->("Running in Python SDK virtualenv");
2002 $pip_bin = "$venv_dir/bin/pip";
# Re-exec the task command under the venv's activate script, with the
# original argv shell-quoted.
2003 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2004 @ARGV = ("/bin/sh", "-ec",
2005 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2006 } elsif (-d $python_src) {
2007 $Log->("Warning: virtualenv not found inside Docker container default " .
2008 "\$PATH. Can't install Python SDK.");
# Report which arvados packages are visible (pip freeze or dpkg-query).
2011 my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2013 $Log->("Using Arvados SDK:");
2014 foreach my $line (split /\n/, $pkgs) {
2018 $Log->("Arvados SDK packages not found");
# Prepend each installed SDK directory to its language's library path.
2021 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2022 my $sdk_path = "$install_dir/$sdk_dir";
2024 if ($ENV{$sdk_envkey}) {
2025 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2027 $ENV{$sdk_envkey} = $sdk_path;
2029 $Log->("Arvados SDK added to %s", $sdk_envkey);
2034 die "Cannot exec `@ARGV`: $!";
2037 ### Installation mode
# Serialize concurrent installs on this node with a lock file.
2038 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2040 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2041 # This exact git archive (source + arvados sdk) is already installed
2042 # here, so there's no need to reinstall it.
2044 # We must consume our DATA section, though: otherwise the process
2045 # feeding it to us will get SIGPIPE.
2047 while (read(DATA, $buf, 65536)) { }
2052 unlink "$destdir.archive_hash";
2055 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
2056 die "Error launching 'tar -xC $destdir': $!";
2058 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2059 # get SIGPIPE.  We must feed it data incrementally.
2061 while (read(DATA, $tar_input, 65536)) {
2062 print TARX $tar_input;
2065 die "'tar -xC $destdir' exited $?: $!";
# Move each bundled SDK language directory into the install dir.
2070 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2072 foreach my $sdk_lang (("python",
2073 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2074 if (-d "$sdk_root/$sdk_lang") {
2075 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2076 die "Failed to install $sdk_lang SDK: $!";
2082 my $python_dir = "$install_dir/python";
2083 if ((-d $python_dir) and can_run("python2.7") and
2084 (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2085 # egg_info failed, probably when it asked git for a build tag.
2086 # Specify no build tag.
2087 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2088 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2092 # Hide messages from the install script (unless it fails: shell_or_die
2093 # will show $destdir.log in that case).
2094 open(STDOUT, ">>", "$destdir.log");
2095 open(STDERR, ">&", STDOUT);
# Run the repo's install hook, preferring crunch_scripts/install.
2097 if (-e "$destdir/crunch_scripts/install") {
2098 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2099 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2101 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2102 } elsif (-e "./install.sh") {
2103 shell_or_die (undef, "./install.sh", $install_dir);
# Record the installed archive hash atomically (symlink + rename) so the
# early-exit check above can detect an identical re-install.
2106 if ($archive_hash) {
2107 unlink "$destdir.archive_hash.new";
2108 symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2109 rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
# (Interiors of can_run and shell_or_die — headers elided from this view.)
# can_run: probe for a command by running "which" and draining its output.
2115 my $command_name = shift;
2116 open(my $which, "-|", "which", $command_name);
2117 while (<$which>) { }
# shell_or_die: run a command via system(); on failure, restore stderr,
# dump the install log, and exit with the given code (or the command's).
2124 my $exitcode = shift;
2126 if ($ENV{"DEBUG"}) {
2127 print STDERR "@_\n";
2129 if (system (@_) != 0) {
2132 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2133 open STDERR, ">&STDERR_ORIG";
2134 system ("cat $destdir.log >&2");
2135 warn "@_ failed ($err): $exitstatus";
2136 if (defined($exitcode)) {
# Fall back to the command's own exit value (or 1 if it was killed).
2140 exit (($code >> 8) || 1);