2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by the scheduler on the controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from the command line, run tasks on the local machine
16 (typically invoked by an application or developer on a VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs locally.
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
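For example, sending SIGHUP to a running crunch-job (hypothetical pid
12345 shown here) asks it to re-check its node allocation and Job record:

  kill -HUP 12345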
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Digest::MD5 qw(md5_hex);
97 use File::Path qw( make_path remove_tree );
99 use constant EX_TEMPFAIL => 75;
101 $ENV{"TMPDIR"} ||= "/tmp";
102 unless (defined $ENV{"CRUNCH_TMP"}) {
103 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
104 if ($ENV{"USER"} ne "crunch" && $< != 0) {
105 # use a tmp dir unique to my uid
106 $ENV{"CRUNCH_TMP"} .= "-$<";
110 # Create the tmp directory if it does not exist
111 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
112 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
116 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
117 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
118 mkdir ($ENV{"JOB_WORK"});
126 GetOptions('force-unlock' => \$force_unlock,
127 'git-dir=s' => \$git_dir,
128 'job=s' => \$jobspec,
129 'job-api-token=s' => \$job_api_token,
130 'no-clear-tmp' => \$no_clear_tmp,
131 'resume-stash=s' => \$resume_stash,
134 if (defined $job_api_token) {
135 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
138 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
144 $main::ENV{CRUNCH_DEBUG} = 1;
148 $main::ENV{CRUNCH_DEBUG} = 0;
153 my $arv = Arvados->new('apiVersion' => 'v1');
161 my $User = api_call("users/current");
163 if ($jobspec =~ /^[-a-z\d]+$/)
165 # $jobspec is an Arvados UUID, not a JSON job specification
166 $Job = api_call("jobs/get", uuid => $jobspec);
167 if (!$force_unlock) {
168 # Claim this job, and make sure nobody else does
169 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
171 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
178 $Job = JSON::decode_json($jobspec);
182 map { croak ("No $_ specified") unless $Job->{$_} }
183 qw(script script_version script_parameters);
186 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
187 $Job->{'started_at'} = gmtime;
188 $Job->{'state'} = 'Running';
190 $Job = api_call("jobs/create", job => $Job);
192 $job_id = $Job->{'uuid'};
194 my $keep_logfile = $job_id . '.log.txt';
195 log_writer_start($keep_logfile);
197 $Job->{'runtime_constraints'} ||= {};
198 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
199 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
203 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
204 chomp($gem_versions);
205 chop($gem_versions); # Remove the closing parenthesis
210 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
212 Log (undef, "check slurm allocation");
215 # Should we use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
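# A minimal sketch (not used by this script) of how that format could be
# expanded, assuming the usual SLURM "count(xrepeat)" notation:
#
#   my @tasks_per_node;
#   foreach my $part (split /,/, ($ENV{SLURM_TASKS_PER_NODE} || '')) {
#     my ($count, $repeat) = $part =~ /^(\d+)(?:\(x(\d+)\))?$/;
#     push @tasks_per_node, ($count) x ($repeat || 1);
#   }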
219 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
220 push @sinfo, "$localcpus localhost";
222 if (exists $ENV{SLURM_NODELIST})
224 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
228 my ($ncpus, $slurm_nodelist) = split;
229 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
232 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
235 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
238 foreach (split (",", $ranges))
251 push @nodelist, map {
253 $n =~ s/\[[-,\d]+\]/$_/;
260 push @nodelist, $nodelist;
263 foreach my $nodename (@nodelist)
265 Log (undef, "node $nodename - $ncpus slots");
266 my $node = { name => $nodename,
270 foreach my $cpu (1..$ncpus)
272 push @slot, { node => $node,
276 push @node, @nodelist;
281 # Ensure that we get one jobstep running on each allocated node before
282 # we start overloading nodes with concurrent steps
284 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
287 $Job->update_attributes(
288 'tasks_summary' => { 'failed' => 0,
293 Log (undef, "start");
294 $SIG{'INT'} = sub { $main::please_freeze = 1; };
295 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
296 $SIG{'TERM'} = \&croak;
297 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
298 $SIG{'ALRM'} = sub { $main::please_info = 1; };
299 $SIG{'CONT'} = sub { $main::please_continue = 1; };
300 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
302 $main::please_freeze = 0;
303 $main::please_info = 0;
304 $main::please_continue = 0;
305 $main::please_refresh = 0;
306 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
308 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
309 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
310 $ENV{"JOB_UUID"} = $job_id;
313 my @jobstep_todo = ();
314 my @jobstep_done = ();
315 my @jobstep_tomerge = ();
316 my $jobstep_tomerge_level = 0;
318 my $squeue_kill_checked;
319 my $latest_refresh = scalar time;
323 if (defined $Job->{thawedfromkey})
325 thaw ($Job->{thawedfromkey});
329 my $first_task = api_call("job_tasks/create", job_task => {
330 'job_uuid' => $Job->{'uuid'},
335 push @jobstep, { 'level' => 0,
337 'arvados_task' => $first_task,
339 push @jobstep_todo, 0;
345 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
348 my $build_script = handle_readall(\*DATA);
349 my $nodelist = join(",", @node);
350 my $git_tar_count = 0;
352 if (!defined $no_clear_tmp) {
353 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
354 Log (undef, "Clean work dirs");
356 my $cleanpid = fork();
359 # Find FUSE mounts that look like Keep mounts (the mount path has the
360 # word "keep") and unmount them. Then clean up work directories.
361 # TODO: When #5036 is done and widely deployed, we can get rid of the
362 # regular expression and just unmount everything with type fuse.keep.
363 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
364 ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
369 last if $cleanpid == waitpid (-1, WNOHANG);
370 freeze_if_want_freeze ($cleanpid);
371 select (undef, undef, undef, 0.1);
373 Log (undef, "Cleanup command exited ".exit_status_s($?));
376 # If this job requires a Docker image, install that.
377 my $docker_bin = "/usr/bin/docker.io";
378 my ($docker_locator, $docker_stream, $docker_hash);
379 if ($docker_locator = $Job->{docker_image_locator}) {
380 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
383 croak("No Docker image hash found from locator $docker_locator");
385 $docker_stream =~ s/^\.//;
386 my $docker_install_script = qq{
387 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
388 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
391 my $docker_pid = fork();
392 if ($docker_pid == 0)
394 srun (["srun", "--nodelist=" . join(',', @node)],
395 ["/bin/sh", "-ec", $docker_install_script]);
400 last if $docker_pid == waitpid (-1, WNOHANG);
401 freeze_if_want_freeze ($docker_pid);
402 select (undef, undef, undef, 0.1);
406 croak("Installing Docker image from $docker_locator exited "
410 if ($Job->{arvados_sdk_version}) {
411 # The job also specifies an Arvados SDK version. Add the SDKs to the
412 # tar file for the build script to install.
413 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
414 $Job->{arvados_sdk_version}));
415 add_git_archive("git", "--git-dir=$git_dir", "archive",
416 "--prefix=.arvados.sdk/",
417 $Job->{arvados_sdk_version}, "sdk");
421 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
422 # If script_version looks like an absolute path, *and* the --git-dir
423 # argument was not given -- which implies we were not invoked by
424 # crunch-dispatch -- we will use the given path as a working
425 # directory instead of resolving script_version to a git commit (or
426 # doing anything else with git).
427 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
428 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
431 # Resolve the given script_version to a git commit sha1. Also, if
432 # the repository is remote, clone it into our local filesystem: this
433 # ensures "git archive" will work, and is necessary to reliably
434 # resolve a symbolic script_version like "master^".
435 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
437 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
439 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
441 # If we're running under crunch-dispatch, it will have already
442 # pulled the appropriate source tree into its own repository, and
443 # given us that repo's path as $git_dir.
445 # If we're running a "local" job, we might have to fetch content
446 # from a remote repository.
448 # (Currently crunch-dispatch gives a local path with --git-dir, but
449 # we might as well accept URLs there too in case it changes its mind.)
451 my $repo = $git_dir || $Job->{'repository'};
453 # Repository can be remote or local. If remote, we'll need to fetch it
454 # to a local dir before doing `git log` et al.
457 if ($repo =~ m{://|^[^/]*:}) {
458 # $repo is a git url we can clone, like git:// or https:// or
459 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
460 # not recognized here because distinguishing that from a local
461 # path is too fragile. If you really need something strange here,
462 # use the ssh:// form.
463 $repo_location = 'remote';
464 } elsif ($repo =~ m{^\.*/}) {
465 # $repo is a local path to a git index. We'll also resolve ../foo
466 # to ../foo/.git if the latter is a directory. To help
467 # disambiguate local paths from named hosted repositories, this
468 # form must be given as ./ or ../ if it's a relative path.
469 if (-d "$repo/.git") {
470 $repo = "$repo/.git";
472 $repo_location = 'local';
474 # $repo is none of the above. It must be the name of a hosted repository.
476 my $arv_repo_list = api_call("repositories/list",
477 'filters' => [['name','=',$repo]]);
478 my @repos_found = @{$arv_repo_list->{'items'}};
479 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
481 Log(undef, "Repository '$repo' -> "
482 . join(", ", map { $_->{'uuid'} } @repos_found));
485 croak("Error: Found $n_found repositories with name '$repo'.");
487 $repo = $repos_found[0]->{'fetch_url'};
488 $repo_location = 'remote';
490 Log(undef, "Using $repo_location repository '$repo'");
491 $ENV{"CRUNCH_SRC_URL"} = $repo;
493 # Resolve given script_version (we'll call that $treeish here) to a
494 # commit sha1 ($commit).
495 my $treeish = $Job->{'script_version'};
497 if ($repo_location eq 'remote') {
498 # We minimize excess object-fetching by re-using the same bare
499 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
500 # just keep adding remotes to it as needed.
501 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
502 my $gitcmd = "git --git-dir=\Q$local_repo\E";
504 # Set up our local repo for caching remote objects, making
506 if (!-d $local_repo) {
507 make_path($local_repo) or croak("Error: could not create $local_repo");
509 # This works (exits 0 and doesn't delete fetched objects) even
510 # if $local_repo is already initialized:
511 `$gitcmd init --bare`;
513 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
516 # If $treeish looks like a hash (or abbrev hash) we look it up in
517 # our local cache first, since that's cheaper. (We don't want to
518 # do that with tags/branches though -- those change over time, so
519 # they should always be resolved by the remote repo.)
520 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
521 # Hide stderr because it's normal for this to fail:
522 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
524 # Careful not to resolve a branch named abcdeff to commit 1234567:
525 $sha1 =~ /^$treeish/ &&
526 $sha1 =~ /^([0-9a-f]{40})$/s) {
528 Log(undef, "Commit $commit already present in $local_repo");
532 if (!defined $commit) {
533 # If $treeish isn't just a hash or abbrev hash, or isn't here
534 # yet, we need to fetch the remote to resolve it correctly.
536 # First, remove all local heads. This prevents a name that does
537 # not exist on the remote from resolving to (or colliding with)
538 # a previously fetched branch or tag (possibly from a different repository).
540 remove_tree("$local_repo/refs/heads", {keep_root => 1});
542 Log(undef, "Fetching objects from $repo to $local_repo");
543 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
545 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
549 # Now that the data is all here, we will use our local repo for
550 # the rest of our git activities.
554 my $gitcmd = "git --git-dir=\Q$repo\E";
555 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
556 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
557 croak("`$gitcmd rev-list` exited "
559 .", '$treeish' not found. Giving up.");
562 Log(undef, "Version $treeish is commit $commit");
564 if ($commit ne $Job->{'script_version'}) {
565 # Record the real commit id in the database, frozentokey, logs,
566 # etc. -- instead of an abbreviation or a branch name which can
567 # become ambiguous or point to a different commit in the future.
568 if (!$Job->update_attributes('script_version' => $commit)) {
569 croak("Error: failed to update job's script_version attribute");
573 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
574 add_git_archive("$gitcmd archive ''\Q$commit\E");
577 my $git_archive = combined_git_archive();
578 if (!defined $git_archive) {
579 Log(undef, "Skip install phase (no git archive)");
581 Log(undef, "Warning: This probably means workers have no source tree!");
585 Log(undef, "Run install script on all workers");
587 my @srunargs = ("srun",
588 "--nodelist=$nodelist",
589 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
590 my @execargs = ("sh", "-c",
591 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
593 my $installpid = fork();
594 if ($installpid == 0)
596 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
601 last if $installpid == waitpid (-1, WNOHANG);
602 freeze_if_want_freeze ($installpid);
603 select (undef, undef, undef, 0.1);
605 my $install_exited = $?;
606 Log (undef, "Install script exited ".exit_status_s($install_exited));
607 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
608 unlink($tar_filename);
610 exit (1) if $install_exited != 0;
613 foreach (qw (script script_version script_parameters runtime_constraints))
617 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
619 foreach (split (/\n/, $Job->{knobs}))
621 Log (undef, "knob " . $_);
626 $main::success = undef;
632 my $thisround_succeeded = 0;
633 my $thisround_failed = 0;
634 my $thisround_failed_multiple = 0;
636 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
637 or $a <=> $b } @jobstep_todo;
638 my $level = $jobstep[$jobstep_todo[0]]->{level};
639 Log (undef, "start level $level");
644 my @freeslot = (0..$#slot);
647 my $progress_is_dirty = 1;
648 my $progress_stats_updated = 0;
650 update_progress_stats();
655 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
657 my $id = $jobstep_todo[$todo_ptr];
658 my $Jobstep = $jobstep[$id];
659 if ($Jobstep->{level} != $level)
664 pipe $reader{$id}, "writer" or croak ($!);
665 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
666 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
668 my $childslot = $freeslot[0];
669 my $childnode = $slot[$childslot]->{node};
670 my $childslotname = join (".",
671 $slot[$childslot]->{node}->{name},
672 $slot[$childslot]->{cpu});
675 $Jobstep->{cidfile} = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
678 my $childpid = fork();
681 $SIG{'INT'} = 'DEFAULT';
682 $SIG{'QUIT'} = 'DEFAULT';
683 $SIG{'TERM'} = 'DEFAULT';
685 foreach (values (%reader))
689 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
690 open(STDOUT,">&writer");
691 open(STDERR,">&writer");
696 delete $ENV{"GNUPGHOME"};
697 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
698 $ENV{"TASK_QSEQUENCE"} = $id;
699 $ENV{"TASK_SEQUENCE"} = $level;
700 $ENV{"JOB_SCRIPT"} = $Job->{script};
701 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
702 $param =~ tr/a-z/A-Z/;
703 $ENV{"JOB_PARAMETER_$param"} = $value;
705 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
706 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
707 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
708 $ENV{"HOME"} = $ENV{"TASK_WORK"};
709 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
710 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
711 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
712 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
718 "--nodelist=".$childnode->{name},
719 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
720 "--job-name=$job_id.$id.$$",
723 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
724 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
725 ."&& cd $ENV{CRUNCH_TMP} ";
726 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
729 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$Jobstep->{cidfile} -poll=10000 ";
730 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$Jobstep->{cidfile} --sig-proxy ";
732 # Dynamically configure the container to use the host system as its
733 # DNS server. Get the host's global addresses from the ip command,
734 # and turn them into docker --dns options using gawk.
736 q{$(ip -o address show scope global |
737 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
739 # The source tree and $destdir directory (which we have
740 # installed on the worker host) are available in the container,
741 # under the same path.
742 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
743 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
745 # Currently, we make arv-mount's mount point appear at /keep
746 # inside the container (instead of using the same path as the
747 # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
748 # crunch scripts and utilities must not rely on this. They must
749 # use $TASK_KEEPMOUNT.
750 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
751 $ENV{TASK_KEEPMOUNT} = "/keep";
753 # TASK_WORK is almost exactly like a docker data volume: it
754 # starts out empty, is writable, and persists until no
755 # containers use it any more. We don't use --volumes-from to
756 # share it with other containers: it is only accessible to this
757 # task, and it goes away when this task stops.
759 # However, a docker data volume is writable only by root unless
760 # the mount point already happens to exist in the container with
761 # different permissions. Therefore, we [1] assume /tmp already
762 # exists in the image and is writable by the crunch user; [2]
763 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
764 # writable if they are created by docker while setting up the
765 # other --volumes); and [3] create $TASK_WORK inside the
766 # container using $build_script.
767 $command .= "--volume=/tmp ";
768 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
769 $ENV{"HOME"} = $ENV{"TASK_WORK"};
770 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
772 # TODO: Share a single JOB_WORK volume across all task
773 # containers on a given worker node, and delete it when the job
774 # ends (and, in case that doesn't work, when the next job starts).
777 # For now, use the same approach as TASK_WORK above.
778 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
780 while (my ($env_key, $env_val) = each %ENV)
782 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
783 $command .= "--env=\Q$env_key=$env_val\E ";
786 $command .= "--env=\QHOME=$ENV{HOME}\E ";
787 $command .= "\Q$docker_hash\E ";
788 $command .= "stdbuf --output=0 --error=0 ";
789 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
792 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
793 $command .= "stdbuf --output=0 --error=0 ";
794 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
797 my @execargs = ('bash', '-c', $command);
798 srun (\@srunargs, \@execargs, undef, $build_script);
799 # exec() failed; we assume nothing happened.
800 die "srun() failed on build script\n";
803 if (!defined $childpid)
810 $proc{$childpid} = { jobstep => $id,
813 jobstepname => "$job_id.$id.$childpid",
815 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
816 $slot[$childslot]->{pid} = $childpid;
818 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
819 Log ($id, "child $childpid started on $childslotname");
820 $Jobstep->{starttime} = time;
821 $Jobstep->{node} = $childnode->{name};
822 $Jobstep->{slotindex} = $childslot;
823 delete $Jobstep->{stderr};
824 delete $Jobstep->{finishtime};
826 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
827 $Jobstep->{'arvados_task'}->save;
829 splice @jobstep_todo, $todo_ptr, 1;
832 $progress_is_dirty = 1;
836 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
838 last THISROUND if $main::please_freeze || defined($main::success);
839 if ($main::please_info)
841 $main::please_info = 0;
843 create_output_collection();
845 update_progress_stats();
852 check_refresh_wanted();
854 update_progress_stats();
855 select (undef, undef, undef, 0.1);
857 elsif (time - $progress_stats_updated >= 30)
859 update_progress_stats();
861 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
862 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
864 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
865 .($thisround_failed+$thisround_succeeded)
866 .") -- giving up on this round";
867 Log (undef, $message);
871 # move slots from freeslot to holdslot (or back to freeslot) if necessary
872 for (my $i=$#freeslot; $i>=0; $i--) {
873 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
874 push @holdslot, (splice @freeslot, $i, 1);
877 for (my $i=$#holdslot; $i>=0; $i--) {
878 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
879 push @freeslot, (splice @holdslot, $i, 1);
883 # give up if no nodes are succeeding
884 if (!grep { $_->{node}->{losing_streak} == 0 &&
885 $_->{node}->{hold_count} < 4 } @slot) {
886 my $message = "Every node has failed -- giving up on this round";
887 Log (undef, $message);
894 push @freeslot, splice @holdslot;
895 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
898 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
901 if ($main::please_continue) {
902 $main::please_continue = 0;
905 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
909 check_refresh_wanted();
911 update_progress_stats();
912 select (undef, undef, undef, 0.1);
913 killem (keys %proc) if $main::please_freeze;
917 update_progress_stats();
918 freeze_if_want_freeze();
921 if (!defined $main::success)
924 $thisround_succeeded == 0 &&
925 ($thisround_failed == 0 || $thisround_failed > 4))
927 my $message = "stop because $thisround_failed tasks failed and none succeeded";
928 Log (undef, $message);
937 goto ONELEVEL if !defined $main::success;
940 release_allocation();
942 my $collated_output = &create_output_collection();
944 if (!$collated_output) {
945 Log (undef, "Failed to write output collection");
948 Log(undef, "job output $collated_output");
949 $Job->update_attributes('output' => $collated_output);
952 Log (undef, "finish");
957 if ($collated_output && $main::success) {
958 $final_state = 'Complete';
960 $final_state = 'Failed';
962 $Job->update_attributes('state' => $final_state);
964 exit (($final_state eq 'Complete') ? 0 : 1);
968 sub update_progress_stats
970 $progress_stats_updated = time;
971 return if !$progress_is_dirty;
972 my ($todo, $done, $running) = (scalar @jobstep_todo,
973 scalar @jobstep_done,
974 scalar @slot - scalar @freeslot - scalar @holdslot);
975 $Job->{'tasks_summary'} ||= {};
976 $Job->{'tasks_summary'}->{'todo'} = $todo;
977 $Job->{'tasks_summary'}->{'done'} = $done;
978 $Job->{'tasks_summary'}->{'running'} = $running;
979 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
980 Log (undef, "status: $done done, $running running, $todo todo");
981 $progress_is_dirty = 0;
988 my $pid = waitpid (-1, WNOHANG);
989 return 0 if $pid <= 0;
991 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
993 . $slot[$proc{$pid}->{slot}]->{cpu});
994 my $jobstepid = $proc{$pid}->{jobstep};
995 my $elapsed = time - $proc{$pid}->{time};
996 my $Jobstep = $jobstep[$jobstepid];
998 my $childstatus = $?;
999 my $exitvalue = $childstatus >> 8;
1000 my $exitinfo = "exit ".exit_status_s($childstatus);
1001 $Jobstep->{'arvados_task'}->reload;
1002 my $task_success = $Jobstep->{'arvados_task'}->{success};
1004 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1006 if (!defined $task_success) {
1007 # task did not indicate one way or the other --> fail
1008 $Jobstep->{'arvados_task'}->{success} = 0;
1009 $Jobstep->{'arvados_task'}->save;
1016 $temporary_fail ||= $Jobstep->{node_fail};
1017 $temporary_fail ||= ($exitvalue == 111);
1019 ++$thisround_failed;
1020 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1022 # Check for signs of a failed or misconfigured node
1023 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1024 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1025 # Don't count this against jobstep failure thresholds if this
1026 # node is already suspected faulty and srun exited quickly
1027 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1029 Log ($jobstepid, "blaming failure on suspect node " .
1030 $slot[$proc{$pid}->{slot}]->{node}->{name});
1031 $temporary_fail ||= 1;
1033 ban_node_by_slot($proc{$pid}->{slot});
1036 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1037 ++$Jobstep->{'failures'},
1038 $temporary_fail ? 'temporary ' : 'permanent',
1041 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1042 # Give up on this task, and the whole job
1045 # Put this task back on the todo queue
1046 push @jobstep_todo, $jobstepid;
1047 $Job->{'tasks_summary'}->{'failed'}++;
1051 ++$thisround_succeeded;
1052 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1053 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1054 push @jobstep_done, $jobstepid;
1055 Log ($jobstepid, "success in $elapsed seconds");
1057 $Jobstep->{exitcode} = $childstatus;
1058 $Jobstep->{finishtime} = time;
1059 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1060 $Jobstep->{'arvados_task'}->save;
1061 process_stderr ($jobstepid, $task_success);
1062 Log ($jobstepid, sprintf("task output (%d bytes): %s",
1063 length($Jobstep->{'arvados_task'}->{output}),
1064 $Jobstep->{'arvados_task'}->{output}));
1066 close $reader{$jobstepid};
1067 delete $reader{$jobstepid};
1068 delete $slot[$proc{$pid}->{slot}]->{pid};
1069 push @freeslot, $proc{$pid}->{slot};
1072 if (defined($Jobstep->{cidfile})) {
1073 unlink $Jobstep->{cidfile};
1074 delete $Jobstep->{cidfile};
1077 if ($task_success) {
1079 my $newtask_list = [];
1080 my $newtask_results;
1082 $newtask_results = api_call(
1085 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1087 'order' => 'qsequence',
1088 'offset' => scalar(@$newtask_list),
1090 push(@$newtask_list, @{$newtask_results->{items}});
1091 } while (@{$newtask_results->{items}});
1092 foreach my $arvados_task (@$newtask_list) {
1094 'level' => $arvados_task->{'sequence'},
1096 'arvados_task' => $arvados_task
1098 push @jobstep, $jobstep;
1099 push @jobstep_todo, $#jobstep;
1103 $progress_is_dirty = 1;
1107 sub check_refresh_wanted
1109 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1110 if (@stat && $stat[9] > $latest_refresh) {
1111 $latest_refresh = scalar time;
1112 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1113 for my $attr ('cancelled_at',
1114 'cancelled_by_user_uuid',
1115 'cancelled_by_client_uuid',
1117 $Job->{$attr} = $Job2->{$attr};
1119 if ($Job->{'state'} ne "Running") {
1120 if ($Job->{'state'} eq "Cancelled") {
1121 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1123 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1126 $main::please_freeze = 1;
1133 # return if the kill list was checked <4 seconds ago
1134 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1138 $squeue_kill_checked = time;
1140 # use killem() on procs whose killtime is reached
1143 if (exists $proc{$_}->{killtime}
1144 && $proc{$_}->{killtime} <= time)
1150 # return if the squeue was checked <60 seconds ago
1151 if (defined $squeue_checked && $squeue_checked > time - 60)
1155 $squeue_checked = time;
1159 # here is an opportunity to check for mysterious problems with local procs
1163 # get a list of steps still running
1164 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1166 if ($squeue[-1] ne "ok")
1172 # which of my jobsteps are running, according to squeue?
1176 if (/^(\d+)\.(\d+) (\S+)/)
1178 if ($1 eq $ENV{SLURM_JOBID})
1185 # which of my active child procs (>60s old) were not mentioned by squeue?
1186 foreach (keys %proc)
1188 if ($proc{$_}->{time} < time - 60
1189 && !exists $ok{$proc{$_}->{jobstepname}}
1190 && !exists $proc{$_}->{killtime})
1192 # kill this proc if it hasn't exited in 30 seconds
1193 $proc{$_}->{killtime} = time + 30;
1199 sub release_allocation
1203 Log (undef, "release job allocation");
1204 system "scancel $ENV{SLURM_JOBID}";
1212 foreach my $job (keys %reader)
1215 while (0 < sysread ($reader{$job}, $buf, 8192))
1217 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1218 $jobstep[$job]->{stderr} .= $buf;
1219 preprocess_stderr ($job);
1220 if (length ($jobstep[$job]->{stderr}) > 16384)
1222 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1231 sub preprocess_stderr
1235 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1237 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1238 Log ($job, "stderr $line");
1239 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1241 $main::please_freeze = 1;
1243 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1244 $jobstep[$job]->{node_fail} = 1;
1245 ban_node_by_slot($jobstep[$job]->{slotindex});
1254 my $task_success = shift;
1255 preprocess_stderr ($job);
1258 Log ($job, "stderr $_");
1259 } split ("\n", $jobstep[$job]->{stderr});
1266 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1267 Log(undef, "fetch_block run error from arv-get $hash: $!");
1270 my $output_block = "";
1273 my $bytes = sysread($keep, $buf, 1024 * 1024);
1274 if (!defined $bytes) {
1275 Log(undef, "fetch_block read error from arv-get: $!");
1276 $output_block = undef;
1278 } elsif ($bytes == 0) {
1279 # sysread returns 0 at the end of the pipe.
1282 # some bytes were read into buf.
1283 $output_block .= $buf;
1288 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1289 $output_block = undef;
1291 return $output_block;
1294 # Create a collection by concatenating the output of all tasks (each
1295 # task's output is either a manifest fragment, a locator for a
1296 # manifest fragment stored in Keep, or nothing at all). Return the
1297 # portable_data_hash of the new collection.
1298 sub create_output_collection
1300 Log (undef, "collate");
1302 my ($child_out, $child_in);
1303 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1306 print (arvados.api("v1").collections().
1307 create(body={"manifest_text": sys.stdin.read()}).
1308 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1312 my $manifest_size = 0;
1316 my $output = $_->{'arvados_task'}->{output};
1317 next if (!defined($output));
1319 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1320 $next_write = fetch_block($output);
1322 $next_write = $output;
1324 if (defined($next_write)) {
1325 if (!defined(syswrite($child_in, $next_write))) {
1326 # There's been an error writing. Stop the loop.
1327 # We'll log details about the exit code later.
1330 $manifest_size += length($next_write);
1333 my $uuid = $_->{'arvados_task'}->{'uuid'};
1334 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1339 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1342 my $s = IO::Select->new($child_out);
1343 if ($s->can_read(120)) {
1344 sysread($child_out, $joboutput, 1024 * 1024);
1347 Log(undef, "output collection creation exited " . exit_status_s($?));
1353 Log (undef, "timed out while creating output collection");
1354 foreach my $signal (2, 2, 2, 15, 15, 9) {
1355 kill($signal, $pid);
1356 last if waitpid($pid, WNOHANG) == -1;
1370 my $sig = 2; # SIGINT first
1371 if (exists $proc{$_}->{"sent_$sig"} &&
1372 time - $proc{$_}->{"sent_$sig"} > 4)
1374 $sig = 15; # SIGTERM if SIGINT doesn't work
1376 if (exists $proc{$_}->{"sent_$sig"} &&
1377 time - $proc{$_}->{"sent_$sig"} > 4)
1379 $sig = 9; # SIGKILL if SIGTERM doesn't work
1381 if (!exists $proc{$_}->{"sent_$sig"})
1383 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1385 select (undef, undef, undef, 0.1);
1388 kill $sig, $_; # srun wants two SIGINT to really interrupt
1390 $proc{$_}->{"sent_$sig"} = time;
1391 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1401 vec($bits,fileno($_),1) = 1;
1407 # Send log output to Keep via arv-put.
1409 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1410 # $log_pipe_pid is the pid of the arv-put subprocess.
1412 # The only functions that should access these variables directly are:
1414 # log_writer_start($logfilename)
1415 # Starts an arv-put pipe, reading data on stdin and writing it to
1416 # a $logfilename file in an output collection.
1418 # log_writer_send($txt)
1419 # Writes $txt to the output log collection.
1421 # log_writer_finish()
1422 # Closes the arv-put pipe and returns the output that it produces.
1424 # log_writer_is_active()
1425 # Returns a true value if there is currently a live arv-put
1426 # process, false otherwise.
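# Typical usage, as seen elsewhere in this script:
#
#   log_writer_start($job_id . '.log.txt');
#   log_writer_send($datetime . " " . $message);
#   my $loglocator = log_writer_finish();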
1428 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1430 sub log_writer_start($)
1432 my $logfilename = shift;
1433 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1435 '--portable-data-hash',
1436 '--project-uuid', $Job->{owner_uuid},
1438 '--name', $logfilename,
1439 '--filename', $logfilename,
1443 sub log_writer_send($)
1446 print $log_pipe_in $txt;
1449 sub log_writer_finish()
1451 return unless $log_pipe_pid;
1453 close($log_pipe_in);
1456 my $s = IO::Select->new($log_pipe_out);
1457 if ($s->can_read(120)) {
1458 sysread($log_pipe_out, $arv_put_output, 1024);
1459 chomp($arv_put_output);
1461 Log (undef, "timed out reading from 'arv-put'");
1464 waitpid($log_pipe_pid, 0);
1465 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1467 Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1470 return $arv_put_output;
1473 sub log_writer_is_active() {
1474 return $log_pipe_pid;
1477 sub Log # ($jobstep_id, $logmessage)
1479 if ($_[1] =~ /\n/) {
1480 for my $line (split (/\n/, $_[1])) {
1485 my $fh = select STDERR; $|=1; select $fh;
1486 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1487 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1490 if (log_writer_is_active() || -t STDERR) {
1491 my @gmtime = gmtime;
1492 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1493 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1495 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1497 if (log_writer_is_active()) {
1498 log_writer_send($datetime . " " . $message);
1505 my ($package, $file, $line) = caller;
1506 my $message = "@_ at $file line $line\n";
1507 Log (undef, $message);
1508 freeze() if @jobstep_todo;
1509 create_output_collection() if @jobstep_todo;
1519 if ($Job->{'state'} eq 'Cancelled') {
1520 $Job->update_attributes('finished_at' => scalar gmtime);
1522 $Job->update_attributes('state' => 'Failed');
1529 my $justcheckpoint = shift; # false if this will be the last meta saved
1530 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1531 return unless log_writer_is_active();
1533 my $loglocator = log_writer_finish();
1534 Log (undef, "log manifest is $loglocator");
1535 $Job->{'log'} = $loglocator;
1536 $Job->update_attributes('log', $loglocator);
1540 sub freeze_if_want_freeze
1542 if ($main::please_freeze)
1544 release_allocation();
1547 # kill some srun procs before freeze+stop
1548 map { $proc{$_} = {} } @_;
1551 killem (keys %proc);
1552 select (undef, undef, undef, 0.1);
1554 while (($died = waitpid (-1, WNOHANG)) > 0)
1556 delete $proc{$died};
1561 create_output_collection();
1571 Log (undef, "Freeze not implemented");
1578 croak ("Thaw not implemented");
1594 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1601 my $srunargs = shift;
1602 my $execargs = shift;
1603 my $opts = shift || {};
1605 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1607 $Data::Dumper::Terse = 1;
1608 $Data::Dumper::Indent = 0;
1609 my $show_cmd = Dumper($args);
1610 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1611 $show_cmd =~ s/\n/ /g;
1612 warn "starting: $show_cmd\n";
1614 if (defined $stdin) {
1615 my $child = open STDIN, "-|";
1616 defined $child or die "no fork: $!";
1618 print $stdin or die $!;
1619 close STDOUT or die $!;
1624 return system (@$args) if $opts->{fork};
1627 warn "ENV size is ".length(join(" ",%ENV));
1628 die "exec failed: $!: @$args";
1632 sub ban_node_by_slot {
1633 # Don't start any new jobsteps on this node for 60 seconds
1635 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1636 $slot[$slotid]->{node}->{hold_count}++;
1637 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1642 my ($lockfile, $error_message) = @_;
1643 open L, ">", $lockfile or croak("$lockfile: $!");
1644 if (!flock L, LOCK_EX|LOCK_NB) {
1645 croak("Can't lock $lockfile: $error_message\n");
1649 sub find_docker_image {
1650 # Given a Keep locator, check to see if it contains a Docker image.
1651 # If so, return its stream name and Docker hash.
1652 # If not, return undef for both values.
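# Called earlier in this script as:
#   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);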
1653 my $locator = shift;
1654 my ($streamname, $filename);
1655 my $image = api_call("collections/get", uuid => $locator);
1657 foreach my $line (split(/\n/, $image->{manifest_text})) {
1658 my @tokens = split(/\s+/, $line);
1660 $streamname = shift(@tokens);
1661 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1662 if (defined($filename)) {
1663 return (undef, undef); # More than one file in the Collection.
1665 $filename = (split(/:/, $filedata, 3))[2];
1670 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1671 return ($streamname, $1);
1673 return (undef, undef);
1678 # Calculate the number of times an operation should be retried,
1679 # assuming exponential backoff, and that we're willing to retry as
1680 # long as tasks have been running. Enforce a minimum of 3 retries.
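# Illustrative numbers: if tasks have been running for about 1024 seconds,
# int(log(1024)/log(2)) = 10 retries; for very short runs the minimum of
# 3 retries applies.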
1681 my ($starttime, $endtime, $timediff, $retries);
1683 $starttime = $jobstep[0]->{starttime};
1684 $endtime = $jobstep[-1]->{finishtime};
1686 if (!defined($starttime)) {
1688 } elsif (!defined($endtime)) {
1689 $timediff = time - $starttime;
1691 $timediff = ($endtime - $starttime) - (time - $endtime);
1693 if ($timediff > 0) {
1694 $retries = int(log($timediff) / log(2));
1696 $retries = 1; # Use the minimum.
1698 return ($retries > 3) ? $retries : 3;
1702 # Pass in two function references.
1703 # The first function will be called with the remaining arguments.
1704 # If it dies, retry it with exponential backoff until it succeeds,
1705 # or until the current retry_count is exhausted. After each failure
1706 # that can be retried, the second function will be called with
1707 # the current try count (0-based), next try time, and error message.
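# For a concrete use, see api_call() below, which invokes:
#
#   retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);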
1708 my $operation = shift;
1709 my $retry_callback = shift;
1710 my $retries = retry_count();
1711 foreach my $try_count (0..$retries) {
1712 my $next_try = time + (2 ** $try_count);
1713 my $result = eval { $operation->(@_); };
1716 } elsif ($try_count < $retries) {
1717 $retry_callback->($try_count, $next_try, $@);
1718 my $sleep_time = $next_try - time;
1719 sleep($sleep_time) if ($sleep_time > 0);
1722 # Ensure the error message ends in a newline, so Perl doesn't add
1723 # retry_op's line number to it.
1729 # Pass in a /-separated API method name, and arguments for it.
1730 # This function will call that method, retrying as needed until
1731 # the current retry_count is exhausted, with a log on the first failure.
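# Example calls from elsewhere in this script:
#
#   my $User = api_call("users/current");
#   $Job = api_call("jobs/get", uuid => $jobspec);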
1732 my $method_name = shift;
1733 my $log_api_retry = sub {
1734 my ($try_count, $next_try_at, $errmsg) = @_;
1735 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1736 $errmsg =~ s/\s/ /g;
1737 $errmsg =~ s/\s+$//;
1739 if ($next_try_at < time) {
1740 $retry_msg = "Retrying.";
1742 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1743 $retry_msg = "Retrying at $next_try_fmt.";
1745 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1748 foreach my $key (split(/\//, $method_name)) {
1749 $method = $method->{$key};
1751 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1755 # Given a $?, return a human-readable exit code string like "0" or
1756 # "1" or "0 with signal 1" or "1 with signal 11".
1757 my $exitcode = shift;
1758 my $s = $exitcode >> 8;
1759 if ($exitcode & 0x7f) {
1760 $s .= " with signal " . ($exitcode & 0x7f);
1762 if ($exitcode & 0x80) {
1763 $s .= " with core dump";
1768 sub handle_readall {
1769 # Pass in a glob reference to a file handle.
1770 # Read all its contents and return them as a string.
1771 my $fh_glob_ref = shift;
1773 return <$fh_glob_ref>;
1776 sub tar_filename_n {
1778 return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1781 sub add_git_archive {
1782 # Pass in a git archive command as a string or list, a la system().
1783 # This method will save its output to be included in the archive sent to the
1787 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1788 croak("Failed to save git archive: $!");
1790 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1792 waitpid($git_pid, 0);
1795 croak("Failed to save git archive: git exited " . exit_status_s($?));
1799 sub combined_git_archive {
1800 # Combine all saved tar archives into a single archive, then return its
1801 # contents in a string. Return undef if no archives have been saved.
1802 if ($git_tar_count < 1) {
1805 my $base_tar_name = tar_filename_n(1);
1806 foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1807 my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1808 if ($tar_exit != 0) {
1809 croak("Error preparing build archive: tar -A exited " .
1810 exit_status_s($tar_exit));
1813 if (!open(GIT_TAR, "<", $base_tar_name)) {
1814 croak("Could not open build archive: $!");
1816 my $tar_contents = handle_readall(\*GIT_TAR);
1818 return $tar_contents;
1824 # This is crunch-job's internal dispatch script. crunch-job running on the API
1825 # server invokes this script on individual compute nodes, or localhost if we're
1826 # running a job locally. It gets called in two modes:
1828 # * No arguments: Installation mode. Read a tar archive from the DATA
1829 # file handle; it includes the Crunch script's source code, and
1830 # maybe SDKs as well. Those should be installed in the proper
1831 # locations. This runs outside of any Docker container, so don't try to
1832 # introspect Crunch's runtime environment.
1834 # * With arguments: Crunch script run mode. This script should set up the
1835 # environment, then run the command specified in the arguments. This runs
1836 # inside any Docker container.
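# For reference, the srun calls earlier in crunch-job run this script as
# "perl -" (installation mode, with the git archive appended on stdin) or as
# "perl - $CRUNCH_SRC/crunch_scripts/<script>" (run mode); <script> here
# stands for the job's "script" attribute.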
1839 use File::Path qw( make_path remove_tree );
1840 use POSIX qw(getcwd);
1842 # Map SDK subdirectories to the path environments they belong to.
1843 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1845 my $destdir = $ENV{"CRUNCH_SRC"};
1846 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1847 my $repo = $ENV{"CRUNCH_SRC_URL"};
1848 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1849 my $job_work = $ENV{"JOB_WORK"};
1850 my $task_work = $ENV{"TASK_WORK"};
1852 for my $dir ($destdir, $job_work, $task_work) {
1855 -e $dir or die "Failed to create temporary directory ($dir): $!";
1860 remove_tree($task_work, {keep_root => 1});
1863 open(STDOUT_ORIG, ">&", STDOUT);
1864 open(STDERR_ORIG, ">&", STDERR);
1865 open(STDOUT, ">>", "$destdir.log");
1866 open(STDERR, ">&", STDOUT);
1868 ### Crunch script run mode
1870 # We want to do routine logging during task 0 only. This gives the user
1871 # the information they need, but avoids repeating the information for every task.
1874 if ($ENV{TASK_SEQUENCE} eq "0") {
1877 printf STDERR_ORIG "[Crunch] $msg\n", @_;
1883 my $python_src = "$install_dir/python";
1884 my $venv_dir = "$job_work/.arvados.venv";
1885 my $venv_built = -e "$venv_dir/bin/activate";
1886 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1887 shell_or_die("virtualenv", "--quiet", "--system-site-packages",
1888 "--python=python2.7", $venv_dir);
1889 shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1891 $Log->("Built Python SDK virtualenv");
1894 my $pip_bin = "pip";
1896 $Log->("Running in Python SDK virtualenv");
1897 $pip_bin = "$venv_dir/bin/pip";
1898 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1899 @ARGV = ("/bin/sh", "-ec",
1900 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1901 } elsif (-d $python_src) {
1902 $Log->("Warning: virtualenv not found inside Docker container default " .
1903 "\$PATH. Can't install Python SDK.");
1906 my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1908 $Log->("Using Arvados SDK:");
1909 foreach my $line (split /\n/, $pkgs) {
1913 $Log->("Arvados SDK packages not found");
1916 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1917 my $sdk_path = "$install_dir/$sdk_dir";
1919 if ($ENV{$sdk_envkey}) {
1920 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1922 $ENV{$sdk_envkey} = $sdk_path;
1924 $Log->("Arvados SDK added to %s", $sdk_envkey);
1930 open(STDOUT, ">&", STDOUT_ORIG);
1931 open(STDERR, ">&", STDERR_ORIG);
1933 die "Cannot exec `@ARGV`: $!";
1936 ### Installation mode
1937 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1939 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1940 # This version is already installed -> nothing to do.
1944 unlink "$destdir.commit";
1947 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1948 die "Error launching 'tar -xC $destdir': $!";
1950 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1951 # get SIGPIPE. We must feed it data incrementally.
1953 while (read(DATA, $tar_input, 65536)) {
1954 print TARX $tar_input;
1957 die "'tar -xC $destdir' exited $?: $!";
1962 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1964 foreach my $sdk_lang (("python",
1965 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
1966 if (-d "$sdk_root/$sdk_lang") {
1967 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1968 die "Failed to install $sdk_lang SDK: $!";
1974 my $python_dir = "$install_dir/python";
1975 if ((-d $python_dir) and can_run("python2.7") and
1976 (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
1977 # egg_info failed, probably when it asked git for a build tag.
1978 # Specify no build tag.
1979 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
1980 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
1984 if (-e "$destdir/crunch_scripts/install") {
1985 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1986 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1988 shell_or_die ("./tests/autotests.sh", $install_dir);
1989 } elsif (-e "./install.sh") {
1990 shell_or_die ("./install.sh", $install_dir);
1994 unlink "$destdir.commit.new";
1995 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1996 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
2002 my $command_name = shift;
2003 open(my $which, "-|", "which", $command_name);
2004 while (<$which>) { }
2011 if ($ENV{"DEBUG"}) {
2012 print STDERR "@_\n";
2014 if (system (@_) != 0) {
2016 my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
2017 open STDERR, ">&STDERR_ORIG";
2018 system ("cat $destdir.log >&2");
2019 die "@_ failed ($err): $exitstatus";