3775: Merge branch 'master' into 3775-fetch-git-repo
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path remove_tree );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $local_job = 0;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143
144 my $User = $arv->{'users'}->{'current'}->execute;
145
146 my $Job;
147 my $job_id;
148 my $dbh;
149 my $sth;
150 if ($jobspec =~ /^[-a-z\d]+$/)
151 {
152   # $jobspec is an Arvados UUID, not a JSON job specification
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # Claim this job, and make sure nobody else does
156     eval {
157       # lock() sets is_locked_by_uuid and changes state to Running.
158       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
159     };
160     if ($@) {
161       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
162       exit EX_TEMPFAIL;
163     };
164   }
165 }
166 else
167 {
168   $Job = JSON::decode_json($jobspec);
169
170   if (!$resume_stash)
171   {
172     map { croak ("No $_ specified") unless $Job->{$_} }
173     qw(script script_version script_parameters);
174   }
175
176   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
177   $Job->{'started_at'} = gmtime;
178
179   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
180 }
181 $job_id = $Job->{'uuid'};
182
183 my $keep_logfile = $job_id . '.log.txt';
184 log_writer_start($keep_logfile);
185
186 $Job->{'runtime_constraints'} ||= {};
187 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
188 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
189
190
191 Log (undef, "check slurm allocation");
192 my @slot;
193 my @node;
194 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
195 my @sinfo;
196 if (!$have_slurm)
197 {
198   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
199   push @sinfo, "$localcpus localhost";
200 }
201 if (exists $ENV{SLURM_NODELIST})
202 {
203   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
204 }
205 foreach (@sinfo)
206 {
207   my ($ncpus, $slurm_nodelist) = split;
208   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
209
210   my @nodelist;
211   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
212   {
213     my $nodelist = $1;
214     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
215     {
216       my $ranges = $1;
217       foreach (split (",", $ranges))
218       {
219         my ($a, $b);
220         if (/(\d+)-(\d+)/)
221         {
222           $a = $1;
223           $b = $2;
224         }
225         else
226         {
227           $a = $_;
228           $b = $_;
229         }
230         push @nodelist, map {
231           my $n = $nodelist;
232           $n =~ s/\[[-,\d]+\]/$_/;
233           $n;
234         } ($a..$b);
235       }
236     }
237     else
238     {
239       push @nodelist, $nodelist;
240     }
241   }
242   foreach my $nodename (@nodelist)
243   {
244     Log (undef, "node $nodename - $ncpus slots");
245     my $node = { name => $nodename,
246                  ncpus => $ncpus,
247                  losing_streak => 0,
248                  hold_until => 0 };
249     foreach my $cpu (1..$ncpus)
250     {
251       push @slot, { node => $node,
252                     cpu => $cpu };
253     }
254   }
255   push @node, @nodelist;
256 }
257
258
259
260 # Ensure that we get one jobstep running on each allocated node before
261 # we start overloading nodes with concurrent steps
262
263 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
264
265
266 $Job->update_attributes(
267   'tasks_summary' => { 'failed' => 0,
268                        'todo' => 1,
269                        'running' => 0,
270                        'done' => 0 });
271
272 Log (undef, "start");
273 $SIG{'INT'} = sub { $main::please_freeze = 1; };
274 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
275 $SIG{'TERM'} = \&croak;
276 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
277 $SIG{'ALRM'} = sub { $main::please_info = 1; };
278 $SIG{'CONT'} = sub { $main::please_continue = 1; };
279 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
280
281 $main::please_freeze = 0;
282 $main::please_info = 0;
283 $main::please_continue = 0;
284 $main::please_refresh = 0;
285 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
286
287 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
288 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
289 $ENV{"JOB_UUID"} = $job_id;
290
291
292 my @jobstep;
293 my @jobstep_todo = ();
294 my @jobstep_done = ();
295 my @jobstep_tomerge = ();
296 my $jobstep_tomerge_level = 0;
297 my $squeue_checked;
298 my $squeue_kill_checked;
299 my $output_in_keep = 0;
300 my $latest_refresh = scalar time;
301
302
303
304 if (defined $Job->{thawedfromkey})
305 {
306   thaw ($Job->{thawedfromkey});
307 }
308 else
309 {
310   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
311     'job_uuid' => $Job->{'uuid'},
312     'sequence' => 0,
313     'qsequence' => 0,
314     'parameters' => {},
315                                                           });
316   push @jobstep, { 'level' => 0,
317                    'failures' => 0,
318                    'arvados_task' => $first_task,
319                  };
320   push @jobstep_todo, 0;
321 }
322
323
324 if (!$have_slurm)
325 {
326   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
327 }
328
329
330 my $build_script;
331 do {
332   local $/ = undef;
333   $build_script = <DATA>;
334 };
335 my $nodelist = join(",", @node);
336
337 if (!defined $no_clear_tmp) {
338   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
339   Log (undef, "Clean work dirs");
340
341   my $cleanpid = fork();
342   if ($cleanpid == 0)
343   {
344     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
345           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
346     exit (1);
347   }
348   while (1)
349   {
350     last if $cleanpid == waitpid (-1, WNOHANG);
351     freeze_if_want_freeze ($cleanpid);
352     select (undef, undef, undef, 0.1);
353   }
354   Log (undef, "Cleanup command exited ".exit_status_s($?));
355 }
356
357
358 my $git_archive;
359 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
360   # If we're in user-land (i.e., not called from crunch-dispatch)
361   # script_version can be an absolute directory path, signifying we
362   # should work straight out of that directory instead of using a git
363   # commit.
364   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
365   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
366 }
367 else {
368   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
369
370   # Install requested code version
371   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
372
373   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
374
375   # If we're running under crunch-dispatch, it will have already
376   # pulled the appropriate source tree into its own repository, and
377   # given us that repo's path as $git_dir.
378   #
379   # If we're running a "local" job, we might have to fetch content
380   # from a remote repository.
381   #
382   # (Currently crunch-dispatch gives a local path with --git-dir, but
383   # we might as well accept URLs there too in case it changes its
384   # mind.)
385   my $repo = $git_dir || $Job->{'repository'};
386
387   # Repository can be remote or local. If remote, we'll need to fetch it
388   # to a local dir before doing `git log` et al.
389   my $repo_location;
390
391   if ($repo =~ m{://|^[^/]*:}) {
392     # $repo is a git url we can clone, like git:// or https:// or
393     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
394     # not recognized here because distinguishing that from a local
395     # path is too fragile. If you really need something strange here,
396     # use the ssh:// form.
397     $repo_location = 'remote';
398   } elsif ($repo =~ m{^\.*/}) {
399     # $repo is a local path to a git index. We'll also resolve ../foo
400     # to ../foo/.git if the latter is a directory. To help
401     # disambiguate local paths from named hosted repositories, this
402     # form must be given as ./ or ../ if it's a relative path.
403     if (-d "$repo/.git") {
404       $repo = "$repo/.git";
405     }
406     $repo_location = 'local';
407   } else {
408     # $repo is none of the above. It must be the name of a hosted
409     # repository.
410     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
411       'filters' => [['name','=',$repo]]
412         )->{'items'};
413     my $n_found = scalar @{$arv_repo_list};
414     if ($n_found > 0) {
415       Log(undef, "Repository '$repo' -> "
416           . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
417     }
418     if ($n_found != 1) {
419       croak("Error: Found $n_found repositories with name '$repo'.");
420     }
421     $repo = $arv_repo_list->[0]->{'fetch_url'};
422     $repo_location = 'remote';
423   }
424   Log(undef, "Using $repo_location repository '$repo'");
425   $ENV{"CRUNCH_SRC_URL"} = $repo;
426
427   # Resolve given script_version (we'll call that $treeish here) to a
428   # commit sha1 ($commit).
429   my $treeish = $Job->{'script_version'};
430   my $commit;
431   if ($repo_location eq 'remote') {
432     # We minimize excess object-fetching by re-using the same bare
433     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
434     # just keep adding remotes to it as needed.
435     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
436     my $gitcmd = "git --git-dir=\Q$local_repo\E";
437
438     # Set up our local repo for caching remote objects, making
439     # archives, etc.
440     if (!-d $local_repo) {
441       make_path($local_repo) or croak("Error: could not create $local_repo");
442     }
443     # This works (exits 0 and doesn't delete fetched objects) even
444     # if $local_repo is already initialized:
445     `$gitcmd init --bare`;
446     if ($?) {
447       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
448     }
449
450     # If $treeish looks like a hash (or abbrev hash) we look it up in
451     # our local cache first, since that's cheaper. (We don't want to
452     # do that with tags/branches though -- those change over time, so
453     # they should always be resolved by the remote repo.)
454     if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
455       # Hide stderr because it's normal for this to fail:
456       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
457       if ($? == 0 &&
458           $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
459           $sha1 =~ /^([0-9a-f]{40})$/s) {
460         $commit = $1;
461         Log(undef, "Commit $commit already present in $local_repo");
462       }
463     }
464
465     if (!defined $commit) {
466       # If $treeish isn't just a hash or abbrev hash, or isn't here
467       # yet, we need to fetch the remote to resolve it correctly.
468
469       # First, remove all local heads. This prevents a name that does
470       # not exist on the remote from resolving to (or colliding with)
471       # a previously fetched branch or tag (possibly from a different
472       # remote).
473       remove_tree("$local_repo/refs/heads", {keep_root => 1});
474
475       Log(undef, "Fetching objects from $repo to $local_repo");
476       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
477       if ($?) {
478         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
479       }
480     }
481
482     # Now that the data is all here, we will use our local repo for
483     # the rest of our git activities.
484     $repo = $local_repo;
485   }
486
487   my $gitcmd = "git --git-dir=\Q$repo\E";
488   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
489   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
490     croak("`$gitcmd rev-list` exited "
491           .exit_status_s($?)
492           .", '$treeish' not found. Giving up.");
493   }
494   $commit = $1;
495   Log(undef, "Version $treeish is commit $commit");
496
497   if ($commit ne $Job->{'script_version'}) {
498     # Record the real commit id in the database, frozentokey, logs,
499     # etc. -- instead of an abbreviation or a branch name which can
500     # become ambiguous or point to a different commit in the future.
501     if (!$Job->update_attributes('script_version' => $commit)) {
502       croak("Error: failed to update job's script_version attribute");
503     }
504   }
505
506   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
507   $git_archive = `$gitcmd archive ''\Q$commit\E`;
508   if ($?) {
509     croak("Error: $gitcmd archive exited ".exit_status_s($?));
510   }
511 }
512
513 if (!defined $git_archive) {
514   Log(undef, "Skip install phase (no git archive)");
515   if ($have_slurm) {
516     Log(undef, "Warning: This probably means workers have no source tree!");
517   }
518 }
519 else {
520   Log(undef, "Run install script on all workers");
521
522   my @srunargs = ("srun",
523                   "--nodelist=$nodelist",
524                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
525   my @execargs = ("sh", "-c",
526                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
527
528   # Note: this section is almost certainly unnecessary if we're
529   # running tasks in docker containers.
530   my $installpid = fork();
531   if ($installpid == 0)
532   {
533     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
534     exit (1);
535   }
536   while (1)
537   {
538     last if $installpid == waitpid (-1, WNOHANG);
539     freeze_if_want_freeze ($installpid);
540     select (undef, undef, undef, 0.1);
541   }
542   Log (undef, "Install script exited ".exit_status_s($?));
543 }
544
545 if (!$have_slurm)
546 {
547   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
548   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
549 }
550
551 # If this job requires a Docker image, install that.
552 my $docker_bin = "/usr/bin/docker.io";
553 my ($docker_locator, $docker_stream, $docker_hash);
554 if ($docker_locator = $Job->{docker_image_locator}) {
555   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
556   if (!$docker_hash)
557   {
558     croak("No Docker image hash found from locator $docker_locator");
559   }
560   $docker_stream =~ s/^\.//;
561   my $docker_install_script = qq{
562 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
563     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
564 fi
565 };
566   my $docker_pid = fork();
567   if ($docker_pid == 0)
568   {
569     srun (["srun", "--nodelist=" . join(',', @node)],
570           ["/bin/sh", "-ec", $docker_install_script]);
571     exit ($?);
572   }
573   while (1)
574   {
575     last if $docker_pid == waitpid (-1, WNOHANG);
576     freeze_if_want_freeze ($docker_pid);
577     select (undef, undef, undef, 0.1);
578   }
579   if ($? != 0)
580   {
581     croak("Installing Docker image from $docker_locator exited "
582           .exit_status_s($?));
583   }
584 }
585
586 foreach (qw (script script_version script_parameters runtime_constraints))
587 {
588   Log (undef,
589        "$_ " .
590        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
591 }
592 foreach (split (/\n/, $Job->{knobs}))
593 {
594   Log (undef, "knob " . $_);
595 }
596
597
598
599 $main::success = undef;
600
601
602
603 ONELEVEL:
604
605 my $thisround_succeeded = 0;
606 my $thisround_failed = 0;
607 my $thisround_failed_multiple = 0;
608
609 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
610                        or $a <=> $b } @jobstep_todo;
611 my $level = $jobstep[$jobstep_todo[0]]->{level};
612 Log (undef, "start level $level");
613
614
615
616 my %proc;
617 my @freeslot = (0..$#slot);
618 my @holdslot;
619 my %reader;
620 my $progress_is_dirty = 1;
621 my $progress_stats_updated = 0;
622
623 update_progress_stats();
624
625
626
627 THISROUND:
628 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
629 {
630   my $id = $jobstep_todo[$todo_ptr];
631   my $Jobstep = $jobstep[$id];
632   if ($Jobstep->{level} != $level)
633   {
634     next;
635   }
636
637   pipe $reader{$id}, "writer" or croak ($!);
638   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
639   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
640
641   my $childslot = $freeslot[0];
642   my $childnode = $slot[$childslot]->{node};
643   my $childslotname = join (".",
644                             $slot[$childslot]->{node}->{name},
645                             $slot[$childslot]->{cpu});
646   my $childpid = fork();
647   if ($childpid == 0)
648   {
649     $SIG{'INT'} = 'DEFAULT';
650     $SIG{'QUIT'} = 'DEFAULT';
651     $SIG{'TERM'} = 'DEFAULT';
652
653     foreach (values (%reader))
654     {
655       close($_);
656     }
657     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
658     open(STDOUT,">&writer");
659     open(STDERR,">&writer");
660
661     undef $dbh;
662     undef $sth;
663
664     delete $ENV{"GNUPGHOME"};
665     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
666     $ENV{"TASK_QSEQUENCE"} = $id;
667     $ENV{"TASK_SEQUENCE"} = $level;
668     $ENV{"JOB_SCRIPT"} = $Job->{script};
669     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
670       $param =~ tr/a-z/A-Z/;
671       $ENV{"JOB_PARAMETER_$param"} = $value;
672     }
673     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
674     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
675     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
676     $ENV{"HOME"} = $ENV{"TASK_WORK"};
677     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
678     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
679     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
680     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
681
682     $ENV{"GZIP"} = "-n";
683
684     my @srunargs = (
685       "srun",
686       "--nodelist=".$childnode->{name},
687       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
688       "--job-name=$job_id.$id.$$",
689         );
690     my $build_script_to_send = "";
691     my $command =
692         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
693         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
694         ."&& cd $ENV{CRUNCH_TMP} ";
695     if ($build_script)
696     {
697       $build_script_to_send = $build_script;
698       $command .=
699           "&& perl -";
700     }
701     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
702     if ($docker_hash)
703     {
704       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
705       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
706       # Dynamically configure the container to use the host system as its
707       # DNS server.  Get the host's global addresses from the ip command,
708       # and turn them into docker --dns options using gawk.
709       $command .=
710           q{$(ip -o address show scope global |
711               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
712       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
713       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
714       $command .= "--env=\QHOME=/home/crunch\E ";
715       while (my ($env_key, $env_val) = each %ENV)
716       {
717         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
718           if ($env_key eq "TASK_WORK") {
719             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
720           }
721           elsif ($env_key eq "TASK_KEEPMOUNT") {
722             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
723           }
724           else {
725             $command .= "--env=\Q$env_key=$env_val\E ";
726           }
727         }
728       }
729       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
730       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
731       $command .= "\Q$docker_hash\E ";
732       $command .= "stdbuf --output=0 --error=0 ";
733       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
734     } else {
735       # Non-docker run
736       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
737       $command .= "stdbuf --output=0 --error=0 ";
738       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
739     }
740
741     my @execargs = ('bash', '-c', $command);
742     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
743     # exec() failed, we assume nothing happened.
744     Log(undef, "srun() failed on build script");
745     die;
746   }
747   close("writer");
748   if (!defined $childpid)
749   {
750     close $reader{$id};
751     delete $reader{$id};
752     next;
753   }
754   shift @freeslot;
755   $proc{$childpid} = { jobstep => $id,
756                        time => time,
757                        slot => $childslot,
758                        jobstepname => "$job_id.$id.$childpid",
759                      };
760   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
761   $slot[$childslot]->{pid} = $childpid;
762
763   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
764   Log ($id, "child $childpid started on $childslotname");
765   $Jobstep->{starttime} = time;
766   $Jobstep->{node} = $childnode->{name};
767   $Jobstep->{slotindex} = $childslot;
768   delete $Jobstep->{stderr};
769   delete $Jobstep->{finishtime};
770
771   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
772   $Jobstep->{'arvados_task'}->save;
773
774   splice @jobstep_todo, $todo_ptr, 1;
775   --$todo_ptr;
776
777   $progress_is_dirty = 1;
778
779   while (!@freeslot
780          ||
781          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
782   {
783     last THISROUND if $main::please_freeze;
784     if ($main::please_info)
785     {
786       $main::please_info = 0;
787       freeze();
788       collate_output();
789       save_meta(1);
790       update_progress_stats();
791     }
792     my $gotsome
793         = readfrompipes ()
794         + reapchildren ();
795     if (!$gotsome)
796     {
797       check_refresh_wanted();
798       check_squeue();
799       update_progress_stats();
800       select (undef, undef, undef, 0.1);
801     }
802     elsif (time - $progress_stats_updated >= 30)
803     {
804       update_progress_stats();
805     }
806     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
807         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
808     {
809       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
810           .($thisround_failed+$thisround_succeeded)
811           .") -- giving up on this round";
812       Log (undef, $message);
813       last THISROUND;
814     }
815
816     # move slots from freeslot to holdslot (or back to freeslot) if necessary
817     for (my $i=$#freeslot; $i>=0; $i--) {
818       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
819         push @holdslot, (splice @freeslot, $i, 1);
820       }
821     }
822     for (my $i=$#holdslot; $i>=0; $i--) {
823       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
824         push @freeslot, (splice @holdslot, $i, 1);
825       }
826     }
827
828     # give up if no nodes are succeeding
829     if (!grep { $_->{node}->{losing_streak} == 0 &&
830                     $_->{node}->{hold_count} < 4 } @slot) {
831       my $message = "Every node has failed -- giving up on this round";
832       Log (undef, $message);
833       last THISROUND;
834     }
835   }
836 }
837
838
839 push @freeslot, splice @holdslot;
840 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
841
842
843 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
844 while (%proc)
845 {
846   if ($main::please_continue) {
847     $main::please_continue = 0;
848     goto THISROUND;
849   }
850   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
851   readfrompipes ();
852   if (!reapchildren())
853   {
854     check_refresh_wanted();
855     check_squeue();
856     update_progress_stats();
857     select (undef, undef, undef, 0.1);
858     killem (keys %proc) if $main::please_freeze;
859   }
860 }
861
862 update_progress_stats();
863 freeze_if_want_freeze();
864
865
866 if (!defined $main::success)
867 {
868   if (@jobstep_todo &&
869       $thisround_succeeded == 0 &&
870       ($thisround_failed == 0 || $thisround_failed > 4))
871   {
872     my $message = "stop because $thisround_failed tasks failed and none succeeded";
873     Log (undef, $message);
874     $main::success = 0;
875   }
876   if (!@jobstep_todo)
877   {
878     $main::success = 1;
879   }
880 }
881
882 goto ONELEVEL if !defined $main::success;
883
884
885 release_allocation();
886 freeze();
887 my $collated_output = &collate_output();
888
889 if (!$collated_output) {
890   Log(undef, "output undef");
891 }
892 else {
893   eval {
894     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
895         or die "failed to get collated manifest: $!";
896     my $orig_manifest_text = '';
897     while (my $manifest_line = <$orig_manifest>) {
898       $orig_manifest_text .= $manifest_line;
899     }
900     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
901       'manifest_text' => $orig_manifest_text,
902     });
903     Log(undef, "output uuid " . $output->{uuid});
904     Log(undef, "output hash " . $output->{portable_data_hash});
905     $Job->update_attributes('output' => $output->{portable_data_hash});
906   };
907   if ($@) {
908     Log (undef, "Failed to register output manifest: $@");
909   }
910 }
911
912 Log (undef, "finish");
913
914 save_meta();
915
916 my $final_state;
917 if ($collated_output && $main::success) {
918   $final_state = 'Complete';
919 } else {
920   $final_state = 'Failed';
921 }
922 $Job->update_attributes('state' => $final_state);
923
924 exit (($final_state eq 'Complete') ? 0 : 1);
925
926
927
928 sub update_progress_stats
929 {
930   $progress_stats_updated = time;
931   return if !$progress_is_dirty;
932   my ($todo, $done, $running) = (scalar @jobstep_todo,
933                                  scalar @jobstep_done,
934                                  scalar @slot - scalar @freeslot - scalar @holdslot);
935   $Job->{'tasks_summary'} ||= {};
936   $Job->{'tasks_summary'}->{'todo'} = $todo;
937   $Job->{'tasks_summary'}->{'done'} = $done;
938   $Job->{'tasks_summary'}->{'running'} = $running;
939   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
940   Log (undef, "status: $done done, $running running, $todo todo");
941   $progress_is_dirty = 0;
942 }
943
944
945
946 sub reapchildren
947 {
948   my $pid = waitpid (-1, WNOHANG);
949   return 0 if $pid <= 0;
950
951   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
952                   . "."
953                   . $slot[$proc{$pid}->{slot}]->{cpu});
954   my $jobstepid = $proc{$pid}->{jobstep};
955   my $elapsed = time - $proc{$pid}->{time};
956   my $Jobstep = $jobstep[$jobstepid];
957
958   my $childstatus = $?;
959   my $exitvalue = $childstatus >> 8;
960   my $exitinfo = "exit ".exit_status_s($childstatus);
961   $Jobstep->{'arvados_task'}->reload;
962   my $task_success = $Jobstep->{'arvados_task'}->{success};
963
964   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
965
966   if (!defined $task_success) {
967     # task did not indicate one way or the other --> fail
968     $Jobstep->{'arvados_task'}->{success} = 0;
969     $Jobstep->{'arvados_task'}->save;
970     $task_success = 0;
971   }
972
973   if (!$task_success)
974   {
975     my $temporary_fail;
976     $temporary_fail ||= $Jobstep->{node_fail};
977     $temporary_fail ||= ($exitvalue == 111);
978
979     ++$thisround_failed;
980     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
981
982     # Check for signs of a failed or misconfigured node
983     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
984         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
985       # Don't count this against jobstep failure thresholds if this
986       # node is already suspected faulty and srun exited quickly
987       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
988           $elapsed < 5) {
989         Log ($jobstepid, "blaming failure on suspect node " .
990              $slot[$proc{$pid}->{slot}]->{node}->{name});
991         $temporary_fail ||= 1;
992       }
993       ban_node_by_slot($proc{$pid}->{slot});
994     }
995
996     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
997                              ++$Jobstep->{'failures'},
998                              $temporary_fail ? 'temporary ' : 'permanent',
999                              $elapsed));
1000
1001     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1002       # Give up on this task, and the whole job
1003       $main::success = 0;
1004       $main::please_freeze = 1;
1005     }
1006     # Put this task back on the todo queue
1007     push @jobstep_todo, $jobstepid;
1008     $Job->{'tasks_summary'}->{'failed'}++;
1009   }
1010   else
1011   {
1012     ++$thisround_succeeded;
1013     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1014     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1015     push @jobstep_done, $jobstepid;
1016     Log ($jobstepid, "success in $elapsed seconds");
1017   }
1018   $Jobstep->{exitcode} = $childstatus;
1019   $Jobstep->{finishtime} = time;
1020   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1021   $Jobstep->{'arvados_task'}->save;
1022   process_stderr ($jobstepid, $task_success);
1023   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1024
1025   close $reader{$jobstepid};
1026   delete $reader{$jobstepid};
1027   delete $slot[$proc{$pid}->{slot}]->{pid};
1028   push @freeslot, $proc{$pid}->{slot};
1029   delete $proc{$pid};
1030
1031   if ($task_success) {
1032     # Load new tasks
1033     my $newtask_list = [];
1034     my $newtask_results;
1035     do {
1036       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1037         'where' => {
1038           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1039         },
1040         'order' => 'qsequence',
1041         'offset' => scalar(@$newtask_list),
1042       );
1043       push(@$newtask_list, @{$newtask_results->{items}});
1044     } while (@{$newtask_results->{items}});
1045     foreach my $arvados_task (@$newtask_list) {
1046       my $jobstep = {
1047         'level' => $arvados_task->{'sequence'},
1048         'failures' => 0,
1049         'arvados_task' => $arvados_task
1050       };
1051       push @jobstep, $jobstep;
1052       push @jobstep_todo, $#jobstep;
1053     }
1054   }
1055
1056   $progress_is_dirty = 1;
1057   1;
1058 }
1059
1060 sub check_refresh_wanted
1061 {
1062   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1063   if (@stat && $stat[9] > $latest_refresh) {
1064     $latest_refresh = scalar time;
1065     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1066     for my $attr ('cancelled_at',
1067                   'cancelled_by_user_uuid',
1068                   'cancelled_by_client_uuid',
1069                   'state') {
1070       $Job->{$attr} = $Job2->{$attr};
1071     }
1072     if ($Job->{'state'} ne "Running") {
1073       if ($Job->{'state'} eq "Cancelled") {
1074         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1075       } else {
1076         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1077       }
1078       $main::success = 0;
1079       $main::please_freeze = 1;
1080     }
1081   }
1082 }
1083
1084 sub check_squeue
1085 {
1086   # return if the kill list was checked <4 seconds ago
1087   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1088   {
1089     return;
1090   }
1091   $squeue_kill_checked = time;
1092
1093   # use killem() on procs whose killtime is reached
1094   for (keys %proc)
1095   {
1096     if (exists $proc{$_}->{killtime}
1097         && $proc{$_}->{killtime} <= time)
1098     {
1099       killem ($_);
1100     }
1101   }
1102
1103   # return if the squeue was checked <60 seconds ago
1104   if (defined $squeue_checked && $squeue_checked > time - 60)
1105   {
1106     return;
1107   }
1108   $squeue_checked = time;
1109
1110   if (!$have_slurm)
1111   {
1112     # here is an opportunity to check for mysterious problems with local procs
1113     return;
1114   }
1115
1116   # get a list of steps still running
1117   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1118   chop @squeue;
1119   if ($squeue[-1] ne "ok")
1120   {
1121     return;
1122   }
1123   pop @squeue;
1124
1125   # which of my jobsteps are running, according to squeue?
1126   my %ok;
1127   foreach (@squeue)
1128   {
1129     if (/^(\d+)\.(\d+) (\S+)/)
1130     {
1131       if ($1 eq $ENV{SLURM_JOBID})
1132       {
1133         $ok{$3} = 1;
1134       }
1135     }
1136   }
1137
1138   # which of my active child procs (>60s old) were not mentioned by squeue?
1139   foreach (keys %proc)
1140   {
1141     if ($proc{$_}->{time} < time - 60
1142         && !exists $ok{$proc{$_}->{jobstepname}}
1143         && !exists $proc{$_}->{killtime})
1144     {
1145       # kill this proc if it hasn't exited in 30 seconds
1146       $proc{$_}->{killtime} = time + 30;
1147     }
1148   }
1149 }
1150
1151
1152 sub release_allocation
1153 {
1154   if ($have_slurm)
1155   {
1156     Log (undef, "release job allocation");
1157     system "scancel $ENV{SLURM_JOBID}";
1158   }
1159 }
1160
1161
1162 sub readfrompipes
1163 {
1164   my $gotsome = 0;
1165   foreach my $job (keys %reader)
1166   {
1167     my $buf;
1168     while (0 < sysread ($reader{$job}, $buf, 8192))
1169     {
1170       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1171       $jobstep[$job]->{stderr} .= $buf;
1172       preprocess_stderr ($job);
1173       if (length ($jobstep[$job]->{stderr}) > 16384)
1174       {
1175         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1176       }
1177       $gotsome = 1;
1178     }
1179   }
1180   return $gotsome;
1181 }
1182
1183
1184 sub preprocess_stderr
1185 {
1186   my $job = shift;
1187
1188   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1189     my $line = $1;
1190     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1191     Log ($job, "stderr $line");
1192     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1193       # whoa.
1194       $main::please_freeze = 1;
1195     }
1196     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1197       $jobstep[$job]->{node_fail} = 1;
1198       ban_node_by_slot($jobstep[$job]->{slotindex});
1199     }
1200   }
1201 }
1202
1203
1204 sub process_stderr
1205 {
1206   my $job = shift;
1207   my $task_success = shift;
1208   preprocess_stderr ($job);
1209
1210   map {
1211     Log ($job, "stderr $_");
1212   } split ("\n", $jobstep[$job]->{stderr});
1213 }
1214
1215 sub fetch_block
1216 {
1217   my $hash = shift;
1218   my ($keep, $child_out, $output_block);
1219
1220   my $cmd = "arv-get \Q$hash\E";
1221   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1222   $output_block = '';
1223   while (1) {
1224     my $buf;
1225     my $bytes = sysread($keep, $buf, 1024 * 1024);
1226     if (!defined $bytes) {
1227       die "reading from arv-get: $!";
1228     } elsif ($bytes == 0) {
1229       # sysread returns 0 at the end of the pipe.
1230       last;
1231     } else {
1232       # some bytes were read into buf.
1233       $output_block .= $buf;
1234     }
1235   }
1236   close $keep;
1237   return $output_block;
1238 }
1239
1240 sub collate_output
1241 {
1242   Log (undef, "collate");
1243
1244   my ($child_out, $child_in);
1245   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1246                   '--retries', put_retry_count());
1247   my $joboutput;
1248   for (@jobstep)
1249   {
1250     next if (!exists $_->{'arvados_task'}->{'output'} ||
1251              !$_->{'arvados_task'}->{'success'});
1252     my $output = $_->{'arvados_task'}->{output};
1253     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1254     {
1255       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1256       print $child_in $output;
1257     }
1258     elsif (@jobstep == 1)
1259     {
1260       $joboutput = $output;
1261       last;
1262     }
1263     elsif (defined (my $outblock = fetch_block ($output)))
1264     {
1265       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1266       print $child_in $outblock;
1267     }
1268     else
1269     {
1270       Log (undef, "XXX fetch_block($output) failed XXX");
1271       $main::success = 0;
1272     }
1273   }
1274   $child_in->close;
1275
1276   if (!defined $joboutput) {
1277     my $s = IO::Select->new($child_out);
1278     if ($s->can_read(120)) {
1279       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1280       chomp($joboutput);
1281       # TODO: Ensure exit status == 0.
1282     } else {
1283       Log (undef, "timed out reading from 'arv-put'");
1284     }
1285   }
1286   # TODO: kill $pid instead of waiting, now that we've decided to
1287   # ignore further output.
1288   waitpid($pid, 0);
1289
1290   return $joboutput;
1291 }
1292
1293
1294 sub killem
1295 {
1296   foreach (@_)
1297   {
1298     my $sig = 2;                # SIGINT first
1299     if (exists $proc{$_}->{"sent_$sig"} &&
1300         time - $proc{$_}->{"sent_$sig"} > 4)
1301     {
1302       $sig = 15;                # SIGTERM if SIGINT doesn't work
1303     }
1304     if (exists $proc{$_}->{"sent_$sig"} &&
1305         time - $proc{$_}->{"sent_$sig"} > 4)
1306     {
1307       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1308     }
1309     if (!exists $proc{$_}->{"sent_$sig"})
1310     {
1311       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1312       kill $sig, $_;
1313       select (undef, undef, undef, 0.1);
1314       if ($sig == 2)
1315       {
1316         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1317       }
1318       $proc{$_}->{"sent_$sig"} = time;
1319       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1320     }
1321   }
1322 }
1323
1324
1325 sub fhbits
1326 {
1327   my($bits);
1328   for (@_) {
1329     vec($bits,fileno($_),1) = 1;
1330   }
1331   $bits;
1332 }
1333
1334
1335 # Send log output to Keep via arv-put.
1336 #
1337 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1338 # $log_pipe_pid is the pid of the arv-put subprocess.
1339 #
1340 # The only functions that should access these variables directly are:
1341 #
1342 # log_writer_start($logfilename)
1343 #     Starts an arv-put pipe, reading data on stdin and writing it to
1344 #     a $logfilename file in an output collection.
1345 #
1346 # log_writer_send($txt)
1347 #     Writes $txt to the output log collection.
1348 #
1349 # log_writer_finish()
1350 #     Closes the arv-put pipe and returns the output that it produces.
1351 #
1352 # log_writer_is_active()
1353 #     Returns a true value if there is currently a live arv-put
1354 #     process, false otherwise.
1355 #
1356 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1357
1358 sub log_writer_start($)
1359 {
1360   my $logfilename = shift;
1361   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1362                         'arv-put', '--portable-data-hash',
1363                         '--retries', '3',
1364                         '--filename', $logfilename,
1365                         '-');
1366 }
1367
1368 sub log_writer_send($)
1369 {
1370   my $txt = shift;
1371   print $log_pipe_in $txt;
1372 }
1373
1374 sub log_writer_finish()
1375 {
1376   return unless $log_pipe_pid;
1377
1378   close($log_pipe_in);
1379   my $arv_put_output;
1380
1381   my $s = IO::Select->new($log_pipe_out);
1382   if ($s->can_read(120)) {
1383     sysread($log_pipe_out, $arv_put_output, 1024);
1384     chomp($arv_put_output);
1385   } else {
1386     Log (undef, "timed out reading from 'arv-put'");
1387   }
1388
1389   waitpid($log_pipe_pid, 0);
1390   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1391   if ($?) {
1392     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1393   }
1394
1395   return $arv_put_output;
1396 }
1397
1398 sub log_writer_is_active() {
1399   return $log_pipe_pid;
1400 }
1401
1402 sub Log                         # ($jobstep_id, $logmessage)
1403 {
1404   if ($_[1] =~ /\n/) {
1405     for my $line (split (/\n/, $_[1])) {
1406       Log ($_[0], $line);
1407     }
1408     return;
1409   }
1410   my $fh = select STDERR; $|=1; select $fh;
1411   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1412   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1413   $message .= "\n";
1414   my $datetime;
1415   if (log_writer_is_active() || -t STDERR) {
1416     my @gmtime = gmtime;
1417     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1418                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1419   }
1420   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1421
1422   if (log_writer_is_active()) {
1423     log_writer_send($datetime . " " . $message);
1424   }
1425 }
1426
1427
1428 sub croak
1429 {
1430   my ($package, $file, $line) = caller;
1431   my $message = "@_ at $file line $line\n";
1432   Log (undef, $message);
1433   freeze() if @jobstep_todo;
1434   collate_output() if @jobstep_todo;
1435   cleanup() if $Job;
1436   save_meta() if log_writer_is_active();
1437   die;
1438 }
1439
1440
1441 sub cleanup
1442 {
1443   if ($Job->{'state'} eq 'Cancelled') {
1444     $Job->update_attributes('finished_at' => scalar gmtime);
1445   } else {
1446     $Job->update_attributes('state' => 'Failed');
1447   }
1448 }
1449
1450
1451 sub save_meta
1452 {
1453   my $justcheckpoint = shift; # false if this will be the last meta saved
1454   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1455
1456   my $loglocator = log_writer_finish();
1457   Log (undef, "log manifest is $loglocator");
1458   $Job->{'log'} = $loglocator;
1459   $Job->update_attributes('log', $loglocator);
1460 }
1461
1462
1463 sub freeze_if_want_freeze
1464 {
1465   if ($main::please_freeze)
1466   {
1467     release_allocation();
1468     if (@_)
1469     {
1470       # kill some srun procs before freeze+stop
1471       map { $proc{$_} = {} } @_;
1472       while (%proc)
1473       {
1474         killem (keys %proc);
1475         select (undef, undef, undef, 0.1);
1476         my $died;
1477         while (($died = waitpid (-1, WNOHANG)) > 0)
1478         {
1479           delete $proc{$died};
1480         }
1481       }
1482     }
1483     freeze();
1484     collate_output();
1485     cleanup();
1486     save_meta();
1487     exit 1;
1488   }
1489 }
1490
1491
1492 sub freeze
1493 {
1494   Log (undef, "Freeze not implemented");
1495   return;
1496 }
1497
1498
1499 sub thaw
1500 {
1501   croak ("Thaw not implemented");
1502 }
1503
1504
1505 sub freezequote
1506 {
1507   my $s = shift;
1508   $s =~ s/\\/\\\\/g;
1509   $s =~ s/\n/\\n/g;
1510   return $s;
1511 }
1512
1513
1514 sub freezeunquote
1515 {
1516   my $s = shift;
1517   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1518   return $s;
1519 }
1520
1521
1522 sub srun
1523 {
1524   my $srunargs = shift;
1525   my $execargs = shift;
1526   my $opts = shift || {};
1527   my $stdin = shift;
1528   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1529   print STDERR (join (" ",
1530                       map { / / ? "'$_'" : $_ }
1531                       (@$args)),
1532                 "\n")
1533       if $ENV{CRUNCH_DEBUG};
1534
1535   if (defined $stdin) {
1536     my $child = open STDIN, "-|";
1537     defined $child or die "no fork: $!";
1538     if ($child == 0) {
1539       print $stdin or die $!;
1540       close STDOUT or die $!;
1541       exit 0;
1542     }
1543   }
1544
1545   return system (@$args) if $opts->{fork};
1546
1547   exec @$args;
1548   warn "ENV size is ".length(join(" ",%ENV));
1549   die "exec failed: $!: @$args";
1550 }
1551
1552
1553 sub ban_node_by_slot {
1554   # Don't start any new jobsteps on this node for 60 seconds
1555   my $slotid = shift;
1556   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1557   $slot[$slotid]->{node}->{hold_count}++;
1558   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1559 }
1560
1561 sub must_lock_now
1562 {
1563   my ($lockfile, $error_message) = @_;
1564   open L, ">", $lockfile or croak("$lockfile: $!");
1565   if (!flock L, LOCK_EX|LOCK_NB) {
1566     croak("Can't lock $lockfile: $error_message\n");
1567   }
1568 }
1569
1570 sub find_docker_image {
1571   # Given a Keep locator, check to see if it contains a Docker image.
1572   # If so, return its stream name and Docker hash.
1573   # If not, return undef for both values.
1574   my $locator = shift;
1575   my ($streamname, $filename);
1576   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1577     foreach my $line (split(/\n/, $image->{manifest_text})) {
1578       my @tokens = split(/\s+/, $line);
1579       next if (!@tokens);
1580       $streamname = shift(@tokens);
1581       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1582         if (defined($filename)) {
1583           return (undef, undef);  # More than one file in the Collection.
1584         } else {
1585           $filename = (split(/:/, $filedata, 3))[2];
1586         }
1587       }
1588     }
1589   }
1590   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1591     return ($streamname, $1);
1592   } else {
1593     return (undef, undef);
1594   }
1595 }
1596
1597 sub put_retry_count {
1598   # Calculate a --retries argument for arv-put that will have it try
1599   # approximately as long as this Job has been running.
1600   my $stoptime = shift || time;
1601   my $starttime = $jobstep[0]->{starttime};
1602   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1603   my $retries = 0;
1604   while ($timediff >= 2) {
1605     $retries++;
1606     $timediff /= 2;
1607   }
1608   return ($retries > 3) ? $retries : 3;
1609 }
1610
1611 sub exit_status_s {
1612   # Given a $?, return a human-readable exit code string like "0" or
1613   # "1" or "0 with signal 1" or "1 with signal 11".
1614   my $exitcode = shift;
1615   my $s = $exitcode >> 8;
1616   if ($exitcode & 0x7f) {
1617     $s .= " with signal " . ($exitcode & 0x7f);
1618   }
1619   if ($exitcode & 0x80) {
1620     $s .= " with core dump";
1621   }
1622   return $s;
1623 }
1624
1625 __DATA__
1626 #!/usr/bin/perl
1627
1628 # checkout-and-build
1629
1630 use Fcntl ':flock';
1631 use File::Path qw( make_path );
1632
1633 my $destdir = $ENV{"CRUNCH_SRC"};
1634 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1635 my $repo = $ENV{"CRUNCH_SRC_URL"};
1636 my $task_work = $ENV{"TASK_WORK"};
1637
1638 for my $dir ($destdir, $task_work) {
1639     if ($dir) {
1640         make_path $dir;
1641         -e $dir or die "Failed to create temporary directory ($dir): $!";
1642     }
1643 }
1644
1645 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1646 flock L, LOCK_EX;
1647 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1648     if (@ARGV) {
1649         exec(@ARGV);
1650         die "Cannot exec `@ARGV`: $!";
1651     } else {
1652         exit 0;
1653     }
1654 }
1655
1656 unlink "$destdir.commit";
1657 open STDOUT, ">", "$destdir.log";
1658 open STDERR, ">&STDOUT";
1659
1660 mkdir $destdir;
1661 my @git_archive_data = <DATA>;
1662 if (@git_archive_data) {
1663   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1664   print TARX @git_archive_data;
1665   if(!close(TARX)) {
1666     die "'tar -C $destdir -xf -' exited $?: $!";
1667   }
1668 }
1669
1670 my $pwd;
1671 chomp ($pwd = `pwd`);
1672 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1673 mkdir $install_dir;
1674
1675 for my $src_path ("$destdir/arvados/sdk/python") {
1676   if (-d $src_path) {
1677     shell_or_die ("virtualenv", $install_dir);
1678     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1679   }
1680 }
1681
1682 if (-e "$destdir/crunch_scripts/install") {
1683     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1684 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1685     # Old version
1686     shell_or_die ("./tests/autotests.sh", $install_dir);
1687 } elsif (-e "./install.sh") {
1688     shell_or_die ("./install.sh", $install_dir);
1689 }
1690
1691 if ($commit) {
1692     unlink "$destdir.commit.new";
1693     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1694     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1695 }
1696
1697 close L;
1698
1699 if (@ARGV) {
1700     exec(@ARGV);
1701     die "Cannot exec `@ARGV`: $!";
1702 } else {
1703     exit 0;
1704 }
1705
1706 sub shell_or_die
1707 {
1708   if ($ENV{"DEBUG"}) {
1709     print STDERR "@_\n";
1710   }
1711   system (@_) == 0
1712       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1713 }
1714
1715 __DATA__