2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or deallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path remove_tree );
88 use constant EX_TEMPFAIL => 75;
# Establish the per-job temporary directory layout in %ENV before any
# child process is forked, so tasks inherit the same paths:
#   CRUNCH_TMP     - root scratch dir (default $TMPDIR/crunch-job)
#   JOB_WORK       - per-job work area under CRUNCH_TMP
#   CRUNCH_INSTALL - where the job's source tree gets installed
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real uid; non-root, non-"crunch" users each get their own
# scratch dir so concurrent local runs don't collide.
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<"
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
133 $main::ENV{CRUNCH_DEBUG} = 1;
137 $main::ENV{CRUNCH_DEBUG} = 0;
142 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
# Resolve $jobspec: a bare UUID means "fetch the Job from the API
# server"; anything else is treated (below, out of view) as an inline
# JSON job specification.
151 if ($jobspec =~ /^[-a-z\d]+$/)
153 # $jobspec is an Arvados UUID, not a JSON job specification
154 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155 if (!$force_unlock) {
156 # If some other crunch-job process has grabbed this job (or we see
157 # other evidence that the job is already underway) we exit
158 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159 # mark the job as failed.
160 if ($Job->{'is_locked_by_uuid'}) {
161 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
164 if ($Job->{'state'} ne 'Queued') {
165 Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
# Refuse to start a job whose 'success' flag has already been decided.
# The intent is "flag is not null", so test with defined(): the
# original `ne undef` stringified undef to "" (warning under `use
# warnings`) and would also let an empty-string flag slip through.
168 if (defined $Job->{'success'}) {
169 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
172 if ($Job->{'running'}) {
173 Log(undef, "Job 'running' flag is already set");
176 if ($Job->{'started_at'}) {
177 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
184 $Job = JSON::decode_json($jobspec);
188 map { croak ("No $_ specified") unless $Job->{$_} }
189 qw(script script_version script_parameters);
192 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193 $Job->{'started_at'} = gmtime;
195 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
197 $job_id = $Job->{'uuid'};
199 my $keep_logfile = $job_id . '.log.txt';
200 $local_logfile = File::Temp->new();
202 $Job->{'runtime_constraints'} ||= {};
203 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
204 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
207 Log (undef, "check slurm allocation");
210 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
214 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
215 push @sinfo, "$localcpus localhost";
217 if (exists $ENV{SLURM_NODELIST})
219 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
223 my ($ncpus, $slurm_nodelist) = split;
224 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
227 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
230 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
233 foreach (split (",", $ranges))
246 push @nodelist, map {
248 $n =~ s/\[[-,\d]+\]/$_/;
255 push @nodelist, $nodelist;
258 foreach my $nodename (@nodelist)
260 Log (undef, "node $nodename - $ncpus slots");
261 my $node = { name => $nodename,
265 foreach my $cpu (1..$ncpus)
267 push @slot, { node => $node,
271 push @node, @nodelist;
276 # Ensure that we get one jobstep running on each allocated node before
277 # we start overloading nodes with concurrent steps
279 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
284 # Claim this job, and make sure nobody else does
# Claim the job by writing our user UUID into is_locked_by_uuid, then
# confirm the claim stuck. UUIDs are strings, so the confirmation must
# use the string operator 'eq': the previous numeric '==' coerced both
# UUIDs to 0, making the comparison always true and the race check
# ineffective. On failure we log and (out of view) exit EX_TEMPFAIL so
# crunch-dispatch does not mark the job failed.
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
287 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
290 $Job->update_attributes('state' => 'Running',
291 'tasks_summary' => { 'failed' => 0,
297 Log (undef, "start");
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
329 if (defined $Job->{thawedfromkey})
331 thaw ($Job->{thawedfromkey});
335 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336 'job_uuid' => $Job->{'uuid'},
341 push @jobstep, { 'level' => 0,
343 'arvados_task' => $first_task,
345 push @jobstep_todo, 0;
351 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
358 $build_script = <DATA>;
360 my $nodelist = join(",", @node);
362 if (!defined $no_clear_tmp) {
363 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
364 Log (undef, "Clean work dirs");
366 my $cleanpid = fork();
369 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
370 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
375 last if $cleanpid == waitpid (-1, WNOHANG);
376 freeze_if_want_freeze ($cleanpid);
377 select (undef, undef, undef, 0.1);
379 Log (undef, "Cleanup command exited ".exit_status_s($?));
384 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
385 # If we're in user-land (i.e., not called from crunch-dispatch)
386 # script_version can be an absolute directory path, signifying we
387 # should work straight out of that directory instead of using a git
389 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
390 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
393 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
395 # Install requested code version
396 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
398 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
400 # If we're running under crunch-dispatch, it will have already
401 # pulled the appropriate source tree into its own repository, and
402 # given us that repo's path as $git_dir.
404 # If we're running a "local" job, we might have to fetch content
405 # from a remote repository.
407 # (Currently crunch-dispatch gives a local path with --git-dir, but
408 # we might as well accept URLs there too in case it changes its
410 my $repo = $git_dir || $Job->{'repository'};
412 # Repository can be remote or local. If remote, we'll need to fetch it
413 # to a local dir before doing `git log` et al.
416 if ($repo =~ m{://|\@.*:}) {
417 # $repo is a git url we can clone, like git:// or https:// or
418 # file:/// or git@host:repo.git
419 $repo_location = 'remote';
420 } elsif ($repo =~ m{^\.*/}) {
421 # $repo is a local path to a git index. We'll also resolve ../foo
422 # to ../foo/.git if the latter is a directory.
423 if (-d "$repo/.git") {
424 $repo = "$repo/.git";
426 $repo_location = 'local';
427 Log(undef, "Using local repository '$repo'");
429 # $repo is none of the above. It must be the name of a hosted
431 my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
432 'filters' => [['name','=',$repo]]
434 my $n_found = scalar @{$arv_repo_list};
436 Log(undef, "Repository '$repo' -> "
437 . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
440 croak("Error: Found $n_found repositories with name '$repo'.");
442 $repo = $arv_repo_list->[0]->{'fetch_url'};
443 $repo_location = 'remote';
445 Log(undef, "Using $repo_location repository '$repo'");
446 $ENV{"CRUNCH_SRC_URL"} = $repo;
448 # Resolve given script_version (we'll call that $treeish here) to a
449 # commit sha1 ($commit).
450 my $treeish = $Job->{'script_version'};
452 if ($repo_location eq 'remote') {
453 # We minimize excess object-fetching by re-using the same bare
454 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
455 # just keep adding remotes to it as needed.
456 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
457 my $gitcmd = "git --git-dir=\Q$local_repo\E";
459 # Set up our local repo for caching remote objects, making
461 if (!-d $local_repo) {
462 make_path($local_repo) or croak("Error: could not create $local_repo");
464 # This works (exits 0 and doesn't delete fetched objects) even
465 # if $local_repo is already initialized:
466 `$gitcmd init --bare`;
468 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
471 # If $treeish looks like a hash (or abbrev hash) we look it up in
472 # our local cache first, since that's cheaper. (We don't want to
473 # do that with tags/branches though -- those change over time, so
474 # they should always be resolved by the remote repo.)
475 if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
476 # Hide stderr because it's normal for this to fail:
477 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
479 $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
480 $sha1 =~ /^([0-9a-f]{40})$/s) {
482 Log(undef, "Commit $commit already present in $local_repo");
486 if (!defined $commit) {
487 # If $treeish isn't just a hash or abbrev hash, or isn't here
488 # yet, we need to fetch the remote to resolve it correctly.
490 # First, remove all local heads. This prevents a name that does
491 # not exist on the remote from resolving to (or colliding with)
492 # a previously fetched branch or tag (possibly from a different
494 remove_tree("$local_repo/refs/heads", {keep_root => 1});
496 Log(undef, "Fetching objects from $repo to $local_repo");
497 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
499 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
503 # Now that the data is all here, we will use our local repo for
504 # the rest of our git activities.
508 my $gitcmd = "git --git-dir=\Q$repo\E";
509 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
510 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
511 croak("`$gitcmd rev-list` exited "
513 .", '$treeish' not found. Giving up.");
516 Log(undef, "Version $treeish is commit $commit");
518 if ($commit ne $Job->{'script_version'}) {
519 # Record the real commit id in the database, frozentokey, logs,
520 # etc. -- instead of an abbreviation or a branch name which can
521 # become ambiguous or point to a different commit in the future.
522 if (!$Job->update_attributes('script_version' => $commit)) {
523 croak("Error: failed to update job's script_version attribute");
527 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
528 $git_archive = `$gitcmd archive ''\Q$commit\E`;
530 croak("Error: $gitcmd archive exited ".exit_status_s($?));
534 if (!defined $git_archive) {
535 Log(undef, "Skip install phase (no git archive)");
537 Log(undef, "Warning: This probably means workers have no source tree!");
541 Log(undef, "Run install script on all workers");
543 my @srunargs = ("srun",
544 "--nodelist=$nodelist",
545 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
546 my @execargs = ("sh", "-c",
547 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
549 # Note: this section is almost certainly unnecessary if we're
550 # running tasks in docker containers.
551 my $installpid = fork();
552 if ($installpid == 0)
554 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
559 last if $installpid == waitpid (-1, WNOHANG);
560 freeze_if_want_freeze ($installpid);
561 select (undef, undef, undef, 0.1);
563 Log (undef, "Install script exited ".exit_status_s($?));
568 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
569 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
572 # If this job requires a Docker image, install that.
573 my $docker_bin = "/usr/bin/docker.io";
574 my ($docker_locator, $docker_stream, $docker_hash);
575 if ($docker_locator = $Job->{docker_image_locator}) {
576 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
579 croak("No Docker image hash found from locator $docker_locator");
581 $docker_stream =~ s/^\.//;
582 my $docker_install_script = qq{
583 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
584 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
587 my $docker_pid = fork();
588 if ($docker_pid == 0)
590 srun (["srun", "--nodelist=" . join(',', @node)],
591 ["/bin/sh", "-ec", $docker_install_script]);
596 last if $docker_pid == waitpid (-1, WNOHANG);
597 freeze_if_want_freeze ($docker_pid);
598 select (undef, undef, undef, 0.1);
602 croak("Installing Docker image from $docker_locator exited "
607 foreach (qw (script script_version script_parameters runtime_constraints))
611 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
613 foreach (split (/\n/, $Job->{knobs}))
615 Log (undef, "knob " . $_);
620 $main::success = undef;
626 my $thisround_succeeded = 0;
627 my $thisround_failed = 0;
628 my $thisround_failed_multiple = 0;
630 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
631 or $a <=> $b } @jobstep_todo;
632 my $level = $jobstep[$jobstep_todo[0]]->{level};
633 Log (undef, "start level $level");
638 my @freeslot = (0..$#slot);
641 my $progress_is_dirty = 1;
642 my $progress_stats_updated = 0;
644 update_progress_stats();
649 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
651 my $id = $jobstep_todo[$todo_ptr];
652 my $Jobstep = $jobstep[$id];
653 if ($Jobstep->{level} != $level)
658 pipe $reader{$id}, "writer" or croak ($!);
659 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
660 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
662 my $childslot = $freeslot[0];
663 my $childnode = $slot[$childslot]->{node};
664 my $childslotname = join (".",
665 $slot[$childslot]->{node}->{name},
666 $slot[$childslot]->{cpu});
667 my $childpid = fork();
670 $SIG{'INT'} = 'DEFAULT';
671 $SIG{'QUIT'} = 'DEFAULT';
672 $SIG{'TERM'} = 'DEFAULT';
674 foreach (values (%reader))
678 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
679 open(STDOUT,">&writer");
680 open(STDERR,">&writer");
685 delete $ENV{"GNUPGHOME"};
686 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
687 $ENV{"TASK_QSEQUENCE"} = $id;
688 $ENV{"TASK_SEQUENCE"} = $level;
689 $ENV{"JOB_SCRIPT"} = $Job->{script};
690 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
691 $param =~ tr/a-z/A-Z/;
692 $ENV{"JOB_PARAMETER_$param"} = $value;
694 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
695 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
696 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
697 $ENV{"HOME"} = $ENV{"TASK_WORK"};
698 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
699 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
700 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
701 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
707 "--nodelist=".$childnode->{name},
708 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
709 "--job-name=$job_id.$id.$$",
711 my $build_script_to_send = "";
713 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
714 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
715 ."&& cd $ENV{CRUNCH_TMP} ";
718 $build_script_to_send = $build_script;
722 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
725 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
726 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
727 # Dynamically configure the container to use the host system as its
728 # DNS server. Get the host's global addresses from the ip command,
729 # and turn them into docker --dns options using gawk.
731 q{$(ip -o address show scope global |
732 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
733 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
734 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
735 $command .= "--env=\QHOME=/home/crunch\E ";
736 while (my ($env_key, $env_val) = each %ENV)
738 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
739 if ($env_key eq "TASK_WORK") {
740 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
742 elsif ($env_key eq "TASK_KEEPMOUNT") {
743 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
746 $command .= "--env=\Q$env_key=$env_val\E ";
750 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
751 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
752 $command .= "\Q$docker_hash\E ";
753 $command .= "stdbuf --output=0 --error=0 ";
754 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
757 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
758 $command .= "stdbuf --output=0 --error=0 ";
759 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
762 my @execargs = ('bash', '-c', $command);
763 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
764 # exec() failed, we assume nothing happened.
765 Log(undef, "srun() failed on build script");
769 if (!defined $childpid)
776 $proc{$childpid} = { jobstep => $id,
779 jobstepname => "$job_id.$id.$childpid",
781 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
782 $slot[$childslot]->{pid} = $childpid;
784 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
785 Log ($id, "child $childpid started on $childslotname");
786 $Jobstep->{starttime} = time;
787 $Jobstep->{node} = $childnode->{name};
788 $Jobstep->{slotindex} = $childslot;
789 delete $Jobstep->{stderr};
790 delete $Jobstep->{finishtime};
792 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
793 $Jobstep->{'arvados_task'}->save;
795 splice @jobstep_todo, $todo_ptr, 1;
798 $progress_is_dirty = 1;
802 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
804 last THISROUND if $main::please_freeze;
805 if ($main::please_info)
807 $main::please_info = 0;
811 update_progress_stats();
818 check_refresh_wanted();
820 update_progress_stats();
821 select (undef, undef, undef, 0.1);
823 elsif (time - $progress_stats_updated >= 30)
825 update_progress_stats();
827 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
828 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
830 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
831 .($thisround_failed+$thisround_succeeded)
832 .") -- giving up on this round";
833 Log (undef, $message);
837 # move slots from freeslot to holdslot (or back to freeslot) if necessary
838 for (my $i=$#freeslot; $i>=0; $i--) {
839 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
840 push @holdslot, (splice @freeslot, $i, 1);
843 for (my $i=$#holdslot; $i>=0; $i--) {
844 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
845 push @freeslot, (splice @holdslot, $i, 1);
849 # give up if no nodes are succeeding
850 if (!grep { $_->{node}->{losing_streak} == 0 &&
851 $_->{node}->{hold_count} < 4 } @slot) {
852 my $message = "Every node has failed -- giving up on this round";
853 Log (undef, $message);
860 push @freeslot, splice @holdslot;
861 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
864 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
867 if ($main::please_continue) {
868 $main::please_continue = 0;
871 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
875 check_refresh_wanted();
877 update_progress_stats();
878 select (undef, undef, undef, 0.1);
879 killem (keys %proc) if $main::please_freeze;
883 update_progress_stats();
884 freeze_if_want_freeze();
887 if (!defined $main::success)
890 $thisround_succeeded == 0 &&
891 ($thisround_failed == 0 || $thisround_failed > 4))
893 my $message = "stop because $thisround_failed tasks failed and none succeeded";
894 Log (undef, $message);
903 goto ONELEVEL if !defined $main::success;
906 release_allocation();
908 my $collated_output = &collate_output();
910 if (!$collated_output) {
911 Log(undef, "output undef");
915 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
916 or die "failed to get collated manifest: $!";
917 my $orig_manifest_text = '';
918 while (my $manifest_line = <$orig_manifest>) {
919 $orig_manifest_text .= $manifest_line;
921 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
922 'manifest_text' => $orig_manifest_text,
924 Log(undef, "output uuid " . $output->{uuid});
925 Log(undef, "output hash " . $output->{portable_data_hash});
926 $Job->update_attributes('output' => $output->{portable_data_hash});
929 Log (undef, "Failed to register output manifest: $@");
933 Log (undef, "finish");
938 if ($collated_output && $main::success) {
939 $final_state = 'Complete';
941 $final_state = 'Failed';
943 $Job->update_attributes('state' => $final_state);
945 exit (($final_state eq 'Complete') ? 0 : 1);
949 sub update_progress_stats
# Publish current task counts (todo/done/running) to the Job record's
# tasks_summary on the API server and log a one-line status. Cheap to
# call repeatedly: returns immediately unless $progress_is_dirty was
# set since the last update. "running" is derived as total slots minus
# free minus held slots.
951 $progress_stats_updated = time;
952 return if !$progress_is_dirty;
953 my ($todo, $done, $running) = (scalar @jobstep_todo,
954 scalar @jobstep_done,
955 scalar @slot - scalar @freeslot - scalar @holdslot);
956 $Job->{'tasks_summary'} ||= {};
957 $Job->{'tasks_summary'}->{'todo'} = $todo;
958 $Job->{'tasks_summary'}->{'done'} = $done;
959 $Job->{'tasks_summary'}->{'running'} = $running;
960 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
961 Log (undef, "status: $done done, $running running, $todo todo");
962 $progress_is_dirty = 0;
969 my $pid = waitpid (-1, WNOHANG);
970 return 0 if $pid <= 0;
972 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
974 . $slot[$proc{$pid}->{slot}]->{cpu});
975 my $jobstepid = $proc{$pid}->{jobstep};
976 my $elapsed = time - $proc{$pid}->{time};
977 my $Jobstep = $jobstep[$jobstepid];
979 my $childstatus = $?;
980 my $exitvalue = $childstatus >> 8;
981 my $exitinfo = "exit ".exit_status_s($childstatus);
982 $Jobstep->{'arvados_task'}->reload;
983 my $task_success = $Jobstep->{'arvados_task'}->{success};
985 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
987 if (!defined $task_success) {
988 # task did not indicate one way or the other --> fail
989 $Jobstep->{'arvados_task'}->{success} = 0;
990 $Jobstep->{'arvados_task'}->save;
997 $temporary_fail ||= $Jobstep->{node_fail};
998 $temporary_fail ||= ($exitvalue == 111);
1000 ++$thisround_failed;
1001 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1003 # Check for signs of a failed or misconfigured node
1004 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1005 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1006 # Don't count this against jobstep failure thresholds if this
1007 # node is already suspected faulty and srun exited quickly
1008 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1010 Log ($jobstepid, "blaming failure on suspect node " .
1011 $slot[$proc{$pid}->{slot}]->{node}->{name});
1012 $temporary_fail ||= 1;
1014 ban_node_by_slot($proc{$pid}->{slot});
1017 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1018 ++$Jobstep->{'failures'},
1019 $temporary_fail ? 'temporary ' : 'permanent',
1022 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1023 # Give up on this task, and the whole job
1025 $main::please_freeze = 1;
1027 # Put this task back on the todo queue
1028 push @jobstep_todo, $jobstepid;
1029 $Job->{'tasks_summary'}->{'failed'}++;
1033 ++$thisround_succeeded;
1034 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1035 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1036 push @jobstep_done, $jobstepid;
1037 Log ($jobstepid, "success in $elapsed seconds");
1039 $Jobstep->{exitcode} = $childstatus;
1040 $Jobstep->{finishtime} = time;
1041 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1042 $Jobstep->{'arvados_task'}->save;
1043 process_stderr ($jobstepid, $task_success);
1044 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1046 close $reader{$jobstepid};
1047 delete $reader{$jobstepid};
1048 delete $slot[$proc{$pid}->{slot}]->{pid};
1049 push @freeslot, $proc{$pid}->{slot};
1052 if ($task_success) {
1054 my $newtask_list = [];
1055 my $newtask_results;
1057 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1059 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1061 'order' => 'qsequence',
1062 'offset' => scalar(@$newtask_list),
1064 push(@$newtask_list, @{$newtask_results->{items}});
1065 } while (@{$newtask_results->{items}});
1066 foreach my $arvados_task (@$newtask_list) {
1068 'level' => $arvados_task->{'sequence'},
1070 'arvados_task' => $arvados_task
1072 push @jobstep, $jobstep;
1073 push @jobstep_todo, $#jobstep;
1077 $progress_is_dirty = 1;
1081 sub check_refresh_wanted
# If the CRUNCH_REFRESH_TRIGGER file has been touched since our last
# refresh, re-fetch the Job record and copy over the attributes that
# can legitimately change while we run (cancellation fields, state).
# A job that is no longer Running asks the main loop to stop by
# setting $main::please_freeze.
1083 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the trigger file's mtime.
# NOTE(review): the re-fetch uses $jobspec as a uuid; presumably the
# trigger is only configured when crunch-dispatch invoked us with a
# UUID (not inline JSON) -- TODO confirm.
1084 if (@stat && $stat[9] > $latest_refresh) {
1085 $latest_refresh = scalar time;
1086 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1087 for my $attr ('cancelled_at',
1088 'cancelled_by_user_uuid',
1089 'cancelled_by_client_uuid',
1091 $Job->{$attr} = $Job2->{$attr};
1093 if ($Job->{'state'} ne "Running") {
1094 if ($Job->{'state'} eq "Cancelled") {
1095 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1097 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1100 $main::please_freeze = 1;
1107 # return if the kill list was checked <4 seconds ago
1108 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1112 $squeue_kill_checked = time;
1114 # use killem() on procs whose killtime is reached
1117 if (exists $proc{$_}->{killtime}
1118 && $proc{$_}->{killtime} <= time)
1124 # return if the squeue was checked <60 seconds ago
1125 if (defined $squeue_checked && $squeue_checked > time - 60)
1129 $squeue_checked = time;
1133 # here is an opportunity to check for mysterious problems with local procs
1137 # get a list of steps still running
1138 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1140 if ($squeue[-1] ne "ok")
1146 # which of my jobsteps are running, according to squeue?
1150 if (/^(\d+)\.(\d+) (\S+)/)
1152 if ($1 eq $ENV{SLURM_JOBID})
1159 # which of my active child procs (>60s old) were not mentioned by squeue?
1160 foreach (keys %proc)
1162 if ($proc{$_}->{time} < time - 60
1163 && !exists $ok{$proc{$_}->{jobstepname}}
1164 && !exists $proc{$_}->{killtime})
1166 # kill this proc if it hasn't exited in 30 seconds
1167 $proc{$_}->{killtime} = time + 30;
1173 sub release_allocation
1177 Log (undef, "release job allocation");
1178 system "scancel $ENV{SLURM_JOBID}";
1186 foreach my $job (keys %reader)
1189 while (0 < sysread ($reader{$job}, $buf, 8192))
1191 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1192 $jobstep[$job]->{stderr} .= $buf;
1193 preprocess_stderr ($job);
1194 if (length ($jobstep[$job]->{stderr}) > 16384)
1196 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1205 sub preprocess_stderr
# Consume complete lines from this jobstep's buffered stderr, log each
# one, and watch for srun error patterns: a dead/expired SLURM
# allocation freezes the whole job, while a node failure marks the
# step's node_fail flag and bans the node's slot so no new work lands
# there.
1209 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the parsed line (plus its newline) from the front of the buffer.
1211 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1212 Log ($job, "stderr $line");
1213 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1215 $main::please_freeze = 1;
1217 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1218 $jobstep[$job]->{node_fail} = 1;
1219 ban_node_by_slot($jobstep[$job]->{slotindex});
1228 my $task_success = shift;
1229 preprocess_stderr ($job);
1232 Log ($job, "stderr $_");
1233 } split ("\n", $jobstep[$job]->{stderr});
# Fetch a block/collection from Keep by piping `arv-get <hash>` and
# slurping its stdout in 1 MiB chunks. Returns the raw bytes; dies if
# the pipe cannot be opened or a read fails. $hash is \Q-quoted so
# shell metacharacters in it are neutralized.
1239 my ($keep, $child_out, $output_block);
1241 my $cmd = "arv-get \Q$hash\E";
1242 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1246 my $bytes = sysread($keep, $buf, 1024 * 1024);
1247 if (!defined $bytes) {
1248 die "reading from arv-get: $!";
1249 } elsif ($bytes == 0) {
1250 # sysread returns 0 at the end of the pipe.
1253 # some bytes were read into buf.
1254 $output_block .= $buf;
1258 return $output_block;
1263 Log (undef, "collate");
1265 my ($child_out, $child_in);
1266 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1267 '--retries', put_retry_count());
1271 next if (!exists $_->{'arvados_task'}->{'output'} ||
1272 !$_->{'arvados_task'}->{'success'});
1273 my $output = $_->{'arvados_task'}->{output};
1274 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1276 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1277 print $child_in $output;
1279 elsif (@jobstep == 1)
1281 $joboutput = $output;
1284 elsif (defined (my $outblock = fetch_block ($output)))
1286 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1287 print $child_in $outblock;
1291 Log (undef, "XXX fetch_block($output) failed XXX");
1297 if (!defined $joboutput) {
1298 my $s = IO::Select->new($child_out);
1299 if ($s->can_read(120)) {
1300 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1302 # TODO: Ensure exit status == 0.
1304 Log (undef, "timed out reading from 'arv-put'");
1307 # TODO: kill $pid instead of waiting, now that we've decided to
1308 # ignore further output.
# Signal-escalation ladder for killing child procs: start with SIGINT;
# if a given signal was already sent more than 4 seconds ago and the
# process is still here, step up to SIGTERM, then SIGKILL. Each signal
# is only escalated past once per process (tracked in sent_$sig).
1319 my $sig = 2; # SIGINT first
1320 if (exists $proc{$_}->{"sent_$sig"} &&
1321 time - $proc{$_}->{"sent_$sig"} > 4)
1323 $sig = 15; # SIGTERM if SIGINT doesn't work
1325 if (exists $proc{$_}->{"sent_$sig"} &&
1326 time - $proc{$_}->{"sent_$sig"} > 4)
1328 $sig = 9; # SIGKILL if SIGTERM doesn't work
1330 if (!exists $proc{$_}->{"sent_$sig"})
1332 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1334 select (undef, undef, undef, 0.1);
1337 kill $sig, $_; # srun wants two SIGINT to really interrupt
# Record when this signal level was sent, and how long the proc had
# been running when we started killing it.
1339 $proc{$_}->{"sent_$sig"} = time;
1340 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1350 vec($bits,fileno($_),1) = 1;
# Log ($jobstep_id, $logmessage): write one log line to stderr and, when
# open, to the local log file.  Multi-line messages are split and logged
# one line at a time (recursive call not visible in this fragment).
1356 sub Log # ($jobstep_id, $logmessage)
1358 if ($_[1] =~ /\n/) {
1359 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered so log lines appear immediately.
1364 my $fh = select STDERR; $|=1; select $fh;
1365 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable characters as \NNN octal so the log stays
# one-line-per-message and safe to store.
1366 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1369 if ($local_logfile || -t STDERR) {
1370 my @gmtime = gmtime;
1371 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1372 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp only when writing to a terminal or the log file.
1374 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1376 if ($local_logfile) {
1377 print $local_logfile $datetime . " " . $message;
# Fragment: fatal-error path (croak-style) plus part of job cleanup.
# Logs the error with caller location, checkpoints unfinished work,
# saves the log, then marks the Job record Cancelled-finished or Failed.
1384 my ($package, $file, $line) = caller;
1385 my $message = "@_ at $file line $line\n";
1386 Log (undef, $message);
# Only checkpoint/collate when there is still work outstanding.
1387 freeze() if @jobstep_todo;
1388 collate_output() if @jobstep_todo;
1390 save_meta() if $local_logfile;
1397 if ($Job->{'state'} eq 'Cancelled') {
1398 $Job->update_attributes('finished_at' => scalar gmtime);
1400 $Job->update_attributes('state' => 'Failed');
# Fragment of save_meta(): uploads the local log file to Keep via
# arv-put and records the resulting locator on the Job.
1407 my $justcheckpoint = shift; # false if this will be the last meta saved
1408 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
# Flush buffered log output before handing the file to arv-put.
1410 $local_logfile->flush;
1411 my $retry_count = put_retry_count();
1412 my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1413 "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1414 my $loglocator = `$cmd`;
1415 die "system $cmd exited ".exit_status_s($?) if $?;
1418 $local_logfile = undef; # the temp file is automatically deleted
1419 Log (undef, "log manifest is $loglocator");
1420 $Job->{'log'} = $loglocator;
1421 $Job->update_attributes('log', $loglocator);
# freeze_if_want_freeze: when a SIGINT/SIGQUIT handler has set
# $main::please_freeze, release nodes, kill listed child procs, and reap
# them.  Followed by stub fragments of freeze()/thaw() — both are
# unimplemented post-Warehouse.pm.
1425 sub freeze_if_want_freeze
1427 if ($main::please_freeze)
1429 release_allocation();
1432 # kill some srun procs before freeze+stop
# Register the given pids in %proc so killem() will target them.
1433 map { $proc{$_} = {} } @_;
1436 killem (keys %proc);
1437 select (undef, undef, undef, 0.1);
# Reap every child that has already exited (non-blocking).
1439 while (($died = waitpid (-1, WNOHANG)) > 0)
1441 delete $proc{$died};
1456 Log (undef, "Freeze not implemented");
1463 croak ("Thaw not implemented");
# Unescape "\n" and other backslash escapes produced when freezing.
1479 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Fragment of srun($srunargs, $execargs, $opts, $stdin): run a command,
# prefixing it with srun arguments only when SLURM is available;
# otherwise execute $execargs directly on the local machine.
1486 my $srunargs = shift;
1487 my $execargs = shift;
1488 my $opts = shift || {};
1490 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Echo the (shell-quoted) command when debugging is enabled.
1491 print STDERR (join (" ",
1492 map { / / ? "'$_'" : $_ }
1495 if $ENV{CRUNCH_DEBUG};
# Optional $stdin is fed to the child through a fork that prints it
# (fork/exec branches are partly outside this view).
1497 if (defined $stdin) {
1498 my $child = open STDIN, "-|";
1499 defined $child or die "no fork: $!";
1501 print $stdin or die $!;
1502 close STDOUT or die $!;
# With {fork=>1} run the command as a subprocess and return its status;
# otherwise fall through to exec (replacing this process).
1507 return system (@$args) if $opts->{fork};
1510 warn "ENV size is ".length(join(" ",%ENV));
1511 die "exec failed: $!: @$args";
1515 sub ban_node_by_slot {
1516 # Don't start any new jobsteps on this node for 60 seconds
# Record the hold deadline and bump the node's cumulative hold count
# (slot-id argument line is not visible in this fragment).
1518 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1519 $slot[$slotid]->{node}->{hold_count}++;
1520 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Fragment of a lock helper: take an exclusive, non-blocking flock on
# $lockfile, croaking with the caller-supplied message if it is already
# held (used to prevent concurrent crunch-job runs).
1525 my ($lockfile, $error_message) = @_;
# NOTE(review): bareword handle L and the lock are held for the life of
# the process — releasing happens implicitly at exit.
1526 open L, ">", $lockfile or croak("$lockfile: $!");
1527 if (!flock L, LOCK_EX|LOCK_NB) {
1528 croak("Can't lock $lockfile: $error_message\n");
1532 sub find_docker_image {
1533 # Given a Keep locator, check to see if it contains a Docker image.
1534 # If so, return its stream name and Docker hash.
1535 # If not, return undef for both values.
1536 my $locator = shift;
1537 my ($streamname, $filename);
# Look the collection up via the Arvados API and scan its manifest:
# one stream, one file, named <64-hex-digit-hash>.tar.
1538 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1539 foreach my $line (split(/\n/, $image->{manifest_text})) {
1540 my @tokens = split(/\s+/, $line);
1542 $streamname = shift(@tokens);
# File tokens have the form pos:size:name.
1543 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1544 if (defined($filename)) {
1545 return (undef, undef); # More than one file in the Collection.
1547 $filename = (split(/:/, $filedata, 3))[2];
# Accept only a single file whose name is exactly a 64-hex-char Docker
# image hash followed by ".tar".
1552 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1553 return ($streamname, $1);
1555 return (undef, undef);
1559 sub put_retry_count {
1560 # Calculate a --retries argument for arv-put that will have it try
1561 # approximately as long as this Job has been running.
1562 my $stoptime = shift || time;
1563 my $starttime = $jobstep[0]->{starttime};
1564 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# Loop body (not visible) presumably halves $timediff while counting
# retries, i.e. retries ~ log2(elapsed seconds) — TODO confirm.
1566 while ($timediff >= 2) {
# Never return fewer than 3 retries.
1570 return ($retries > 3) ? $retries : 3;
1574 # Given a $?, return a human-readable exit code string like "0" or
1575 # "1" or "0 with signal 1" or "1 with signal 11".
1576 my $exitcode = shift;
# High byte of $? is the exit status; low 7 bits are the signal number;
# bit 0x80 indicates a core dump (standard wait-status layout).
1577 my $s = $exitcode >> 8;
1578 if ($exitcode & 0x7f) {
1579 $s .= " with signal " . ($exitcode & 0x7f);
1581 if ($exitcode & 0x80) {
1582 $s .= " with core dump";
# Fragments of the embedded "checkout-and-build" helper script: unpacks
# the git archive appended after __DATA__ into CRUNCH_SRC, installs the
# Python SDK and any crunch_scripts install hook, then records the
# installed commit and execs the requested command.  Many interior
# lines (fork/exec plumbing, loop bodies) are not visible here.
1590 # checkout-and-build
1593 use File::Path qw( make_path );
1595 my $destdir = $ENV{"CRUNCH_SRC"};
1596 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1597 my $repo = $ENV{"CRUNCH_SRC_URL"};
1598 my $task_work = $ENV{"TASK_WORK"};
1600 for my $dir ($destdir, $task_work) {
1603 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Serialize concurrent installs into the same destdir with a lock file.
1607 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: if "$destdir.commit" already points at the wanted commit,
# skip the install and exec the requested command immediately.
1609 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1612 die "Cannot exec `@ARGV`: $!";
# Otherwise (re)install: invalidate the commit marker and capture all
# build output in "$destdir.log".
1618 unlink "$destdir.commit";
1619 open STDOUT, ">", "$destdir.log";
1620 open STDERR, ">&STDOUT";
# The source tree is shipped as a tar stream after __DATA__.
1623 my @git_archive_data = <DATA>;
1624 if (@git_archive_data) {
1625 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1626 print TARX @git_archive_data;
1628 die "'tar -C $destdir -xf -' exited $?: $!";
1633 chomp ($pwd = `pwd`);
1634 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Build the Arvados Python SDK into a dedicated virtualenv.
1637 for my $src_path ("$destdir/arvados/sdk/python") {
1639 shell_or_die ("virtualenv", $install_dir);
1640 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run whichever install hook the checked-out tree provides, in order of
# preference.
1644 if (-e "$destdir/crunch_scripts/install") {
1645 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1646 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1648 shell_or_die ("./tests/autotests.sh", $install_dir);
1649 } elsif (-e "./install.sh") {
1650 shell_or_die ("./install.sh", $install_dir);
# Atomically record the installed commit: symlink to a temp name, then
# rename over "$destdir.commit".
1654 unlink "$destdir.commit.new";
1655 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1656 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1663 die "Cannot exec `@ARGV`: $!";
# shell_or_die fragment: echo the command when DEBUG is set, run it,
# and die with the wait status on failure.
1670 if ($ENV{"DEBUG"}) {
1671 print STDERR "@_\n";
1674 or die "@_ failed: $! exit 0x".sprintf("%x",$?);