2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
71 Save a checkpoint and continue.
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or deallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
89 use Digest::MD5 qw(md5_hex);
95 use File::Path qw( make_path remove_tree );
97 use constant EX_TEMPFAIL => 75;
# --- Environment setup: choose a per-user scratch area under TMPDIR. ---
# NOTE(review): listing is elided here (line numbers jump); closing braces
# for some conditionals are not visible in this view.
99 $ENV{"TMPDIR"} ||= "/tmp";
100 unless (defined $ENV{"CRUNCH_TMP"}) {
101 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
102 if ($ENV{"USER"} ne "crunch" && $< != 0) {
103 # use a tmp dir unique for my uid
104 $ENV{"CRUNCH_TMP"} .= "-$<";
108 # Create the tmp directory if it does not exist
109 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
110 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived working-directory env vars consumed by tasks and helper scripts.
113 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
114 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
115 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
116 mkdir ($ENV{"JOB_WORK"});
# --- Command-line options (see POD above for meanings). ---
124 GetOptions('force-unlock' => \$force_unlock,
125 'git-dir=s' => \$git_dir,
126 'job=s' => \$jobspec,
127 'job-api-token=s' => \$job_api_token,
128 'no-clear-tmp' => \$no_clear_tmp,
129 'resume-stash=s' => \$resume_stash,
# An explicitly supplied token overrides whatever is in the environment.
132 if (defined $job_api_token) {
133 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# True when running under a SLURM allocation (affects node discovery below).
136 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# CRUNCH_DEBUG toggled here; the surrounding conditional is not visible in
# this view -- presumably based on a --debug-style option. TODO confirm.
142 $main::ENV{CRUNCH_DEBUG} = 1;
146 $main::ENV{CRUNCH_DEBUG} = 0;
# --- Acquire the Job record: either look up + lock an existing job by UUID,
# or create a new job from a JSON spec given on the command line. ---
151 my $arv = Arvados->new('apiVersion' => 'v1');
153 my $User = $arv->{'users'}->{'current'}->execute;
# A bare UUID (lowercase alnum/dash) means "run an existing job".
159 if ($jobspec =~ /^[-a-z\d]+$/)
161 # $jobspec is an Arvados UUID, not a JSON job specification
162 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
163 if (!$force_unlock) {
164 # Claim this job, and make sure nobody else does
166 # lock() sets is_locked_by_uuid and changes state to Running.
167 $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
# EX_TEMPFAIL (75) tells the dispatcher this is retryable, not fatal.
170 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
# Otherwise: $jobspec is JSON describing a job to create locally.
177 $Job = JSON::decode_json($jobspec);
181 map { croak ("No $_ specified") unless $Job->{$_} }
182 qw(script script_version script_parameters);
185 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186 $Job->{'started_at'} = gmtime;
187 $Job->{'state'} = 'Running';
189 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191 $job_id = $Job->{'uuid'};
# Job log goes to Keep via the log_writer_* helpers defined near EOF.
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
# max_tasks_per_node == 0 means "no per-node task cap".
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# --- Discover allocated nodes (SLURM via sinfo, else localhost) and build
# @node / @slot: one slot per cpu per node. Listing is elided here. ---
201 Log (undef, "check slurm allocation");
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Fallback when not under SLURM: count local processors, minimum 1.
208 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209 push @sinfo, "$localcpus localhost";
211 if (exists $ENV{SLURM_NODELIST})
# "%c %N" -> "<cpus> <nodelist>" per line, e.g. "8 compute[0-3]".
213 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 my ($ncpus, $slurm_nodelist) = split;
# Honor runtime_constraints.max_tasks_per_node if set.
218 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand SLURM compressed hostlist syntax, e.g. "node[1-3,5]" -> node1..node5.
221 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
224 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
227 foreach (split (",", $ranges))
240 push @nodelist, map {
242 $n =~ s/\[[-,\d]+\]/$_/;
249 push @nodelist, $nodelist;
252 foreach my $nodename (@nodelist)
254 Log (undef, "node $nodename - $ncpus slots");
255 my $node = { name => $nodename,
# One scheduling slot per cpu; tasks are assigned to slots below.
259 foreach my $cpu (1..$ncpus)
261 push @slot, { node => $node,
265 push @node, @nodelist;
# --- Sort slots for breadth-first node use, install signal handlers,
# initialize scheduler state, and create the level-0 task. ---
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
276 $Job->update_attributes(
277 'tasks_summary' => { 'failed' => 0,
282 Log (undef, "start");
# Signal semantics match the POD: INT/QUIT/TSTP freeze, ALRM info,
# CONT resume, HUP refresh; TERM aborts via croak.
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Pass NOCACHE* knobs from the job record into our environment.
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
# Scheduler bookkeeping: queues of jobstep indexes, merge state, timers.
303 my @jobstep_todo = ();
304 my @jobstep_done = ();
305 my @jobstep_tomerge = ();
306 my $jobstep_tomerge_level = 0;
308 my $squeue_kill_checked;
309 my $output_in_keep = 0;
310 my $latest_refresh = scalar time;
# Resume from a previously frozen state, if the job record has one.
314 if (defined $Job->{thawedfromkey})
316 thaw ($Job->{thawedfromkey});
# Seed the queue with a single level-0 task.
320 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
321 'job_uuid' => $Job->{'uuid'},
326 push @jobstep, { 'level' => 0,
328 'arvados_task' => $first_task,
330 push @jobstep_todo, 0;
# Guard against two crunch-jobs sharing one CRUNCH_TMP on this host.
336 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# The worker-side install script is embedded after __DATA__.
343 $build_script = <DATA>;
# --- Optionally wipe per-job work/install/src dirs on every node. ---
345 my $nodelist = join(",", @node);
347 if (!defined $no_clear_tmp) {
348 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
349 Log (undef, "Clean work dirs");
# Run the cleanup in a child so the parent can keep servicing
# freeze requests while waiting.
351 my $cleanpid = fork();
# Unmount any stale keep FUSE mounts first, then remove the trees.
354 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
355 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Poll-wait on the child; 0.1s select() doubles as a sleep.
360 last if $cleanpid == waitpid (-1, WNOHANG);
361 freeze_if_want_freeze ($cleanpid);
362 select (undef, undef, undef, 0.1);
364 Log (undef, "Cleanup command exited ".exit_status_s($?));
# --- Work out where the job's source code lives: a local working tree,
# a git URL, a local .git dir, or a hosted (named) Arvados repository. ---
369 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
370 # If script_version looks like an absolute path, *and* the --git-dir
371 # argument was not given -- which implies we were not invoked by
372 # crunch-dispatch -- we will use the given path as a working
373 # directory instead of resolving script_version to a git commit (or
374 # doing anything else with git).
375 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
376 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
# Otherwise, source will be installed under CRUNCH_TMP/src on workers.
379 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
381 # Install requested code version
382 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
384 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
386 # If we're running under crunch-dispatch, it will have already
387 # pulled the appropriate source tree into its own repository, and
388 # given us that repo's path as $git_dir.
390 # If we're running a "local" job, we might have to fetch content
391 # from a remote repository.
393 # (Currently crunch-dispatch gives a local path with --git-dir, but
394 # we might as well accept URLs there too in case it changes its
396 my $repo = $git_dir || $Job->{'repository'};
398 # Repository can be remote or local. If remote, we'll need to fetch it
399 # to a local dir before doing `git log` et al.
# Classify $repo into one of three shapes: URL, local path, or name.
402 if ($repo =~ m{://|^[^/]*:}) {
403 # $repo is a git url we can clone, like git:// or https:// or
404 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
405 # not recognized here because distinguishing that from a local
406 # path is too fragile. If you really need something strange here,
407 # use the ssh:// form.
408 $repo_location = 'remote';
409 } elsif ($repo =~ m{^\.*/}) {
410 # $repo is a local path to a git index. We'll also resolve ../foo
411 # to ../foo/.git if the latter is a directory. To help
412 # disambiguate local paths from named hosted repositories, this
413 # form must be given as ./ or ../ if it's a relative path.
414 if (-d "$repo/.git") {
415 $repo = "$repo/.git";
417 $repo_location = 'local';
419 # $repo is none of the above. It must be the name of a hosted
# Look the name up via the API; exactly one match is required.
421 my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
422 'filters' => [['name','=',$repo]]
424 my @repos_found = @{$arv_repo_list->{'items'}};
425 my $n_found = $arv_repo_list->{'items_available'};
427 Log(undef, "Repository '$repo' -> "
428 . join(", ", map { $_->{'uuid'} } @repos_found));
431 croak("Error: Found $n_found repositories with name '$repo'.");
433 $repo = $repos_found[0]->{'fetch_url'};
434 $repo_location = 'remote';
436 Log(undef, "Using $repo_location repository '$repo'");
437 $ENV{"CRUNCH_SRC_URL"} = $repo;
439 # Resolve given script_version (we'll call that $treeish here) to a
440 # commit sha1 ($commit).
441 my $treeish = $Job->{'script_version'};
443 if ($repo_location eq 'remote') {
444 # We minimize excess object-fetching by re-using the same bare
445 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
446 # just keep adding remotes to it as needed.
447 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
448 my $gitcmd = "git --git-dir=\Q$local_repo\E";
450 # Set up our local repo for caching remote objects, making
452 if (!-d $local_repo) {
453 make_path($local_repo) or croak("Error: could not create $local_repo");
455 # This works (exits 0 and doesn't delete fetched objects) even
456 # if $local_repo is already initialized:
457 `$gitcmd init --bare`;
459 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
462 # If $treeish looks like a hash (or abbrev hash) we look it up in
463 # our local cache first, since that's cheaper. (We don't want to
464 # do that with tags/branches though -- those change over time, so
465 # they should always be resolved by the remote repo.)
466 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
467 # Hide stderr because it's normal for this to fail:
468 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
470 # Careful not to resolve a branch named abcdeff to commit 1234567:
471 $sha1 =~ /^$treeish/ &&
472 $sha1 =~ /^([0-9a-f]{40})$/s) {
474 Log(undef, "Commit $commit already present in $local_repo")
476 # NOTE(review): trailing punctuation above preserved from the elided
# original; full statement not visible in this view.
478 if (!defined $commit) {
479 # If $treeish isn't just a hash or abbrev hash, or isn't here
480 # yet, we need to fetch the remote to resolve it correctly.
482 # First, remove all local heads. This prevents a name that does
483 # not exist on the remote from resolving to (or colliding with)
484 # a previously fetched branch or tag (possibly from a different
486 remove_tree("$local_repo/refs/heads", {keep_root => 1});
488 Log(undef, "Fetching objects from $repo to $local_repo");
489 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
491 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
495 # Now that the data is all here, we will use our local repo for
496 # the rest of our git activities.
# Local (or now-cached) repo: resolve $treeish to a full 40-char sha1.
500 my $gitcmd = "git --git-dir=\Q$repo\E";
501 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
502 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
503 croak("`$gitcmd rev-list` exited "
505 .", '$treeish' not found. Giving up.");
508 Log(undef, "Version $treeish is commit $commit");
510 if ($commit ne $Job->{'script_version'}) {
511 # Record the real commit id in the database, frozentokey, logs,
512 # etc. -- instead of an abbreviation or a branch name which can
513 # become ambiguous or point to a different commit in the future.
514 if (!$Job->update_attributes('script_version' => $commit)) {
515 croak("Error: failed to update job's script_version attribute");
# Tarball of the source tree to ship to workers with the build script.
519 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
520 $git_archive = `$gitcmd archive ''\Q$commit\E`;
522 croak("Error: $gitcmd archive exited ".exit_status_s($?));
# --- Ship the build script + git archive to every node and run it, so
# each worker has the source tree installed under CRUNCH_TMP. ---
526 if (!defined $git_archive) {
527 Log(undef, "Skip install phase (no git archive)");
529 Log(undef, "Warning: This probably means workers have no source tree!");
533 Log(undef, "Run install script on all workers");
535 my @srunargs = ("srun",
536 "--nodelist=$nodelist",
537 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# "perl -" reads the build script from stdin on each worker.
538 my @execargs = ("sh", "-c",
539 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
541 # Note: this section is almost certainly unnecessary if we're
542 # running tasks in docker containers.
543 my $installpid = fork();
544 if ($installpid == 0)
546 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
# Parent: poll-wait on the installer child, honoring freeze requests.
551 last if $installpid == waitpid (-1, WNOHANG);
552 freeze_if_want_freeze ($installpid);
553 select (undef, undef, undef, 0.1);
555 Log (undef, "Install script exited ".exit_status_s($?));
560 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
561 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
564 # If this job requires a Docker image, install that.
565 my $docker_bin = "/usr/bin/docker.io";
566 my ($docker_locator, $docker_stream, $docker_hash);
567 if ($docker_locator = $Job->{docker_image_locator}) {
568 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
571 croak("No Docker image hash found from locator $docker_locator");
# Strip the leading "." from the manifest stream name for arv-get.
573 $docker_stream =~ s/^\.//;
# Only load the image if it isn't already present in the local daemon.
574 my $docker_install_script = qq{
575 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
576 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
# Run the install script on every node, poll-waiting as elsewhere.
579 my $docker_pid = fork();
580 if ($docker_pid == 0)
582 srun (["srun", "--nodelist=" . join(',', @node)],
583 ["/bin/sh", "-ec", $docker_install_script]);
588 last if $docker_pid == waitpid (-1, WNOHANG);
589 freeze_if_want_freeze ($docker_pid);
590 select (undef, undef, undef, 0.1);
594 croak("Installing Docker image from $docker_locator exited "
# --- Log interesting job attributes, then initialize per-round state. ---
599 foreach (qw (script script_version script_parameters runtime_constraints))
603 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
605 foreach (split (/\n/, $Job->{knobs}))
607 Log (undef, "knob " . $_);
# $main::success stays undef until the job definitively succeeds/fails.
612 $main::success = undef;
# Per-round failure/success counters drive the give-up heuristics below.
618 my $thisround_succeeded = 0;
619 my $thisround_failed = 0;
620 my $thisround_failed_multiple = 0;
# Process tasks lowest level first; ties broken by queue order.
622 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
623 or $a <=> $b } @jobstep_todo;
624 my $level = $jobstep[$jobstep_todo[0]]->{level};
625 Log (undef, "start level $level");
630 my @freeslot = (0..$#slot);
633 my $progress_is_dirty = 1;
634 my $progress_stats_updated = 0;
636 update_progress_stats();
# --- Main dispatch loop: for each todo jobstep at the current level,
# fork a child that execs srun with the task command line. Much of the
# loop body (error paths, some closing braces) is elided in this view. ---
641 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
643 my $id = $jobstep_todo[$todo_ptr];
644 my $Jobstep = $jobstep[$id];
# Only dispatch tasks belonging to the current level this round.
645 if ($Jobstep->{level} != $level)
# Non-blocking pipe from child stderr back to the parent's reader set.
650 pipe $reader{$id}, "writer" or croak ($!);
651 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
652 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
654 my $childslot = $freeslot[0];
655 my $childnode = $slot[$childslot]->{node};
656 my $childslotname = join (".",
657 $slot[$childslot]->{node}->{name},
658 $slot[$childslot]->{cpu});
659 my $childpid = fork();
# --- Child process: reset signals, redirect output, build env, exec srun.
662 $SIG{'INT'} = 'DEFAULT';
663 $SIG{'QUIT'} = 'DEFAULT';
664 $SIG{'TERM'} = 'DEFAULT';
666 foreach (values (%reader))
670 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
671 open(STDOUT,">&writer");
672 open(STDERR,">&writer");
677 delete $ENV{"GNUPGHOME"};
# Task-specific environment consumed by the crunch script / SDK.
678 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
679 $ENV{"TASK_QSEQUENCE"} = $id;
680 $ENV{"TASK_SEQUENCE"} = $level;
681 $ENV{"JOB_SCRIPT"} = $Job->{script};
682 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
683 $param =~ tr/a-z/A-Z/;
684 $ENV{"JOB_PARAMETER_$param"} = $value;
686 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
687 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
688 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
689 $ENV{"HOME"} = $ENV{"TASK_WORK"};
690 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
691 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
692 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
693 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
# srun pins the task to its assigned node, one task per step.
699 "--nodelist=".$childnode->{name},
700 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
701 "--job-name=$job_id.$id.$$",
703 my $build_script_to_send = "";
# Shell preamble: recreate the task work dirs from scratch.
705 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
706 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
707 ."&& cd $ENV{CRUNCH_TMP} ";
710 $build_script_to_send = $build_script;
# Mount Keep by collection id for the lifetime of the task command.
714 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker branch: wrap the task in crunchstat + docker run.
717 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
718 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
719 # Dynamically configure the container to use the host system as its
720 # DNS server. Get the host's global addresses from the ip command,
721 # and turn them into docker --dns options using gawk.
723 q{$(ip -o address show scope global |
724 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
725 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
726 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
727 $command .= "--env=\QHOME=/home/crunch\E ";
# Forward ARVADOS_/JOB_/TASK_ env vars into the container, remapping
# the two path-valued ones to their in-container locations.
728 while (my ($env_key, $env_val) = each %ENV)
730 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
731 if ($env_key eq "TASK_WORK") {
732 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
734 elsif ($env_key eq "TASK_KEEPMOUNT") {
735 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
738 $command .= "--env=\Q$env_key=$env_val\E ";
742 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
743 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
744 $command .= "\Q$docker_hash\E ";
745 $command .= "stdbuf --output=0 --error=0 ";
746 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-docker branch: crunchstat + script straight from CRUNCH_SRC.
749 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
750 $command .= "stdbuf --output=0 --error=0 ";
751 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
754 my @execargs = ('bash', '-c', $command);
755 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
756 # exec() failed, we assume nothing happened.
757 Log(undef, "srun() failed on build script");
# --- Parent process: record child in %proc and mark the slot busy.
761 if (!defined $childpid)
768 $proc{$childpid} = { jobstep => $id,
771 jobstepname => "$job_id.$id.$childpid",
773 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
774 $slot[$childslot]->{pid} = $childpid;
776 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
777 Log ($id, "child $childpid started on $childslotname");
778 $Jobstep->{starttime} = time;
779 $Jobstep->{node} = $childnode->{name};
780 $Jobstep->{slotindex} = $childslot;
781 delete $Jobstep->{stderr};
782 delete $Jobstep->{finishtime};
784 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
785 $Jobstep->{'arvados_task'}->save;
# Remove the dispatched step from the todo queue.
787 splice @jobstep_todo, $todo_ptr, 1;
790 $progress_is_dirty = 1;
# --- Between dispatches: reap children, honor signals, manage the hold
# list, and bail out if failure rates indicate a systemic problem. ---
794 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
796 last THISROUND if $main::please_freeze;
797 if ($main::please_info)
799 $main::please_info = 0;
803 update_progress_stats();
810 check_refresh_wanted();
812 update_progress_stats();
813 select (undef, undef, undef, 0.1);
# Keep the progress summary fresh even when nothing is happening.
815 elsif (time - $progress_stats_updated >= 30)
817 update_progress_stats();
# Give up on the round if retried failures dominate successes.
819 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
820 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
822 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
823 .($thisround_failed+$thisround_succeeded)
824 .") -- giving up on this round";
825 Log (undef, $message);
829 # move slots from freeslot to holdslot (or back to freeslot) if necessary
830 for (my $i=$#freeslot; $i>=0; $i--) {
831 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
832 push @holdslot, (splice @freeslot, $i, 1);
835 for (my $i=$#holdslot; $i>=0; $i--) {
836 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
837 push @freeslot, (splice @holdslot, $i, 1);
841 # give up if no nodes are succeeding
842 if (!grep { $_->{node}->{losing_streak} == 0 &&
843 $_->{node}->{hold_count} < 4 } @slot) {
844 my $message = "Every node has failed -- giving up on this round";
845 Log (undef, $message);
# End of round: release holds and reset losing streaks for the next pass.
852 push @freeslot, splice @holdslot;
853 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
856 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
859 if ($main::please_continue) {
860 $main::please_continue = 0;
# SIGALRM handler requested a status dump: freeze + checkpoint.
863 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
867 check_refresh_wanted();
869 update_progress_stats();
870 select (undef, undef, undef, 0.1);
871 killem (keys %proc) if $main::please_freeze;
875 update_progress_stats();
876 freeze_if_want_freeze();
# Decide success/failure for the round; loop back for the next level
# via `goto ONELEVEL` while the outcome is still undetermined.
879 if (!defined $main::success)
882 $thisround_succeeded == 0 &&
883 ($thisround_failed == 0 || $thisround_failed > 4))
885 my $message = "stop because $thisround_failed tasks failed and none succeeded";
886 Log (undef, $message);
895 goto ONELEVEL if !defined $main::success;
900 my $collated_output = &collate_output();
902 if (!$collated_output) {
903 Log(undef, "output undef");
907 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
908 or die "failed to get collated manifest: $!";
909 my $orig_manifest_text = '';
910 while (my $manifest_line = <$orig_manifest>) {
911 $orig_manifest_text .= $manifest_line;
913 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
914 'manifest_text' => $orig_manifest_text,
916 Log(undef, "output uuid " . $output->{uuid});
917 Log(undef, "output hash " . $output->{portable_data_hash});
918 $Job->update_attributes('output' => $output->{portable_data_hash});
921 Log (undef, "Failed to register output manifest: $@");
925 Log (undef, "finish");
930 if ($collated_output && $main::success) {
931 $final_state = 'Complete';
933 $final_state = 'Failed';
935 $Job->update_attributes('state' => $final_state);
937 exit (($final_state eq 'Complete') ? 0 : 1);
# Push todo/done/running task counts into the job's tasks_summary and
# log a one-line status. No-op unless something changed since last call.
941 sub update_progress_stats
943 $progress_stats_updated = time;
944 return if !$progress_is_dirty;
# running = total slots minus free minus held.
945 my ($todo, $done, $running) = (scalar @jobstep_todo,
946 scalar @jobstep_done,
947 scalar @slot - scalar @freeslot - scalar @holdslot);
948 $Job->{'tasks_summary'} ||= {};
949 $Job->{'tasks_summary'}->{'todo'} = $todo;
950 $Job->{'tasks_summary'}->{'done'} = $done;
951 $Job->{'tasks_summary'}->{'running'} = $running;
952 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
953 Log (undef, "status: $done done, $running running, $todo todo");
954 $progress_is_dirty = 0;
# --- Body of the child-reaper (sub header elided in this view): reap one
# exited child, classify its task as success/temporary/permanent failure,
# update node health heuristics, and enqueue any newly created subtasks. ---
961 my $pid = waitpid (-1, WNOHANG);
962 return 0 if $pid <= 0;
964 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
966 . $slot[$proc{$pid}->{slot}]->{cpu});
967 my $jobstepid = $proc{$pid}->{jobstep};
968 my $elapsed = time - $proc{$pid}->{time};
969 my $Jobstep = $jobstep[$jobstepid];
971 my $childstatus = $?;
972 my $exitvalue = $childstatus >> 8;
973 my $exitinfo = "exit ".exit_status_s($childstatus);
# Re-read the task record: the task itself reports success via the API.
974 $Jobstep->{'arvados_task'}->reload;
975 my $task_success = $Jobstep->{'arvados_task'}->{success};
977 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
979 if (!defined $task_success) {
980 # task did not indicate one way or the other --> fail
981 $Jobstep->{'arvados_task'}->{success} = 0;
982 $Jobstep->{'arvados_task'}->save;
# Exit code 111 is the conventional "temporary failure" signal.
989 $temporary_fail ||= $Jobstep->{node_fail};
990 $temporary_fail ||= ($exitvalue == 111);
993 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
995 # Check for signs of a failed or misconfigured node
996 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
997 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
998 # Don't count this against jobstep failure thresholds if this
999 # node is already suspected faulty and srun exited quickly
1000 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1002 Log ($jobstepid, "blaming failure on suspect node " .
1003 $slot[$proc{$pid}->{slot}]->{node}->{name});
1004 $temporary_fail ||= 1;
# Take the whole node out of rotation for a while.
1006 ban_node_by_slot($proc{$pid}->{slot});
1009 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1010 ++$Jobstep->{'failures'},
1011 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failure, or 3 strikes: give up and freeze the whole job.
1014 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1015 # Give up on this task, and the whole job
1017 $main::please_freeze = 1;
1019 # Put this task back on the todo queue
1020 push @jobstep_todo, $jobstepid;
1021 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: reset the node's health counters.
1025 ++$thisround_succeeded;
1026 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1027 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1028 push @jobstep_done, $jobstepid;
1029 Log ($jobstepid, "success in $elapsed seconds");
1031 $Jobstep->{exitcode} = $childstatus;
1032 $Jobstep->{finishtime} = time;
1033 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1034 $Jobstep->{'arvados_task'}->save;
1035 process_stderr ($jobstepid, $task_success);
1036 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the stderr pipe and the scheduling slot.
1038 close $reader{$jobstepid};
1039 delete $reader{$jobstepid};
1040 delete $slot[$proc{$pid}->{slot}]->{pid};
1041 push @freeslot, $proc{$pid}->{slot};
# A successful task may have created follow-on tasks: page through the
# full list (ordered by qsequence) and enqueue each as a new jobstep.
1044 if ($task_success) {
1046 my $newtask_list = [];
1047 my $newtask_results;
1049 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1051 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1053 'order' => 'qsequence',
1054 'offset' => scalar(@$newtask_list),
1056 push(@$newtask_list, @{$newtask_results->{items}});
1057 } while (@{$newtask_results->{items}});
1058 foreach my $arvados_task (@$newtask_list) {
1060 'level' => $arvados_task->{'sequence'},
1062 'arvados_task' => $arvados_task
1064 push @jobstep, $jobstep;
1065 push @jobstep_todo, $#jobstep;
1069 $progress_is_dirty = 1;
# Re-read the Job record when the refresh-trigger file's mtime is newer
# than our last refresh; pick up cancellation and unexpected state changes
# and request a freeze if the job is no longer Running.
1073 sub check_refresh_wanted
1075 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the trigger file's mtime.
1076 if (@stat && $stat[9] > $latest_refresh) {
1077 $latest_refresh = scalar time;
1078 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1079 for my $attr ('cancelled_at',
1080 'cancelled_by_user_uuid',
1081 'cancelled_by_client_uuid',
1083 $Job->{$attr} = $Job2->{$attr};
1085 if ($Job->{'state'} ne "Running") {
1086 if ($Job->{'state'} eq "Cancelled") {
1087 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1089 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1092 $main::please_freeze = 1;
# --- Body of the squeue watchdog (sub header elided in this view): kill
# children whose killtime has arrived, and schedule kills for children
# that squeue no longer reports as running. Rate-limited by timestamps. ---
1099 # return if the kill list was checked <4 seconds ago
1100 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1104 $squeue_kill_checked = time;
1106 # use killem() on procs whose killtime is reached
1109 if (exists $proc{$_}->{killtime}
1110 && $proc{$_}->{killtime} <= time)
1116 # return if the squeue was checked <60 seconds ago
1117 if (defined $squeue_checked && $squeue_checked > time - 60)
1121 $squeue_checked = time;
1125 # here is an opportunity to check for mysterious problems with local procs
1129 # get a list of steps still running
# Trailing "echo ok" is a sentinel proving squeue itself did not fail.
1130 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1132 if ($squeue[-1] ne "ok")
1138 # which of my jobsteps are running, according to squeue?
# "%i" is "<jobid>.<stepid>"; only steps of our SLURM job count.
1142 if (/^(\d+)\.(\d+) (\S+)/)
1144 if ($1 eq $ENV{SLURM_JOBID})
1151 # which of my active child procs (>60s old) were not mentioned by squeue?
1152 foreach (keys %proc)
1154 if ($proc{$_}->{time} < time - 60
1155 && !exists $ok{$proc{$_}->{jobstepname}}
1156 && !exists $proc{$_}->{killtime})
1158 # kill this proc if it hasn't exited in 30 seconds
1159 $proc{$_}->{killtime} = time + 30;
# Release the SLURM allocation (scancel); condition guarding this is
# elided in this view -- presumably only under $have_slurm. TODO confirm.
1165 sub release_allocation
1169 Log (undef, "release job allocation");
1170 system "scancel $ENV{SLURM_JOBID}";
# --- Body of readfrompipes (sub header elided): drain each child's
# non-blocking stderr pipe into its jobstep buffer, preprocessing lines
# and trimming the buffer so it never grows past 16KB. ---
1178 foreach my $job (keys %reader)
1181 while (0 < sysread ($reader{$job}, $buf, 8192))
1183 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1184 $jobstep[$job]->{stderr} .= $buf;
1185 preprocess_stderr ($job);
1186 if (length ($jobstep[$job]->{stderr}) > 16384)
# Drop the oldest 8KB to cap memory use per jobstep.
1188 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer: log each line and
# react to known srun error messages (allocation expiry -> freeze;
# node failure -> mark node_fail and ban the node).
1197 sub preprocess_stderr
1201 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the consumed line (including its newline) from the buffer.
1203 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1204 Log ($job, "stderr $line");
1205 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1207 $main::please_freeze = 1;
1209 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1210 $jobstep[$job]->{node_fail} = 1;
1211 ban_node_by_slot($jobstep[$job]->{slotindex});
# --- process_stderr (header elided): flush any remaining partial buffer
# after a task exits, logging every leftover line. ---
1220 my $task_success = shift;
1221 preprocess_stderr ($job);
1224 Log ($job, "stderr $_");
1225 } split ("\n", $jobstep[$job]->{stderr});
# --- Body of fetch_block (sub header elided): stream a Keep block/
# collection via `arv-get` and return its full contents as one scalar. ---
1231 my ($keep, $child_out, $output_block);
# $hash is interpolated with \Q...\E so shell metacharacters are escaped.
1233 my $cmd = "arv-get \Q$hash\E";
1234 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1238 my $bytes = sysread($keep, $buf, 1024 * 1024);
1239 if (!defined $bytes) {
1240 die "reading from arv-get: $!";
1241 } elsif ($bytes == 0) {
1242 # sysread returns 0 at the end of the pipe.
1245 # some bytes were read into buf.
1246 $output_block .= $buf;
1250 return $output_block;
# --- Body of collate_output (sub header elided): feed each successful
# task's output (manifest text, or fetched block contents) into an
# `arv-put --raw` pipe and read back the collated manifest locator. ---
1255 Log (undef, "collate");
1257 my ($child_out, $child_in);
1258 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1259 '--retries', put_retry_count());
# Skip tasks that produced no output or did not succeed.
1263 next if (!exists $_->{'arvados_task'}->{'output'} ||
1264 !$_->{'arvados_task'}->{'success'});
1265 my $output = $_->{'arvados_task'}->{output};
# Output that is NOT a bare block locator is treated as manifest text.
1266 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1268 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1269 print $child_in $output;
# Single-task job whose output is a locator: pass it through as-is.
1271 elsif (@jobstep == 1)
1273 $joboutput = $output;
1276 elsif (defined (my $outblock = fetch_block ($output)))
1278 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1279 print $child_in $outblock;
1283 Log (undef, "XXX fetch_block($output) failed XXX");
# Read the collated locator from arv-put, with a 2-minute timeout.
1289 if (!defined $joboutput) {
1290 my $s = IO::Select->new($child_out);
1291 if ($s->can_read(120)) {
1292 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1294 # TODO: Ensure exit status == 0.
1296 Log (undef, "timed out reading from 'arv-put'");
1299 # TODO: kill $pid instead of waiting, now that we've decided to
1300 # ignore further output.
# killem(@pids): escalating-signal kill loop body (sub header and the
# surrounding foreach are outside this excerpt).  For each pid in %proc,
# start with SIGINT; if a given signal was already sent more than 4
# seconds ago and the process is still alive, escalate INT -> TERM -> KILL.
1311 my $sig = 2; # SIGINT first
1312 if (exists $proc{$_}->{"sent_$sig"} &&
1313 time - $proc{$_}->{"sent_$sig"} > 4)
1315 $sig = 15; # SIGTERM if SIGINT doesn't work
1317 if (exists $proc{$_}->{"sent_$sig"} &&
1318 time - $proc{$_}->{"sent_$sig"} > 4)
1320 $sig = 9; # SIGKILL if SIGTERM doesn't work
# Only send each signal level once; record when it was sent so the next
# pass knows whether to escalate.
1322 if (!exists $proc{$_}->{"sent_$sig"})
1324 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1326 select (undef, undef, undef, 0.1);
1329 kill $sig, $_; # srun wants two SIGINT to really interrupt
1331 $proc{$_}->{"sent_$sig"} = time;
1332 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Fragment: builds a select() bit vector, setting the bit for each reader
# filehandle's file descriptor.  The enclosing sub is outside this excerpt.
1342 vec($bits,fileno($_),1) = 1;
1348 # Send log output to Keep via arv-put.
1350 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1351 # $log_pipe_pid is the pid of the arv-put subprocess.
1353 # The only functions that should access these variables directly are:
1355 # log_writer_start($logfilename)
1356 # Starts an arv-put pipe, reading data on stdin and writing it to
1357 # a $logfilename file in an output collection.
1359 # log_writer_send($txt)
1360 # Writes $txt to the output log collection.
1362 # log_writer_finish()
1363 # Closes the arv-put pipe and returns the output that it produces.
1365 # log_writer_is_active()
1366 # Returns a true value if there is currently a live arv-put
1367 # process, false otherwise.
# Shared state for the log-writer helpers below: both ends of the arv-put
# pipe and the child pid.  Only the log_writer_* subs should touch these
# (see the comment block above).
1369 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
# log_writer_start($logfilename): spawn `arv-put --portable-data-hash`
# reading log data on its stdin and storing it as $logfilename in a
# collection.  (Closing lines of the sub are outside this excerpt.)
# NOTE(review): the ($) prototype does not validate arguments in Perl;
# kept as-is to preserve the file's existing convention.
1371 sub log_writer_start($)
1373 my $logfilename = shift;
1374 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1375 'arv-put', '--portable-data-hash',
1377 '--filename', $logfilename,
# log_writer_send($txt): append $txt to the running arv-put log pipe.
# (The `my $txt = shift;` line and closing brace are outside this excerpt.)
1381 sub log_writer_send($)
1384 print $log_pipe_in $txt;
# log_writer_finish(): close the arv-put pipe, wait up to 120s for its
# output (the log collection's portable data hash), reap the child, and
# return that output.  Returns early (undef) if no writer is active.
1387 sub log_writer_finish()
1389 return unless $log_pipe_pid;
# Closing stdin signals EOF to arv-put so it finalizes the collection.
1391 close($log_pipe_in);
1394 my $s = IO::Select->new($log_pipe_out);
1395 if ($s->can_read(120)) {
1396 sysread($log_pipe_out, $arv_put_output, 1024);
1397 chomp($arv_put_output);
1399 Log (undef, "timed out reading from 'arv-put'");
# Reap the child and reset the shared log-writer state.
1402 waitpid($log_pipe_pid, 0);
1403 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
# NOTE(review): Log() is invoked elsewhere as Log($jobstep_id, $message);
# this single-argument call would make the message land in the jobstep-id
# slot -- looks like a bug, confirm against Log()'s signature.
1405 Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1408 return $arv_put_output;
# log_writer_is_active(): true iff an arv-put log-writer child is running
# (the pid doubles as the truthy flag; cleared in log_writer_finish).
1411 sub log_writer_is_active() {
1412 return $log_pipe_pid;
# Log($jobstep_id, $logmessage): write one log line to stderr and, when a
# log writer is active, to the Keep log collection.  Multi-line messages
# are split and re-dispatched one line at a time (recursion is outside
# this excerpt).
1415 sub Log # ($jobstep_id, $logmessage)
1417 if ($_[1] =~ /\n/) {
1418 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered for the duration (select dance restores the
# previously selected handle).
1423 my $fh = select STDERR; $|=1; select $fh;
# Format: "<job_id> <pid> <jobstep_id> <message>".
1424 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Octal-escape any byte outside printable ASCII (space..~) so log lines
# stay single-line and unambiguous.
1425 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Timestamp is computed only when it will be used: for the Keep log
# writer, or when stderr is a terminal.
1428 if (log_writer_is_active() || -t STDERR) {
1429 my @gmtime = gmtime;
1430 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1431 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1433 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1435 if (log_writer_is_active()) {
1436 log_writer_send($datetime . " " . $message);
# Fragment: body of the fatal-error handler (presumably sub croak; header
# is outside this excerpt).  Logs the message with the caller's location,
# then checkpoints and collates partial output if any tasks remain.
1443 my ($package, $file, $line) = caller;
1444 my $message = "@_ at $file line $line\n";
1445 Log (undef, $message);
1446 freeze() if @jobstep_todo;
1447 collate_output() if @jobstep_todo;
# Fragment: final Job-state update on abnormal exit (enclosing sub is
# outside this excerpt).  A job already Cancelled just gets a finish
# timestamp; otherwise the job is marked Failed.
1457 if ($Job->{'state'} eq 'Cancelled') {
1458 $Job->update_attributes('finished_at' => scalar gmtime);
1460 $Job->update_attributes('state' => 'Failed');
# save_meta($justcheckpoint): finalize the log collection and record its
# manifest locator on the Job.  (Sub header is outside this excerpt.)
# No-ops on mid-run checkpoints and when no log writer is running.
1467 my $justcheckpoint = shift; # false if this will be the last meta saved
1468 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1469 return unless log_writer_is_active();
1471 my $loglocator = log_writer_finish();
1472 Log (undef, "log manifest is $loglocator");
# Update both the local cache and the API server record.
1473 $Job->{'log'} = $loglocator;
1474 $Job->update_attributes('log', $loglocator);
# freeze_if_want_freeze(@pids_to_kill): when a freeze has been requested
# (via signal handler setting $main::please_freeze), release the node
# allocation, kill the given srun procs, reap children, then freeze and
# stop.  (Several interior lines and the tail are outside this excerpt.)
1478 sub freeze_if_want_freeze
1480 if ($main::please_freeze)
1482 release_allocation();
1485 # kill some srun procs before freeze+stop
# NOTE(review): map in void context for side effects -- a for loop would
# be more idiomatic, but kept byte-identical here.
1486 map { $proc{$_} = {} } @_;
1489 killem (keys %proc);
1490 select (undef, undef, undef, 0.1);
# Reap any children that have already exited (non-blocking).
1492 while (($died = waitpid (-1, WNOHANG)) > 0)
1494 delete $proc{$died};
# Fragments from three small subs (headers outside this excerpt):
# freeze() is a logged no-op stub; thaw() croaks as unimplemented;
# freezeunquote() (presumably) reverses the backslash escaping used by
# the freeze format, turning \n back into newline and \X into X.
1509 Log (undef, "Freeze not implemented");
1516 croak ("Thaw not implemented");
1532 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun($srunargs, $execargs, $opts, $stdin): run a command, wrapped in
# slurm's srun when $have_slurm, directly otherwise.  (Sub header and
# several interior lines, including the fork branches, are outside this
# excerpt.)
1539 my $srunargs = shift;
1540 my $execargs = shift;
1541 my $opts = shift || {};
# Without slurm, the srun arguments are simply dropped.
1543 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Debug trace of the command line, quoting args that contain spaces.
1544 print STDERR (join (" ",
1545 map { / / ? "'$_'" : $_ }
1548 if $ENV{CRUNCH_DEBUG};
# If caller supplied stdin data, fork a child whose stdout becomes our
# STDIN and have it print the data.  NOTE(review): reopening the bareword
# STDIN via open STDIN, "-|" is deliberate plumbing here; confirm the
# missing child/parent branches before touching this.
1550 if (defined $stdin) {
1551 my $child = open STDIN, "-|";
1552 defined $child or die "no fork: $!";
1554 print $stdin or die $!;
1555 close STDOUT or die $!;
# fork mode: run via system() and return its status; otherwise exec()
# replaces this process, so reaching the die means exec failed.
1560 return system (@$args) if $opts->{fork};
# A huge environment is a common cause of exec failure (E2BIG).
1563 warn "ENV size is ".length(join(" ",%ENV));
1564 die "exec failed: $!: @$args";
# ban_node_by_slot($slotid): temporarily take a node out of scheduling
# after a failure.  (The `my $slotid = shift;` line is outside this
# excerpt.)  Sets a 60-second hold and bumps the node's hold counter.
1568 sub ban_node_by_slot {
1569 # Don't start any new jobsteps on this node for 60 seconds
1571 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1572 $slot[$slotid]->{node}->{hold_count}++;
1573 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Fragment: exclusive-lockfile helper body (sub header is outside this
# excerpt).  Opens/creates $lockfile and takes a non-blocking exclusive
# flock; croaks with $error_message if another process holds the lock.
# NOTE(review): bareword filehandle L -- a lexical handle would be the
# modern idiom, but the handle must stay open for the program's lifetime
# to keep the lock, which may be why a global is used; confirm.
1578 my ($lockfile, $error_message) = @_;
1579 open L, ">", $lockfile or croak("$lockfile: $!");
1580 if (!flock L, LOCK_EX|LOCK_NB) {
1581 croak("Can't lock $lockfile: $error_message\n");
1585 sub find_docker_image {
1586 # Given a Keep locator, check to see if it contains a Docker image.
1587 # If so, return its stream name and Docker hash.
1588 # If not, return undef for both values.
1589 my $locator = shift;
1590 my ($streamname, $filename);
# Fetch the collection record from the API server and walk its manifest,
# one stream (line) at a time.
1591 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1592 foreach my $line (split(/\n/, $image->{manifest_text})) {
1593 my @tokens = split(/\s+/, $line);
# First manifest token is the stream name; file tokens look like
# "pos:size:name".
1595 $streamname = shift(@tokens);
1596 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1597 if (defined($filename)) {
1598 return (undef, undef); # More than one file in the Collection.
# limit=3 so colons inside the filename itself are preserved.
1600 $filename = (split(/:/, $filedata, 3))[2];
# A valid image collection holds exactly one file named <64-hex>.tar;
# the hex part is the Docker image hash.
1605 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1606 return ($streamname, $1);
1608 return (undef, undef);
1612 sub put_retry_count {
1613 # Calculate a --retries argument for arv-put that will have it try
1614 # approximately as long as this Job has been running.
1615 my $stoptime = shift || time;
1616 my $starttime = $jobstep[0]->{starttime};
# Default to 1 second if the first jobstep hasn't recorded a start time.
1617 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# Halving loop body is outside this excerpt; net effect is retries that
# grow roughly with log2 of the elapsed time.
1619 while ($timediff >= 2) {
# Never retry fewer than 3 times.
1623 return ($retries > 3) ? $retries : 3;
# exit_status_s($?): body of the wait-status formatter (sub header is
# outside this excerpt).
1627 # Given a $?, return a human-readable exit code string like "0" or
1628 # "1" or "0 with signal 1" or "1 with signal 11".
1629 my $exitcode = shift;
# High byte is the exit status; low 7 bits the terminating signal;
# bit 0x80 indicates a core dump (standard POSIX wait-status layout).
1630 my $s = $exitcode >> 8;
1631 if ($exitcode & 0x7f) {
1632 $s .= " with signal " . ($exitcode & 0x7f);
1634 if ($exitcode & 0x80) {
1635 $s .= " with core dump";
1643 # checkout-and-build
1646 use File::Path qw( make_path );
# Mainline of the checkout-and-build script (shipped after __DATA__ and
# run on each compute node): unpack the git archive appended to this
# script into CRUNCH_SRC, build/install the SDK and any crunch_scripts
# installer, then exec the requested command.  Several interior lines
# (fork/branch points, loop bodies) are outside this excerpt.
1648 my $destdir = $ENV{"CRUNCH_SRC"};
1649 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1650 my $repo = $ENV{"CRUNCH_SRC_URL"};
1651 my $task_work = $ENV{"TASK_WORK"};
1653 for my $dir ($destdir, $task_work) {
1656 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Serialize concurrent installs on the same node with a lockfile.
1660 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: "$destdir.commit" is a symlink naming the installed commit;
# if it already matches, skip the build and exec immediately.
1662 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1665 die "Cannot exec `@ARGV`: $!";
# Slow path: invalidate the marker, divert our own stdout/stderr to a
# build log, and unpack the embedded tar archive into $destdir.
1671 unlink "$destdir.commit";
1672 open STDOUT, ">", "$destdir.log";
1673 open STDERR, ">&STDOUT";
1676 my @git_archive_data = <DATA>;
1677 if (@git_archive_data) {
1678 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1679 print TARX @git_archive_data;
1681 die "'tar -C $destdir -xf -' exited $?: $!";
1686 chomp ($pwd = `pwd`);
1687 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the Python SDK into a virtualenv under $install_dir.
1690 for my $src_path ("$destdir/arvados/sdk/python") {
1692 shell_or_die ("virtualenv", $install_dir);
1693 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run whichever project install hook exists, in priority order.
1697 if (-e "$destdir/crunch_scripts/install") {
1698 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1699 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1700 # Old version
1701 shell_or_die ("./tests/autotests.sh", $install_dir);
1702 } elsif (-e "./install.sh") {
1703 shell_or_die ("./install.sh", $install_dir);
# Atomically publish the "installed" marker: symlink to a temp name,
# then rename over the real marker.
1707 unlink "$destdir.commit.new";
1708 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1709 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1716 die "Cannot exec `@ARGV`: $!";
# shell_or_die(@cmd): body fragment (sub header and the system() call are
# outside this excerpt).  Echoes the command when DEBUG is set; dies with
# errno and hex exit status if the command fails.
1723 if ($ENV{"DEBUG"}) {
1724 print STDERR "@_\n";
1727 or die "@_ failed: $! exit 0x".sprintf("%x",$?);