2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
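For example, a local job specification might look like this (the script
name and parameters shown here are hypothetical):

 crunch-job --job '{
   "script": "hash",
   "script_version": "/path/to/tree",
   "script_parameters": {"input": "/path/to/input"}
 }'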
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs locally.
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or deallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
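Signals can be sent with the standard kill(1) utility. For example, to
request a checkpoint without stopping the job (the PID shown is a
placeholder):

 kill -ALRM 12345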
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path remove_tree );
88 use constant EX_TEMPFAIL => 75;
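# EX_TEMPFAIL (75, from sysexits.h) tells our parent process (normally
# crunch-dispatch) that we gave up for a temporary reason and the job
# should not be marked as failed.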
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # Use a tmp dir unique to my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
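# With the defaults above, a non-crunch user with uid 1000 would end up
# with (illustrative): CRUNCH_TMP=/tmp/crunch-job-1000,
# JOB_WORK=/tmp/crunch-job-1000/work, CRUNCH_INSTALL=/tmp/crunch-job-1000/opt.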
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
151 if ($jobspec =~ /^[-a-z\d]+$/)
153 # $jobspec is an Arvados UUID, not a JSON job specification
154 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155 if (!$force_unlock) {
156 # If some other crunch-job process has grabbed this job (or we see
157 # other evidence that the job is already underway), we exit
158 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159 # mark the job as failed.
160 if ($Job->{'is_locked_by_uuid'}) {
161 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
164 if ($Job->{'state'} ne 'Queued') {
165 Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
168 if (defined $Job->{'success'}) {
169 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
172 if ($Job->{'running'}) {
173 Log(undef, "Job 'running' flag is already set");
176 if ($Job->{'started_at'}) {
177 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
184 $Job = JSON::decode_json($jobspec);
188 map { croak ("No $_ specified") unless $Job->{$_} }
189 qw(script script_version script_parameters);
192 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193 $Job->{'started_at'} = gmtime;
195 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
197 $job_id = $Job->{'uuid'};
199 my $keep_logfile = $job_id . '.log.txt';
200 $local_logfile = File::Temp->new();
202 $Job->{'runtime_constraints'} ||= {};
203 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
204 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
207 Log (undef, "check slurm allocation");
210 # Should we use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
214 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
215 push @sinfo, "$localcpus localhost";
217 if (exists $ENV{SLURM_NODELIST})
219 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
223 my ($ncpus, $slurm_nodelist) = split;
224 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
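# SLURM reports node lists in a compressed form, e.g. (hypothetical)
# "compute[1-3,9],login0". The loop below expands that into one entry
# per node: compute1, compute2, compute3, compute9, login0.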
227 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
230 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
233 foreach (split (",", $ranges))
246 push @nodelist, map {
248 $n =~ s/\[[-,\d]+\]/$_/;
255 push @nodelist, $nodelist;
258 foreach my $nodename (@nodelist)
260 Log (undef, "node $nodename - $ncpus slots");
261 my $node = { name => $nodename,
265 foreach my $cpu (1..$ncpus)
267 push @slot, { node => $node,
271 push @node, @nodelist;
276 # Ensure that we get one jobstep running on each allocated node before
277 # we start overloading nodes with concurrent steps
279 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
284 # Claim this job, and make sure nobody else does
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
287 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
290 $Job->update_attributes('state' => 'Running',
291 'tasks_summary' => { 'failed' => 0,
297 Log (undef, "start");
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
329 if (defined $Job->{thawedfromkey})
331 thaw ($Job->{thawedfromkey});
335 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336 'job_uuid' => $Job->{'uuid'},
341 push @jobstep, { 'level' => 0,
343 'arvados_task' => $first_task,
345 push @jobstep_todo, 0;
351 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
358 $build_script = <DATA>;
360 my $nodelist = join(",", @node);
362 if (!defined $no_clear_tmp) {
363 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
364 Log (undef, "Clean work dirs");
366 my $cleanpid = fork();
369 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
370 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
375 last if $cleanpid == waitpid (-1, WNOHANG);
376 freeze_if_want_freeze ($cleanpid);
377 select (undef, undef, undef, 0.1);
379 Log (undef, "Cleanup command exited $?");
384 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
385 # If we're in user-land (i.e., not called from crunch-dispatch)
386 # script_version can be an absolute directory path, signifying we
387 # should work straight out of that directory instead of using a git
389 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
390 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
393 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
395 # Install requested code version
396 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
398 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
400 # If we're running under crunch-dispatch, it will have already
401 # pulled the appropriate source tree into its own repository, and
402 # given us that repo's path as $git_dir.
404 # If we're running a "local" job, we might have to fetch content
405 # from a remote repository.
407 # (Currently crunch-dispatch gives a local path with --git-dir, but
408 # we might as well accept URLs there too in case it changes its
410 my $repo = $git_dir || $Job->{'repository'};
412 # Repository can be remote or local. If remote, we'll need to fetch it
413 # to a local dir before doing `git log` et al.
416 if ($repo =~ m{://|\@.*:}) {
417 # $repo is a git url we can clone, like git:// or https:// or
418 # file:/// or git@host:repo.git
419 $repo_location = 'remote';
420 } elsif ($repo =~ m{^\.*/}) {
421 # $repo is a local path to a git directory. We'll also resolve ../foo
422 # to ../foo/.git if the latter is a directory.
423 if (-d "$repo/.git") {
424 $repo = "$repo/.git";
426 $repo_location = 'local';
427 Log(undef, "Using local repository '$repo'");
429 # $repo is none of the above. It must be the name of a hosted
431 my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
432 'filters' => [['name','=',$repo]]
434 my $n_found = scalar @{$arv_repo_list};
436 Log(undef, "Repository '$repo' -> "
437 . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
440 croak("Error: Found $n_found repositories with name '$repo'.");
442 $repo = $arv_repo_list->[0]->{'fetch_url'};
443 $repo_location = 'remote';
445 Log(undef, "Using $repo_location repository '$repo'");
446 $ENV{"CRUNCH_SRC_URL"} = $repo;
448 # Resolve given script_version (we'll call that $treeish here) to a
449 # commit sha1 ($commit).
450 my $treeish = $Job->{'script_version'};
452 if ($repo_location eq 'remote') {
453 # We minimize excess object-fetching by re-using the same bare
454 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
455 # just keep adding remotes to it as needed.
456 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
457 my $gitcmd = "git --git-dir=\Q$local_repo\E";
459 # Set up our local repo for caching remote objects, making
461 if (!-d $local_repo) {
462 make_path($local_repo) or croak("Error: could not create $local_repo");
464 # This works (exits 0 and doesn't delete fetched objects) even
465 # if $local_repo is already initialized:
466 `$gitcmd init --bare`;
468 croak("Error: $gitcmd init --bare exited $?");
471 # If $treeish looks like a hash (or abbrev hash) we look it up in
472 # our local cache first, since that's cheaper. (We don't want to
473 # do that with tags/branches though -- those change over time, so
474 # they should always be resolved by the remote repo.)
475 if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
476 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
478 $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
479 $sha1 =~ /^([0-9a-f]{40})$/) {
481 Log(undef, "Commit $commit already present in $local_repo");
485 if (!defined $commit) {
486 # If $treeish isn't just a hash or abbrev hash, or isn't here
487 # yet, we need to fetch the remote to resolve it correctly.
489 # First, remove all local heads. This prevents a name that does
490 # not exist on the remote from resolving to (or colliding with)
491 # a previously fetched branch or tag (possibly from a different
493 remove_tree("$local_repo/refs/heads", {keep_root => 1});
495 Log(undef, "Fetching objects from $repo to $local_repo");
496 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
498 croak("Error: `$gitcmd fetch` exited $?");
502 # Now that the data is all here, we will use our local repo for
503 # the rest of our git activities.
507 my $gitcmd = "git --git-dir=\Q$repo\E";
508 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
509 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
510 croak("`$gitcmd rev-list` exited $?, '$treeish' not found. Giving up.");
513 Log(undef, "Version $treeish is commit $commit");
515 if ($commit ne $Job->{'script_version'}) {
516 # Record the real commit id in the database, frozentokey, logs,
517 # etc. -- instead of an abbreviation or a branch name which can
518 # become ambiguous or point to a different commit in the future.
519 if (!$Job->update_attributes('script_version' => $commit)) {
520 croak("Error: failed to update job's script_version attribute");
524 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
525 $git_archive = `$gitcmd archive ''\Q$commit\E`;
527 croak("Error: $gitcmd archive exited $?");
531 if (!defined $git_archive) {
532 Log(undef, "Skip install phase (no git archive)");
534 Log(undef, "Warning: This probably means workers have no source tree!");
538 Log(undef, "Run install script on all workers");
540 my @srunargs = ("srun",
541 "--nodelist=$nodelist",
542 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
543 my @execargs = ("sh", "-c",
544 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
546 # Note: this section is almost certainly unnecessary if we're
547 # running tasks in docker containers.
548 my $installpid = fork();
549 if ($installpid == 0)
551 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
556 last if $installpid == waitpid (-1, WNOHANG);
557 freeze_if_want_freeze ($installpid);
558 select (undef, undef, undef, 0.1);
560 Log (undef, "Install script exited $?");
565 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
566 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
569 # If this job requires a Docker image, install that.
570 my $docker_bin = "/usr/bin/docker.io";
571 my ($docker_locator, $docker_stream, $docker_hash);
572 if ($docker_locator = $Job->{docker_image_locator}) {
573 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
576 croak("No Docker image hash found from locator $docker_locator");
578 $docker_stream =~ s/^\.//;
579 my $docker_install_script = qq{
580 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
581 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
584 my $docker_pid = fork();
585 if ($docker_pid == 0)
587 srun (["srun", "--nodelist=" . join(',', @node)],
588 ["/bin/sh", "-ec", $docker_install_script]);
593 last if $docker_pid == waitpid (-1, WNOHANG);
594 freeze_if_want_freeze ($docker_pid);
595 select (undef, undef, undef, 0.1);
599 croak("Installing Docker image from $docker_locator returned exit code $?");
603 foreach (qw (script script_version script_parameters runtime_constraints))
607 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
609 foreach (split (/\n/, $Job->{knobs}))
611 Log (undef, "knob " . $_);
616 $main::success = undef;
622 my $thisround_succeeded = 0;
623 my $thisround_failed = 0;
624 my $thisround_failed_multiple = 0;
626 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
627 or $a <=> $b } @jobstep_todo;
628 my $level = $jobstep[$jobstep_todo[0]]->{level};
629 Log (undef, "start level $level");
634 my @freeslot = (0..$#slot);
637 my $progress_is_dirty = 1;
638 my $progress_stats_updated = 0;
640 update_progress_stats();
645 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
647 my $id = $jobstep_todo[$todo_ptr];
648 my $Jobstep = $jobstep[$id];
649 if ($Jobstep->{level} != $level)
654 pipe $reader{$id}, "writer" or croak ($!);
655 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
656 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
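# The non-blocking pipe set up above lets the parent poll this task's
# stderr from the scheduling loop without stalling.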
658 my $childslot = $freeslot[0];
659 my $childnode = $slot[$childslot]->{node};
660 my $childslotname = join (".",
661 $slot[$childslot]->{node}->{name},
662 $slot[$childslot]->{cpu});
663 my $childpid = fork();
666 $SIG{'INT'} = 'DEFAULT';
667 $SIG{'QUIT'} = 'DEFAULT';
668 $SIG{'TERM'} = 'DEFAULT';
670 foreach (values (%reader))
674 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
675 open(STDOUT,">&writer");
676 open(STDERR,">&writer");
681 delete $ENV{"GNUPGHOME"};
682 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
683 $ENV{"TASK_QSEQUENCE"} = $id;
684 $ENV{"TASK_SEQUENCE"} = $level;
685 $ENV{"JOB_SCRIPT"} = $Job->{script};
686 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
687 $param =~ tr/a-z/A-Z/;
688 $ENV{"JOB_PARAMETER_$param"} = $value;
690 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
691 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
692 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
693 $ENV{"HOME"} = $ENV{"TASK_WORK"};
694 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
695 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
696 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
697 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
703 "--nodelist=".$childnode->{name},
704 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
705 "--job-name=$job_id.$id.$$",
707 my $build_script_to_send = "";
709 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
710 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
711 ."&& cd $ENV{CRUNCH_TMP} ";
714 $build_script_to_send = $build_script;
718 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
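# Everything appended to the command after --exec runs as a child of
# arv-mount, with the Keep mount available at $TASK_KEEPMOUNT for the
# lifetime of the task.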
721 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
722 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
723 # Dynamically configure the container to use the host system as its
724 # DNS server. Get the host's global addresses from the ip command,
725 # and turn them into docker --dns options using gawk.
727 q{$(ip -o address show scope global |
728 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
729 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
730 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
731 $command .= "--env=\QHOME=/home/crunch\E ";
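# Copy the ARVADOS_*, JOB_*, and TASK_* environment variables into the
# container, remapping TASK_WORK and TASK_KEEPMOUNT to the paths they
# are mounted on inside the container.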
732 while (my ($env_key, $env_val) = each %ENV)
734 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
735 if ($env_key eq "TASK_WORK") {
736 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
738 elsif ($env_key eq "TASK_KEEPMOUNT") {
739 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
742 $command .= "--env=\Q$env_key=$env_val\E ";
746 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
747 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
748 $command .= "\Q$docker_hash\E ";
749 $command .= "stdbuf --output=0 --error=0 ";
750 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
753 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
754 $command .= "stdbuf --output=0 --error=0 ";
755 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
758 my @execargs = ('bash', '-c', $command);
759 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
760 # exec() failed; assume nothing happened.
761 Log(undef, "srun() failed on build script");
765 if (!defined $childpid)
772 $proc{$childpid} = { jobstep => $id,
775 jobstepname => "$job_id.$id.$childpid",
777 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
778 $slot[$childslot]->{pid} = $childpid;
780 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
781 Log ($id, "child $childpid started on $childslotname");
782 $Jobstep->{starttime} = time;
783 $Jobstep->{node} = $childnode->{name};
784 $Jobstep->{slotindex} = $childslot;
785 delete $Jobstep->{stderr};
786 delete $Jobstep->{finishtime};
788 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
789 $Jobstep->{'arvados_task'}->save;
791 splice @jobstep_todo, $todo_ptr, 1;
794 $progress_is_dirty = 1;
798 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
800 last THISROUND if $main::please_freeze;
801 if ($main::please_info)
803 $main::please_info = 0;
807 update_progress_stats();
814 check_refresh_wanted();
816 update_progress_stats();
817 select (undef, undef, undef, 0.1);
819 elsif (time - $progress_stats_updated >= 30)
821 update_progress_stats();
823 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
824 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
826 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
827 .($thisround_failed+$thisround_succeeded)
828 .") -- giving up on this round";
829 Log (undef, $message);
833 # move slots from freeslot to holdslot (or back to freeslot) if necessary
834 for (my $i=$#freeslot; $i>=0; $i--) {
835 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
836 push @holdslot, (splice @freeslot, $i, 1);
839 for (my $i=$#holdslot; $i>=0; $i--) {
840 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
841 push @freeslot, (splice @holdslot, $i, 1);
845 # give up if no nodes are succeeding
846 if (!grep { $_->{node}->{losing_streak} == 0 &&
847 $_->{node}->{hold_count} < 4 } @slot) {
848 my $message = "Every node has failed -- giving up on this round";
849 Log (undef, $message);
856 push @freeslot, splice @holdslot;
857 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
860 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
863 if ($main::please_continue) {
864 $main::please_continue = 0;
867 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
871 check_refresh_wanted();
873 update_progress_stats();
874 select (undef, undef, undef, 0.1);
875 killem (keys %proc) if $main::please_freeze;
879 update_progress_stats();
880 freeze_if_want_freeze();
883 if (!defined $main::success)
886 $thisround_succeeded == 0 &&
887 ($thisround_failed == 0 || $thisround_failed > 4))
889 my $message = "stop because $thisround_failed tasks failed and none succeeded";
890 Log (undef, $message);
899 goto ONELEVEL if !defined $main::success;
902 release_allocation();
904 my $collated_output = &collate_output();
906 if (!$collated_output) {
907 Log(undef, "output undef");
911 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
912 or die "failed to get collated manifest: $!";
913 my $orig_manifest_text = '';
914 while (my $manifest_line = <$orig_manifest>) {
915 $orig_manifest_text .= $manifest_line;
917 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
918 'manifest_text' => $orig_manifest_text,
920 Log(undef, "output uuid " . $output->{uuid});
921 Log(undef, "output hash " . $output->{portable_data_hash});
922 $Job->update_attributes('output' => $output->{portable_data_hash});
925 Log (undef, "Failed to register output manifest: $@");
929 Log (undef, "finish");
934 if ($collated_output && $main::success) {
935 $final_state = 'Complete';
937 $final_state = 'Failed';
939 $Job->update_attributes('state' => $final_state);
941 exit (($final_state eq 'Complete') ? 0 : 1);
945 sub update_progress_stats
947 $progress_stats_updated = time;
948 return if !$progress_is_dirty;
949 my ($todo, $done, $running) = (scalar @jobstep_todo,
950 scalar @jobstep_done,
951 scalar @slot - scalar @freeslot - scalar @holdslot);
952 $Job->{'tasks_summary'} ||= {};
953 $Job->{'tasks_summary'}->{'todo'} = $todo;
954 $Job->{'tasks_summary'}->{'done'} = $done;
955 $Job->{'tasks_summary'}->{'running'} = $running;
956 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
957 Log (undef, "status: $done done, $running running, $todo todo");
958 $progress_is_dirty = 0;
965 my $pid = waitpid (-1, WNOHANG);
966 return 0 if $pid <= 0;
968 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
970 . $slot[$proc{$pid}->{slot}]->{cpu});
971 my $jobstepid = $proc{$pid}->{jobstep};
972 my $elapsed = time - $proc{$pid}->{time};
973 my $Jobstep = $jobstep[$jobstepid];
975 my $childstatus = $?;
976 my $exitvalue = $childstatus >> 8;
977 my $exitinfo = sprintf("exit %d signal %d%s",
980 ($childstatus & 128 ? ' core dump' : ''));
981 $Jobstep->{'arvados_task'}->reload;
982 my $task_success = $Jobstep->{'arvados_task'}->{success};
984 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
986 if (!defined $task_success) {
987 # task did not indicate one way or the other --> fail
988 $Jobstep->{'arvados_task'}->{success} = 0;
989 $Jobstep->{'arvados_task'}->save;
996 $temporary_fail ||= $Jobstep->{node_fail};
997 $temporary_fail ||= ($exitvalue == 111);
1000 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1002 # Check for signs of a failed or misconfigured node
1003 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1004 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1005 # Don't count this against jobstep failure thresholds if this
1006 # node is already suspected faulty and srun exited quickly
1007 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1009 Log ($jobstepid, "blaming failure on suspect node " .
1010 $slot[$proc{$pid}->{slot}]->{node}->{name});
1011 $temporary_fail ||= 1;
1013 ban_node_by_slot($proc{$pid}->{slot});
1016 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1017 ++$Jobstep->{'failures'},
1018 $temporary_fail ? 'temporary ' : 'permanent',
1021 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1022 # Give up on this task, and the whole job
1024 $main::please_freeze = 1;
1026 # Put this task back on the todo queue
1027 push @jobstep_todo, $jobstepid;
1028 $Job->{'tasks_summary'}->{'failed'}++;
1032 ++$thisround_succeeded;
1033 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1034 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1035 push @jobstep_done, $jobstepid;
1036 Log ($jobstepid, "success in $elapsed seconds");
1038 $Jobstep->{exitcode} = $childstatus;
1039 $Jobstep->{finishtime} = time;
1040 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1041 $Jobstep->{'arvados_task'}->save;
1042 process_stderr ($jobstepid, $task_success);
1043 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1045 close $reader{$jobstepid};
1046 delete $reader{$jobstepid};
1047 delete $slot[$proc{$pid}->{slot}]->{pid};
1048 push @freeslot, $proc{$pid}->{slot};
1051 if ($task_success) {
1053 my $newtask_list = [];
1054 my $newtask_results;
1056 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1058 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1060 'order' => 'qsequence',
1061 'offset' => scalar(@$newtask_list),
1063 push(@$newtask_list, @{$newtask_results->{items}});
1064 } while (@{$newtask_results->{items}});
1065 foreach my $arvados_task (@$newtask_list) {
1067 'level' => $arvados_task->{'sequence'},
1069 'arvados_task' => $arvados_task
1071 push @jobstep, $jobstep;
1072 push @jobstep_todo, $#jobstep;
1076 $progress_is_dirty = 1;
1080 sub check_refresh_wanted
1082 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1083 if (@stat && $stat[9] > $latest_refresh) {
1084 $latest_refresh = scalar time;
1085 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1086 for my $attr ('cancelled_at',
1087 'cancelled_by_user_uuid',
1088 'cancelled_by_client_uuid',
1090 $Job->{$attr} = $Job2->{$attr};
1092 if ($Job->{'state'} ne "Running") {
1093 if ($Job->{'state'} eq "Cancelled") {
1094 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1096 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1099 $main::please_freeze = 1;
1106 # return if the kill list was checked <4 seconds ago
1107 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1111 $squeue_kill_checked = time;
1113 # use killem() on procs whose killtime is reached
1116 if (exists $proc{$_}->{killtime}
1117 && $proc{$_}->{killtime} <= time)
1123 # return if the squeue was checked <60 seconds ago
1124 if (defined $squeue_checked && $squeue_checked > time - 60)
1128 $squeue_checked = time;
1132 # here is an opportunity to check for mysterious problems with local procs
1136 # get a list of steps still running
1137 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1139 if ($squeue[-1] ne "ok")
1145 # which of my jobsteps are running, according to squeue?
1149 if (/^(\d+)\.(\d+) (\S+)/)
1151 if ($1 eq $ENV{SLURM_JOBID})
1158 # which of my active child procs (>60s old) were not mentioned by squeue?
1159 foreach (keys %proc)
1161 if ($proc{$_}->{time} < time - 60
1162 && !exists $ok{$proc{$_}->{jobstepname}}
1163 && !exists $proc{$_}->{killtime})
1165 # kill this proc if it hasn't exited in 30 seconds
1166 $proc{$_}->{killtime} = time + 30;
1172 sub release_allocation
1176 Log (undef, "release job allocation");
1177 system "scancel $ENV{SLURM_JOBID}";
1185 foreach my $job (keys %reader)
1188 while (0 < sysread ($reader{$job}, $buf, 8192))
1190 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1191 $jobstep[$job]->{stderr} .= $buf;
1192 preprocess_stderr ($job);
1193 if (length ($jobstep[$job]->{stderr}) > 16384)
1195 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1204 sub preprocess_stderr
1208 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1210 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1211 Log ($job, "stderr $line");
1212 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1214 $main::please_freeze = 1;
1216 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1217 $jobstep[$job]->{node_fail} = 1;
1218 ban_node_by_slot($jobstep[$job]->{slotindex});
1227 my $task_success = shift;
1228 preprocess_stderr ($job);
1231 Log ($job, "stderr $_");
1232 } split ("\n", $jobstep[$job]->{stderr});
1238 my ($keep, $child_out, $output_block);
1240 my $cmd = "arv-get \Q$hash\E";
1241 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1245 my $bytes = sysread($keep, $buf, 1024 * 1024);
1246 if (!defined $bytes) {
1247 die "reading from arv-get: $!";
1248 } elsif ($bytes == 0) {
1249 # sysread returns 0 at the end of the pipe.
1252 # some bytes were read into buf.
1253 $output_block .= $buf;
1257 return $output_block;
1262 Log (undef, "collate");
1264 my ($child_out, $child_in);
1265 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1266 '--retries', put_retry_count());
1270 next if (!exists $_->{'arvados_task'}->{'output'} ||
1271 !$_->{'arvados_task'}->{'success'});
1272 my $output = $_->{'arvados_task'}->{output};
1273 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1275 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1276 print $child_in $output;
1278 elsif (@jobstep == 1)
1280 $joboutput = $output;
1283 elsif (defined (my $outblock = fetch_block ($output)))
1285 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1286 print $child_in $outblock;
1290 Log (undef, "XXX fetch_block($output) failed XXX");
1296 if (!defined $joboutput) {
1297 my $s = IO::Select->new($child_out);
1298 if ($s->can_read(120)) {
1299 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1301 # TODO: Ensure exit status == 0.
1303 Log (undef, "timed out reading from 'arv-put'");
1306 # TODO: kill $pid instead of waiting, now that we've decided to
1307 # ignore further output.
1318 my $sig = 2; # SIGINT first
1319 if (exists $proc{$_}->{"sent_$sig"} &&
1320 time - $proc{$_}->{"sent_$sig"} > 4)
1322 $sig = 15; # SIGTERM if SIGINT doesn't work
1324 if (exists $proc{$_}->{"sent_$sig"} &&
1325 time - $proc{$_}->{"sent_$sig"} > 4)
1327 $sig = 9; # SIGKILL if SIGTERM doesn't work
1329 if (!exists $proc{$_}->{"sent_$sig"})
1331 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1333 select (undef, undef, undef, 0.1);
1336 kill $sig, $_; # srun needs two SIGINTs to really interrupt
1338 $proc{$_}->{"sent_$sig"} = time;
1339 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1349 vec($bits,fileno($_),1) = 1;
1355 sub Log # ($jobstep_id, $logmessage)
1357 if ($_[1] =~ /\n/) {
1358 for my $line (split (/\n/, $_[1])) {
1363 my $fh = select STDERR; $|=1; select $fh;
1364 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
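# Octal-escape any byte outside printable ASCII so the saved log stays
# clean and one line per message.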
1365 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1368 if ($local_logfile || -t STDERR) {
1369 my @gmtime = gmtime;
1370 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1371 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1373 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1375 if ($local_logfile) {
1376 print $local_logfile $datetime . " " . $message;
1383 my ($package, $file, $line) = caller;
1384 my $message = "@_ at $file line $line\n";
1385 Log (undef, $message);
1386 freeze() if @jobstep_todo;
1387 collate_output() if @jobstep_todo;
1389 save_meta() if $local_logfile;
1396 if ($Job->{'state'} eq 'Cancelled') {
1397 $Job->update_attributes('finished_at' => scalar gmtime);
1399 $Job->update_attributes('state' => 'Failed');
1406 my $justcheckpoint = shift; # false if this will be the last meta saved
1407 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1409 $local_logfile->flush;
1410 my $retry_count = put_retry_count();
1411 my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1412 "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1413 my $loglocator = `$cmd`;
1414 die "system $cmd failed: $?" if $?;
1417 $local_logfile = undef; # the temp file is automatically deleted
1418 Log (undef, "log manifest is $loglocator");
1419 $Job->{'log'} = $loglocator;
1420 $Job->update_attributes('log', $loglocator);
1424 sub freeze_if_want_freeze
1426 if ($main::please_freeze)
1428 release_allocation();
1431 # kill some srun procs before freeze+stop
1432 map { $proc{$_} = {} } @_;
1435 killem (keys %proc);
1436 select (undef, undef, undef, 0.1);
1438 while (($died = waitpid (-1, WNOHANG)) > 0)
1440 delete $proc{$died};
1455 Log (undef, "Freeze not implemented");
1462 croak ("Thaw not implemented");
1478 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1485 my $srunargs = shift;
1486 my $execargs = shift;
1487 my $opts = shift || {};
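# With a SLURM allocation, wrap the command in srun; otherwise ignore
# the srun arguments and run the command directly on this host.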
1489 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1490 print STDERR (join (" ",
1491 map { / / ? "'$_'" : $_ }
1494 if $ENV{CRUNCH_DEBUG};
1496 if (defined $stdin) {
1497 my $child = open STDIN, "-|";
1498 defined $child or die "no fork: $!";
1500 print $stdin or die $!;
1501 close STDOUT or die $!;
1506 return system (@$args) if $opts->{fork};
1509 warn "ENV size is ".length(join(" ",%ENV));
1510 die "exec failed: $!: @$args";
1514 sub ban_node_by_slot {
1515 # Don't start any new jobsteps on this node for 60 seconds
1517 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1518 $slot[$slotid]->{node}->{hold_count}++;
1519 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1524 my ($lockfile, $error_message) = @_;
1525 open L, ">", $lockfile or croak("$lockfile: $!");
1526 if (!flock L, LOCK_EX|LOCK_NB) {
1527 croak("Can't lock $lockfile: $error_message\n");
1531 sub find_docker_image {
1532 # Given a Keep locator, check to see if it contains a Docker image.
1533 # If so, return its stream name and Docker hash.
1534 # If not, return undef for both values.
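# For example (illustrative), a qualifying manifest is typically a
# single line of the form:
#   . <block locators> 0:<size>:<64 hex digits>.tar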
1535 my $locator = shift;
1536 my ($streamname, $filename);
1537 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1538 foreach my $line (split(/\n/, $image->{manifest_text})) {
1539 my @tokens = split(/\s+/, $line);
1541 $streamname = shift(@tokens);
1542 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1543 if (defined($filename)) {
1544 return (undef, undef); # More than one file in the Collection.
1546 $filename = (split(/:/, $filedata, 3))[2];
1551 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1552 return ($streamname, $1);
1554 return (undef, undef);
1558 sub put_retry_count {
1559 # Calculate a --retries argument for arv-put that will have it try
1560 # approximately as long as this Job has been running.
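# The count grows with the job's elapsed time and never drops below 3,
# so even very short jobs get a few retries.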
1561 my $stoptime = shift || time;
1562 my $starttime = $jobstep[0]->{starttime};
1563 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1565 while ($timediff >= 2) {
1569 return ($retries > 3) ? $retries : 3;
1575 # checkout-and-build
1578 use File::Path qw( make_path );
1580 my $destdir = $ENV{"CRUNCH_SRC"};
1581 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1582 my $repo = $ENV{"CRUNCH_SRC_URL"};
1583 my $task_work = $ENV{"TASK_WORK"};
1585 for my $dir ($destdir, $task_work) {
1588 -e $dir or die "Failed to create temporary directory ($dir): $!";
1592 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1594 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1597 die "Cannot exec `@ARGV`: $!";
1603 unlink "$destdir.commit";
1604 open STDOUT, ">", "$destdir.log";
1605 open STDERR, ">&STDOUT";
1608 my @git_archive_data = <DATA>;
1609 if (@git_archive_data) {
1610 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1611 print TARX @git_archive_data;
1613 die "'tar -C $destdir -xf -' exited $?: $!";
1618 chomp ($pwd = `pwd`);
1619 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1622 for my $src_path ("$destdir/arvados/sdk/python") {
1624 shell_or_die ("virtualenv", $install_dir);
1625 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1629 if (-e "$destdir/crunch_scripts/install") {
1630 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1631 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1633 shell_or_die ("./tests/autotests.sh", $install_dir);
1634 } elsif (-e "./install.sh") {
1635 shell_or_die ("./install.sh", $install_dir);
1639 unlink "$destdir.commit.new";
1640 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1641 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1648 die "Cannot exec `@ARGV`: $!";
1655 if ($ENV{"DEBUG"}) {
1656 print STDERR "@_\n";
1659 or die "@_ failed: $! exit 0x".sprintf("%x",$?);