3775: Set state=Running when creating a Job in local mode.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path remove_tree );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $local_job = 0;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143
144 my $User = $arv->{'users'}->{'current'}->execute;
145
146 my $Job;
147 my $job_id;
148 my $dbh;
149 my $sth;
150 if ($jobspec =~ /^[-a-z\d]+$/)
151 {
152   # $jobspec is an Arvados UUID, not a JSON job specification
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # Claim this job, and make sure nobody else does
156     eval {
157       # lock() sets is_locked_by_uuid and changes state to Running.
158       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
159     };
160     if ($@) {
161       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
162       exit EX_TEMPFAIL;
163     };
164   }
165 }
166 else
167 {
168   $Job = JSON::decode_json($jobspec);
169
170   if (!$resume_stash)
171   {
172     map { croak ("No $_ specified") unless $Job->{$_} }
173     qw(script script_version script_parameters);
174   }
175
176   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
177   $Job->{'started_at'} = gmtime;
178   $Job->{'state'} = 'Running';
179
180   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
181 }
182 $job_id = $Job->{'uuid'};
183
184 my $keep_logfile = $job_id . '.log.txt';
185 log_writer_start($keep_logfile);
186
187 $Job->{'runtime_constraints'} ||= {};
188 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
189 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
190
191
192 Log (undef, "check slurm allocation");
193 my @slot;
194 my @node;
195 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
196 my @sinfo;
197 if (!$have_slurm)
198 {
199   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
200   push @sinfo, "$localcpus localhost";
201 }
202 if (exists $ENV{SLURM_NODELIST})
203 {
204   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
205 }
206 foreach (@sinfo)
207 {
208   my ($ncpus, $slurm_nodelist) = split;
209   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
210
211   my @nodelist;
212   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
213   {
214     my $nodelist = $1;
215     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
216     {
217       my $ranges = $1;
218       foreach (split (",", $ranges))
219       {
220         my ($a, $b);
221         if (/(\d+)-(\d+)/)
222         {
223           $a = $1;
224           $b = $2;
225         }
226         else
227         {
228           $a = $_;
229           $b = $_;
230         }
231         push @nodelist, map {
232           my $n = $nodelist;
233           $n =~ s/\[[-,\d]+\]/$_/;
234           $n;
235         } ($a..$b);
236       }
237     }
238     else
239     {
240       push @nodelist, $nodelist;
241     }
242   }
243   foreach my $nodename (@nodelist)
244   {
245     Log (undef, "node $nodename - $ncpus slots");
246     my $node = { name => $nodename,
247                  ncpus => $ncpus,
248                  losing_streak => 0,
249                  hold_until => 0 };
250     foreach my $cpu (1..$ncpus)
251     {
252       push @slot, { node => $node,
253                     cpu => $cpu };
254     }
255   }
256   push @node, @nodelist;
257 }
258
259
260
261 # Ensure that we get one jobstep running on each allocated node before
262 # we start overloading nodes with concurrent steps
263
264 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
265
266
267 $Job->update_attributes(
268   'tasks_summary' => { 'failed' => 0,
269                        'todo' => 1,
270                        'running' => 0,
271                        'done' => 0 });
272
273 Log (undef, "start");
274 $SIG{'INT'} = sub { $main::please_freeze = 1; };
275 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
276 $SIG{'TERM'} = \&croak;
277 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
278 $SIG{'ALRM'} = sub { $main::please_info = 1; };
279 $SIG{'CONT'} = sub { $main::please_continue = 1; };
280 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
281
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 $main::please_refresh = 0;
286 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
287
288 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
289 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
290 $ENV{"JOB_UUID"} = $job_id;
291
292
293 my @jobstep;
294 my @jobstep_todo = ();
295 my @jobstep_done = ();
296 my @jobstep_tomerge = ();
297 my $jobstep_tomerge_level = 0;
298 my $squeue_checked;
299 my $squeue_kill_checked;
300 my $output_in_keep = 0;
301 my $latest_refresh = scalar time;
302
303
304
305 if (defined $Job->{thawedfromkey})
306 {
307   thaw ($Job->{thawedfromkey});
308 }
309 else
310 {
311   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312     'job_uuid' => $Job->{'uuid'},
313     'sequence' => 0,
314     'qsequence' => 0,
315     'parameters' => {},
316                                                           });
317   push @jobstep, { 'level' => 0,
318                    'failures' => 0,
319                    'arvados_task' => $first_task,
320                  };
321   push @jobstep_todo, 0;
322 }
323
324
325 if (!$have_slurm)
326 {
327   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
328 }
329
330
331 my $build_script;
332 do {
333   local $/ = undef;
334   $build_script = <DATA>;
335 };
336 my $nodelist = join(",", @node);
337
338 if (!defined $no_clear_tmp) {
339   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
340   Log (undef, "Clean work dirs");
341
342   my $cleanpid = fork();
343   if ($cleanpid == 0)
344   {
345     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
346           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
347     exit (1);
348   }
349   while (1)
350   {
351     last if $cleanpid == waitpid (-1, WNOHANG);
352     freeze_if_want_freeze ($cleanpid);
353     select (undef, undef, undef, 0.1);
354   }
355   Log (undef, "Cleanup command exited ".exit_status_s($?));
356 }
357
358
359 my $git_archive;
360 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
361   # If we're in user-land (i.e., not called from crunch-dispatch)
362   # script_version can be an absolute directory path, signifying we
363   # should work straight out of that directory instead of using a git
364   # commit.
365   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
366   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
367 }
368 else {
369   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370
371   # Install requested code version
372   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
373
374   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
375
376   # If we're running under crunch-dispatch, it will have already
377   # pulled the appropriate source tree into its own repository, and
378   # given us that repo's path as $git_dir.
379   #
380   # If we're running a "local" job, we might have to fetch content
381   # from a remote repository.
382   #
383   # (Currently crunch-dispatch gives a local path with --git-dir, but
384   # we might as well accept URLs there too in case it changes its
385   # mind.)
386   my $repo = $git_dir || $Job->{'repository'};
387
388   # Repository can be remote or local. If remote, we'll need to fetch it
389   # to a local dir before doing `git log` et al.
390   my $repo_location;
391
392   if ($repo =~ m{://|^[^/]*:}) {
393     # $repo is a git url we can clone, like git:// or https:// or
394     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
395     # not recognized here because distinguishing that from a local
396     # path is too fragile. If you really need something strange here,
397     # use the ssh:// form.
398     $repo_location = 'remote';
399   } elsif ($repo =~ m{^\.*/}) {
400     # $repo is a local path to a git index. We'll also resolve ../foo
401     # to ../foo/.git if the latter is a directory. To help
402     # disambiguate local paths from named hosted repositories, this
403     # form must be given as ./ or ../ if it's a relative path.
404     if (-d "$repo/.git") {
405       $repo = "$repo/.git";
406     }
407     $repo_location = 'local';
408   } else {
409     # $repo is none of the above. It must be the name of a hosted
410     # repository.
411     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
412       'filters' => [['name','=',$repo]]
413         )->{'items'};
414     my $n_found = scalar @{$arv_repo_list};
415     if ($n_found > 0) {
416       Log(undef, "Repository '$repo' -> "
417           . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
418     }
419     if ($n_found != 1) {
420       croak("Error: Found $n_found repositories with name '$repo'.");
421     }
422     $repo = $arv_repo_list->[0]->{'fetch_url'};
423     $repo_location = 'remote';
424   }
425   Log(undef, "Using $repo_location repository '$repo'");
426   $ENV{"CRUNCH_SRC_URL"} = $repo;
427
428   # Resolve given script_version (we'll call that $treeish here) to a
429   # commit sha1 ($commit).
430   my $treeish = $Job->{'script_version'};
431   my $commit;
432   if ($repo_location eq 'remote') {
433     # We minimize excess object-fetching by re-using the same bare
434     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
435     # just keep adding remotes to it as needed.
436     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
437     my $gitcmd = "git --git-dir=\Q$local_repo\E";
438
439     # Set up our local repo for caching remote objects, making
440     # archives, etc.
441     if (!-d $local_repo) {
442       make_path($local_repo) or croak("Error: could not create $local_repo");
443     }
444     # This works (exits 0 and doesn't delete fetched objects) even
445     # if $local_repo is already initialized:
446     `$gitcmd init --bare`;
447     if ($?) {
448       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
449     }
450
451     # If $treeish looks like a hash (or abbrev hash) we look it up in
452     # our local cache first, since that's cheaper. (We don't want to
453     # do that with tags/branches though -- those change over time, so
454     # they should always be resolved by the remote repo.)
455     if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
456       # Hide stderr because it's normal for this to fail:
457       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
458       if ($? == 0 &&
459           $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
460           $sha1 =~ /^([0-9a-f]{40})$/s) {
461         $commit = $1;
462         Log(undef, "Commit $commit already present in $local_repo");
463       }
464     }
465
466     if (!defined $commit) {
467       # If $treeish isn't just a hash or abbrev hash, or isn't here
468       # yet, we need to fetch the remote to resolve it correctly.
469
470       # First, remove all local heads. This prevents a name that does
471       # not exist on the remote from resolving to (or colliding with)
472       # a previously fetched branch or tag (possibly from a different
473       # remote).
474       remove_tree("$local_repo/refs/heads", {keep_root => 1});
475
476       Log(undef, "Fetching objects from $repo to $local_repo");
477       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
478       if ($?) {
479         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
480       }
481     }
482
483     # Now that the data is all here, we will use our local repo for
484     # the rest of our git activities.
485     $repo = $local_repo;
486   }
487
488   my $gitcmd = "git --git-dir=\Q$repo\E";
489   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
490   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
491     croak("`$gitcmd rev-list` exited "
492           .exit_status_s($?)
493           .", '$treeish' not found. Giving up.");
494   }
495   $commit = $1;
496   Log(undef, "Version $treeish is commit $commit");
497
498   if ($commit ne $Job->{'script_version'}) {
499     # Record the real commit id in the database, frozentokey, logs,
500     # etc. -- instead of an abbreviation or a branch name which can
501     # become ambiguous or point to a different commit in the future.
502     if (!$Job->update_attributes('script_version' => $commit)) {
503       croak("Error: failed to update job's script_version attribute");
504     }
505   }
506
507   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
508   $git_archive = `$gitcmd archive ''\Q$commit\E`;
509   if ($?) {
510     croak("Error: $gitcmd archive exited ".exit_status_s($?));
511   }
512 }
513
514 if (!defined $git_archive) {
515   Log(undef, "Skip install phase (no git archive)");
516   if ($have_slurm) {
517     Log(undef, "Warning: This probably means workers have no source tree!");
518   }
519 }
520 else {
521   Log(undef, "Run install script on all workers");
522
523   my @srunargs = ("srun",
524                   "--nodelist=$nodelist",
525                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
526   my @execargs = ("sh", "-c",
527                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
528
529   # Note: this section is almost certainly unnecessary if we're
530   # running tasks in docker containers.
531   my $installpid = fork();
532   if ($installpid == 0)
533   {
534     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
535     exit (1);
536   }
537   while (1)
538   {
539     last if $installpid == waitpid (-1, WNOHANG);
540     freeze_if_want_freeze ($installpid);
541     select (undef, undef, undef, 0.1);
542   }
543   Log (undef, "Install script exited ".exit_status_s($?));
544 }
545
546 if (!$have_slurm)
547 {
548   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
549   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
550 }
551
552 # If this job requires a Docker image, install that.
553 my $docker_bin = "/usr/bin/docker.io";
554 my ($docker_locator, $docker_stream, $docker_hash);
555 if ($docker_locator = $Job->{docker_image_locator}) {
556   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
557   if (!$docker_hash)
558   {
559     croak("No Docker image hash found from locator $docker_locator");
560   }
561   $docker_stream =~ s/^\.//;
562   my $docker_install_script = qq{
563 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
564     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
565 fi
566 };
567   my $docker_pid = fork();
568   if ($docker_pid == 0)
569   {
570     srun (["srun", "--nodelist=" . join(',', @node)],
571           ["/bin/sh", "-ec", $docker_install_script]);
572     exit ($?);
573   }
574   while (1)
575   {
576     last if $docker_pid == waitpid (-1, WNOHANG);
577     freeze_if_want_freeze ($docker_pid);
578     select (undef, undef, undef, 0.1);
579   }
580   if ($? != 0)
581   {
582     croak("Installing Docker image from $docker_locator exited "
583           .exit_status_s($?));
584   }
585 }
586
587 foreach (qw (script script_version script_parameters runtime_constraints))
588 {
589   Log (undef,
590        "$_ " .
591        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
592 }
593 foreach (split (/\n/, $Job->{knobs}))
594 {
595   Log (undef, "knob " . $_);
596 }
597
598
599
600 $main::success = undef;
601
602
603
604 ONELEVEL:
605
606 my $thisround_succeeded = 0;
607 my $thisround_failed = 0;
608 my $thisround_failed_multiple = 0;
609
610 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
611                        or $a <=> $b } @jobstep_todo;
612 my $level = $jobstep[$jobstep_todo[0]]->{level};
613 Log (undef, "start level $level");
614
615
616
617 my %proc;
618 my @freeslot = (0..$#slot);
619 my @holdslot;
620 my %reader;
621 my $progress_is_dirty = 1;
622 my $progress_stats_updated = 0;
623
624 update_progress_stats();
625
626
627
628 THISROUND:
629 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
630 {
631   my $id = $jobstep_todo[$todo_ptr];
632   my $Jobstep = $jobstep[$id];
633   if ($Jobstep->{level} != $level)
634   {
635     next;
636   }
637
638   pipe $reader{$id}, "writer" or croak ($!);
639   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
640   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
641
642   my $childslot = $freeslot[0];
643   my $childnode = $slot[$childslot]->{node};
644   my $childslotname = join (".",
645                             $slot[$childslot]->{node}->{name},
646                             $slot[$childslot]->{cpu});
647   my $childpid = fork();
648   if ($childpid == 0)
649   {
650     $SIG{'INT'} = 'DEFAULT';
651     $SIG{'QUIT'} = 'DEFAULT';
652     $SIG{'TERM'} = 'DEFAULT';
653
654     foreach (values (%reader))
655     {
656       close($_);
657     }
658     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
659     open(STDOUT,">&writer");
660     open(STDERR,">&writer");
661
662     undef $dbh;
663     undef $sth;
664
665     delete $ENV{"GNUPGHOME"};
666     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
667     $ENV{"TASK_QSEQUENCE"} = $id;
668     $ENV{"TASK_SEQUENCE"} = $level;
669     $ENV{"JOB_SCRIPT"} = $Job->{script};
670     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
671       $param =~ tr/a-z/A-Z/;
672       $ENV{"JOB_PARAMETER_$param"} = $value;
673     }
674     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
675     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
676     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
677     $ENV{"HOME"} = $ENV{"TASK_WORK"};
678     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
679     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
680     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
681     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
682
683     $ENV{"GZIP"} = "-n";
684
685     my @srunargs = (
686       "srun",
687       "--nodelist=".$childnode->{name},
688       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
689       "--job-name=$job_id.$id.$$",
690         );
691     my $build_script_to_send = "";
692     my $command =
693         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
694         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
695         ."&& cd $ENV{CRUNCH_TMP} ";
696     if ($build_script)
697     {
698       $build_script_to_send = $build_script;
699       $command .=
700           "&& perl -";
701     }
702     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
703     if ($docker_hash)
704     {
705       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
706       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
707       # Dynamically configure the container to use the host system as its
708       # DNS server.  Get the host's global addresses from the ip command,
709       # and turn them into docker --dns options using gawk.
710       $command .=
711           q{$(ip -o address show scope global |
712               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
713       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
714       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
715       $command .= "--env=\QHOME=/home/crunch\E ";
716       while (my ($env_key, $env_val) = each %ENV)
717       {
718         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
719           if ($env_key eq "TASK_WORK") {
720             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
721           }
722           elsif ($env_key eq "TASK_KEEPMOUNT") {
723             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
724           }
725           else {
726             $command .= "--env=\Q$env_key=$env_val\E ";
727           }
728         }
729       }
730       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
731       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
732       $command .= "\Q$docker_hash\E ";
733       $command .= "stdbuf --output=0 --error=0 ";
734       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
735     } else {
736       # Non-docker run
737       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
738       $command .= "stdbuf --output=0 --error=0 ";
739       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
740     }
741
742     my @execargs = ('bash', '-c', $command);
743     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
744     # exec() failed, we assume nothing happened.
745     Log(undef, "srun() failed on build script");
746     die;
747   }
748   close("writer");
749   if (!defined $childpid)
750   {
751     close $reader{$id};
752     delete $reader{$id};
753     next;
754   }
755   shift @freeslot;
756   $proc{$childpid} = { jobstep => $id,
757                        time => time,
758                        slot => $childslot,
759                        jobstepname => "$job_id.$id.$childpid",
760                      };
761   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
762   $slot[$childslot]->{pid} = $childpid;
763
764   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
765   Log ($id, "child $childpid started on $childslotname");
766   $Jobstep->{starttime} = time;
767   $Jobstep->{node} = $childnode->{name};
768   $Jobstep->{slotindex} = $childslot;
769   delete $Jobstep->{stderr};
770   delete $Jobstep->{finishtime};
771
772   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
773   $Jobstep->{'arvados_task'}->save;
774
775   splice @jobstep_todo, $todo_ptr, 1;
776   --$todo_ptr;
777
778   $progress_is_dirty = 1;
779
780   while (!@freeslot
781          ||
782          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
783   {
784     last THISROUND if $main::please_freeze;
785     if ($main::please_info)
786     {
787       $main::please_info = 0;
788       freeze();
789       collate_output();
790       save_meta(1);
791       update_progress_stats();
792     }
793     my $gotsome
794         = readfrompipes ()
795         + reapchildren ();
796     if (!$gotsome)
797     {
798       check_refresh_wanted();
799       check_squeue();
800       update_progress_stats();
801       select (undef, undef, undef, 0.1);
802     }
803     elsif (time - $progress_stats_updated >= 30)
804     {
805       update_progress_stats();
806     }
807     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
808         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
809     {
810       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
811           .($thisround_failed+$thisround_succeeded)
812           .") -- giving up on this round";
813       Log (undef, $message);
814       last THISROUND;
815     }
816
817     # move slots from freeslot to holdslot (or back to freeslot) if necessary
818     for (my $i=$#freeslot; $i>=0; $i--) {
819       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
820         push @holdslot, (splice @freeslot, $i, 1);
821       }
822     }
823     for (my $i=$#holdslot; $i>=0; $i--) {
824       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
825         push @freeslot, (splice @holdslot, $i, 1);
826       }
827     }
828
829     # give up if no nodes are succeeding
830     if (!grep { $_->{node}->{losing_streak} == 0 &&
831                     $_->{node}->{hold_count} < 4 } @slot) {
832       my $message = "Every node has failed -- giving up on this round";
833       Log (undef, $message);
834       last THISROUND;
835     }
836   }
837 }
838
839
840 push @freeslot, splice @holdslot;
841 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
842
843
844 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
845 while (%proc)
846 {
847   if ($main::please_continue) {
848     $main::please_continue = 0;
849     goto THISROUND;
850   }
851   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
852   readfrompipes ();
853   if (!reapchildren())
854   {
855     check_refresh_wanted();
856     check_squeue();
857     update_progress_stats();
858     select (undef, undef, undef, 0.1);
859     killem (keys %proc) if $main::please_freeze;
860   }
861 }
862
863 update_progress_stats();
864 freeze_if_want_freeze();
865
866
867 if (!defined $main::success)
868 {
869   if (@jobstep_todo &&
870       $thisround_succeeded == 0 &&
871       ($thisround_failed == 0 || $thisround_failed > 4))
872   {
873     my $message = "stop because $thisround_failed tasks failed and none succeeded";
874     Log (undef, $message);
875     $main::success = 0;
876   }
877   if (!@jobstep_todo)
878   {
879     $main::success = 1;
880   }
881 }
882
883 goto ONELEVEL if !defined $main::success;
884
885
886 release_allocation();
887 freeze();
888 my $collated_output = &collate_output();
889
890 if (!$collated_output) {
891   Log(undef, "output undef");
892 }
893 else {
894   eval {
895     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
896         or die "failed to get collated manifest: $!";
897     my $orig_manifest_text = '';
898     while (my $manifest_line = <$orig_manifest>) {
899       $orig_manifest_text .= $manifest_line;
900     }
901     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
902       'manifest_text' => $orig_manifest_text,
903     });
904     Log(undef, "output uuid " . $output->{uuid});
905     Log(undef, "output hash " . $output->{portable_data_hash});
906     $Job->update_attributes('output' => $output->{portable_data_hash});
907   };
908   if ($@) {
909     Log (undef, "Failed to register output manifest: $@");
910   }
911 }
912
913 Log (undef, "finish");
914
915 save_meta();
916
917 my $final_state;
918 if ($collated_output && $main::success) {
919   $final_state = 'Complete';
920 } else {
921   $final_state = 'Failed';
922 }
923 $Job->update_attributes('state' => $final_state);
924
925 exit (($final_state eq 'Complete') ? 0 : 1);
926
927
928
929 sub update_progress_stats
930 {
931   $progress_stats_updated = time;
932   return if !$progress_is_dirty;
933   my ($todo, $done, $running) = (scalar @jobstep_todo,
934                                  scalar @jobstep_done,
935                                  scalar @slot - scalar @freeslot - scalar @holdslot);
936   $Job->{'tasks_summary'} ||= {};
937   $Job->{'tasks_summary'}->{'todo'} = $todo;
938   $Job->{'tasks_summary'}->{'done'} = $done;
939   $Job->{'tasks_summary'}->{'running'} = $running;
940   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
941   Log (undef, "status: $done done, $running running, $todo todo");
942   $progress_is_dirty = 0;
943 }
944
945
946
947 sub reapchildren
948 {
949   my $pid = waitpid (-1, WNOHANG);
950   return 0 if $pid <= 0;
951
952   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
953                   . "."
954                   . $slot[$proc{$pid}->{slot}]->{cpu});
955   my $jobstepid = $proc{$pid}->{jobstep};
956   my $elapsed = time - $proc{$pid}->{time};
957   my $Jobstep = $jobstep[$jobstepid];
958
959   my $childstatus = $?;
960   my $exitvalue = $childstatus >> 8;
961   my $exitinfo = "exit ".exit_status_s($childstatus);
962   $Jobstep->{'arvados_task'}->reload;
963   my $task_success = $Jobstep->{'arvados_task'}->{success};
964
965   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
966
967   if (!defined $task_success) {
968     # task did not indicate one way or the other --> fail
969     $Jobstep->{'arvados_task'}->{success} = 0;
970     $Jobstep->{'arvados_task'}->save;
971     $task_success = 0;
972   }
973
974   if (!$task_success)
975   {
976     my $temporary_fail;
977     $temporary_fail ||= $Jobstep->{node_fail};
978     $temporary_fail ||= ($exitvalue == 111);
979
980     ++$thisround_failed;
981     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
982
983     # Check for signs of a failed or misconfigured node
984     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
985         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
986       # Don't count this against jobstep failure thresholds if this
987       # node is already suspected faulty and srun exited quickly
988       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
989           $elapsed < 5) {
990         Log ($jobstepid, "blaming failure on suspect node " .
991              $slot[$proc{$pid}->{slot}]->{node}->{name});
992         $temporary_fail ||= 1;
993       }
994       ban_node_by_slot($proc{$pid}->{slot});
995     }
996
997     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
998                              ++$Jobstep->{'failures'},
999                              $temporary_fail ? 'temporary ' : 'permanent',
1000                              $elapsed));
1001
1002     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1003       # Give up on this task, and the whole job
1004       $main::success = 0;
1005       $main::please_freeze = 1;
1006     }
1007     # Put this task back on the todo queue
1008     push @jobstep_todo, $jobstepid;
1009     $Job->{'tasks_summary'}->{'failed'}++;
1010   }
1011   else
1012   {
1013     ++$thisround_succeeded;
1014     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1015     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1016     push @jobstep_done, $jobstepid;
1017     Log ($jobstepid, "success in $elapsed seconds");
1018   }
1019   $Jobstep->{exitcode} = $childstatus;
1020   $Jobstep->{finishtime} = time;
1021   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1022   $Jobstep->{'arvados_task'}->save;
1023   process_stderr ($jobstepid, $task_success);
1024   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1025
1026   close $reader{$jobstepid};
1027   delete $reader{$jobstepid};
1028   delete $slot[$proc{$pid}->{slot}]->{pid};
1029   push @freeslot, $proc{$pid}->{slot};
1030   delete $proc{$pid};
1031
1032   if ($task_success) {
1033     # Load new tasks
1034     my $newtask_list = [];
1035     my $newtask_results;
1036     do {
1037       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1038         'where' => {
1039           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1040         },
1041         'order' => 'qsequence',
1042         'offset' => scalar(@$newtask_list),
1043       );
1044       push(@$newtask_list, @{$newtask_results->{items}});
1045     } while (@{$newtask_results->{items}});
1046     foreach my $arvados_task (@$newtask_list) {
1047       my $jobstep = {
1048         'level' => $arvados_task->{'sequence'},
1049         'failures' => 0,
1050         'arvados_task' => $arvados_task
1051       };
1052       push @jobstep, $jobstep;
1053       push @jobstep_todo, $#jobstep;
1054     }
1055   }
1056
1057   $progress_is_dirty = 1;
1058   1;
1059 }
1060
1061 sub check_refresh_wanted
1062 {
1063   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1064   if (@stat && $stat[9] > $latest_refresh) {
1065     $latest_refresh = scalar time;
1066     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1067     for my $attr ('cancelled_at',
1068                   'cancelled_by_user_uuid',
1069                   'cancelled_by_client_uuid',
1070                   'state') {
1071       $Job->{$attr} = $Job2->{$attr};
1072     }
1073     if ($Job->{'state'} ne "Running") {
1074       if ($Job->{'state'} eq "Cancelled") {
1075         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1076       } else {
1077         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1078       }
1079       $main::success = 0;
1080       $main::please_freeze = 1;
1081     }
1082   }
1083 }
1084
1085 sub check_squeue
1086 {
1087   # return if the kill list was checked <4 seconds ago
1088   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1089   {
1090     return;
1091   }
1092   $squeue_kill_checked = time;
1093
1094   # use killem() on procs whose killtime is reached
1095   for (keys %proc)
1096   {
1097     if (exists $proc{$_}->{killtime}
1098         && $proc{$_}->{killtime} <= time)
1099     {
1100       killem ($_);
1101     }
1102   }
1103
1104   # return if the squeue was checked <60 seconds ago
1105   if (defined $squeue_checked && $squeue_checked > time - 60)
1106   {
1107     return;
1108   }
1109   $squeue_checked = time;
1110
1111   if (!$have_slurm)
1112   {
1113     # here is an opportunity to check for mysterious problems with local procs
1114     return;
1115   }
1116
1117   # get a list of steps still running
1118   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1119   chop @squeue;
1120   if ($squeue[-1] ne "ok")
1121   {
1122     return;
1123   }
1124   pop @squeue;
1125
1126   # which of my jobsteps are running, according to squeue?
1127   my %ok;
1128   foreach (@squeue)
1129   {
1130     if (/^(\d+)\.(\d+) (\S+)/)
1131     {
1132       if ($1 eq $ENV{SLURM_JOBID})
1133       {
1134         $ok{$3} = 1;
1135       }
1136     }
1137   }
1138
1139   # which of my active child procs (>60s old) were not mentioned by squeue?
1140   foreach (keys %proc)
1141   {
1142     if ($proc{$_}->{time} < time - 60
1143         && !exists $ok{$proc{$_}->{jobstepname}}
1144         && !exists $proc{$_}->{killtime})
1145     {
1146       # kill this proc if it hasn't exited in 30 seconds
1147       $proc{$_}->{killtime} = time + 30;
1148     }
1149   }
1150 }
1151
1152
1153 sub release_allocation
1154 {
1155   if ($have_slurm)
1156   {
1157     Log (undef, "release job allocation");
1158     system "scancel $ENV{SLURM_JOBID}";
1159   }
1160 }
1161
1162
1163 sub readfrompipes
1164 {
1165   my $gotsome = 0;
1166   foreach my $job (keys %reader)
1167   {
1168     my $buf;
1169     while (0 < sysread ($reader{$job}, $buf, 8192))
1170     {
1171       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1172       $jobstep[$job]->{stderr} .= $buf;
1173       preprocess_stderr ($job);
1174       if (length ($jobstep[$job]->{stderr}) > 16384)
1175       {
1176         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1177       }
1178       $gotsome = 1;
1179     }
1180   }
1181   return $gotsome;
1182 }
1183
1184
1185 sub preprocess_stderr
1186 {
1187   my $job = shift;
1188
1189   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1190     my $line = $1;
1191     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1192     Log ($job, "stderr $line");
1193     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1194       # whoa.
1195       $main::please_freeze = 1;
1196     }
1197     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1198       $jobstep[$job]->{node_fail} = 1;
1199       ban_node_by_slot($jobstep[$job]->{slotindex});
1200     }
1201   }
1202 }
1203
1204
1205 sub process_stderr
1206 {
1207   my $job = shift;
1208   my $task_success = shift;
1209   preprocess_stderr ($job);
1210
1211   map {
1212     Log ($job, "stderr $_");
1213   } split ("\n", $jobstep[$job]->{stderr});
1214 }
1215
1216 sub fetch_block
1217 {
1218   my $hash = shift;
1219   my ($keep, $child_out, $output_block);
1220
1221   my $cmd = "arv-get \Q$hash\E";
1222   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1223   $output_block = '';
1224   while (1) {
1225     my $buf;
1226     my $bytes = sysread($keep, $buf, 1024 * 1024);
1227     if (!defined $bytes) {
1228       die "reading from arv-get: $!";
1229     } elsif ($bytes == 0) {
1230       # sysread returns 0 at the end of the pipe.
1231       last;
1232     } else {
1233       # some bytes were read into buf.
1234       $output_block .= $buf;
1235     }
1236   }
1237   close $keep;
1238   return $output_block;
1239 }
1240
1241 sub collate_output
1242 {
1243   Log (undef, "collate");
1244
1245   my ($child_out, $child_in);
1246   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1247                   '--retries', put_retry_count());
1248   my $joboutput;
1249   for (@jobstep)
1250   {
1251     next if (!exists $_->{'arvados_task'}->{'output'} ||
1252              !$_->{'arvados_task'}->{'success'});
1253     my $output = $_->{'arvados_task'}->{output};
1254     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1255     {
1256       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1257       print $child_in $output;
1258     }
1259     elsif (@jobstep == 1)
1260     {
1261       $joboutput = $output;
1262       last;
1263     }
1264     elsif (defined (my $outblock = fetch_block ($output)))
1265     {
1266       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1267       print $child_in $outblock;
1268     }
1269     else
1270     {
1271       Log (undef, "XXX fetch_block($output) failed XXX");
1272       $main::success = 0;
1273     }
1274   }
1275   $child_in->close;
1276
1277   if (!defined $joboutput) {
1278     my $s = IO::Select->new($child_out);
1279     if ($s->can_read(120)) {
1280       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1281       chomp($joboutput);
1282       # TODO: Ensure exit status == 0.
1283     } else {
1284       Log (undef, "timed out reading from 'arv-put'");
1285     }
1286   }
1287   # TODO: kill $pid instead of waiting, now that we've decided to
1288   # ignore further output.
1289   waitpid($pid, 0);
1290
1291   return $joboutput;
1292 }
1293
1294
1295 sub killem
1296 {
1297   foreach (@_)
1298   {
1299     my $sig = 2;                # SIGINT first
1300     if (exists $proc{$_}->{"sent_$sig"} &&
1301         time - $proc{$_}->{"sent_$sig"} > 4)
1302     {
1303       $sig = 15;                # SIGTERM if SIGINT doesn't work
1304     }
1305     if (exists $proc{$_}->{"sent_$sig"} &&
1306         time - $proc{$_}->{"sent_$sig"} > 4)
1307     {
1308       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1309     }
1310     if (!exists $proc{$_}->{"sent_$sig"})
1311     {
1312       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1313       kill $sig, $_;
1314       select (undef, undef, undef, 0.1);
1315       if ($sig == 2)
1316       {
1317         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1318       }
1319       $proc{$_}->{"sent_$sig"} = time;
1320       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1321     }
1322   }
1323 }
1324
1325
1326 sub fhbits
1327 {
1328   my($bits);
1329   for (@_) {
1330     vec($bits,fileno($_),1) = 1;
1331   }
1332   $bits;
1333 }
1334
1335
1336 # Send log output to Keep via arv-put.
1337 #
1338 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1339 # $log_pipe_pid is the pid of the arv-put subprocess.
1340 #
1341 # The only functions that should access these variables directly are:
1342 #
1343 # log_writer_start($logfilename)
1344 #     Starts an arv-put pipe, reading data on stdin and writing it to
1345 #     a $logfilename file in an output collection.
1346 #
1347 # log_writer_send($txt)
1348 #     Writes $txt to the output log collection.
1349 #
1350 # log_writer_finish()
1351 #     Closes the arv-put pipe and returns the output that it produces.
1352 #
1353 # log_writer_is_active()
1354 #     Returns a true value if there is currently a live arv-put
1355 #     process, false otherwise.
1356 #
1357 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1358
1359 sub log_writer_start($)
1360 {
1361   my $logfilename = shift;
1362   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1363                         'arv-put', '--portable-data-hash',
1364                         '--retries', '3',
1365                         '--filename', $logfilename,
1366                         '-');
1367 }
1368
1369 sub log_writer_send($)
1370 {
1371   my $txt = shift;
1372   print $log_pipe_in $txt;
1373 }
1374
1375 sub log_writer_finish()
1376 {
1377   return unless $log_pipe_pid;
1378
1379   close($log_pipe_in);
1380   my $arv_put_output;
1381
1382   my $s = IO::Select->new($log_pipe_out);
1383   if ($s->can_read(120)) {
1384     sysread($log_pipe_out, $arv_put_output, 1024);
1385     chomp($arv_put_output);
1386   } else {
1387     Log (undef, "timed out reading from 'arv-put'");
1388   }
1389
1390   waitpid($log_pipe_pid, 0);
1391   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1392   if ($?) {
1393     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1394   }
1395
1396   return $arv_put_output;
1397 }
1398
1399 sub log_writer_is_active() {
1400   return $log_pipe_pid;
1401 }
1402
1403 sub Log                         # ($jobstep_id, $logmessage)
1404 {
1405   if ($_[1] =~ /\n/) {
1406     for my $line (split (/\n/, $_[1])) {
1407       Log ($_[0], $line);
1408     }
1409     return;
1410   }
1411   my $fh = select STDERR; $|=1; select $fh;
1412   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1413   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1414   $message .= "\n";
1415   my $datetime;
1416   if (log_writer_is_active() || -t STDERR) {
1417     my @gmtime = gmtime;
1418     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1419                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1420   }
1421   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1422
1423   if (log_writer_is_active()) {
1424     log_writer_send($datetime . " " . $message);
1425   }
1426 }
1427
1428
1429 sub croak
1430 {
1431   my ($package, $file, $line) = caller;
1432   my $message = "@_ at $file line $line\n";
1433   Log (undef, $message);
1434   freeze() if @jobstep_todo;
1435   collate_output() if @jobstep_todo;
1436   cleanup() if $Job;
1437   save_meta() if log_writer_is_active();
1438   die;
1439 }
1440
1441
1442 sub cleanup
1443 {
1444   if ($Job->{'state'} eq 'Cancelled') {
1445     $Job->update_attributes('finished_at' => scalar gmtime);
1446   } else {
1447     $Job->update_attributes('state' => 'Failed');
1448   }
1449 }
1450
1451
1452 sub save_meta
1453 {
1454   my $justcheckpoint = shift; # false if this will be the last meta saved
1455   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1456
1457   my $loglocator = log_writer_finish();
1458   Log (undef, "log manifest is $loglocator");
1459   $Job->{'log'} = $loglocator;
1460   $Job->update_attributes('log', $loglocator);
1461 }
1462
1463
1464 sub freeze_if_want_freeze
1465 {
1466   if ($main::please_freeze)
1467   {
1468     release_allocation();
1469     if (@_)
1470     {
1471       # kill some srun procs before freeze+stop
1472       map { $proc{$_} = {} } @_;
1473       while (%proc)
1474       {
1475         killem (keys %proc);
1476         select (undef, undef, undef, 0.1);
1477         my $died;
1478         while (($died = waitpid (-1, WNOHANG)) > 0)
1479         {
1480           delete $proc{$died};
1481         }
1482       }
1483     }
1484     freeze();
1485     collate_output();
1486     cleanup();
1487     save_meta();
1488     exit 1;
1489   }
1490 }
1491
1492
1493 sub freeze
1494 {
1495   Log (undef, "Freeze not implemented");
1496   return;
1497 }
1498
1499
1500 sub thaw
1501 {
1502   croak ("Thaw not implemented");
1503 }
1504
1505
1506 sub freezequote
1507 {
1508   my $s = shift;
1509   $s =~ s/\\/\\\\/g;
1510   $s =~ s/\n/\\n/g;
1511   return $s;
1512 }
1513
1514
1515 sub freezeunquote
1516 {
1517   my $s = shift;
1518   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1519   return $s;
1520 }
1521
1522
1523 sub srun
1524 {
1525   my $srunargs = shift;
1526   my $execargs = shift;
1527   my $opts = shift || {};
1528   my $stdin = shift;
1529   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1530   print STDERR (join (" ",
1531                       map { / / ? "'$_'" : $_ }
1532                       (@$args)),
1533                 "\n")
1534       if $ENV{CRUNCH_DEBUG};
1535
1536   if (defined $stdin) {
1537     my $child = open STDIN, "-|";
1538     defined $child or die "no fork: $!";
1539     if ($child == 0) {
1540       print $stdin or die $!;
1541       close STDOUT or die $!;
1542       exit 0;
1543     }
1544   }
1545
1546   return system (@$args) if $opts->{fork};
1547
1548   exec @$args;
1549   warn "ENV size is ".length(join(" ",%ENV));
1550   die "exec failed: $!: @$args";
1551 }
1552
1553
1554 sub ban_node_by_slot {
1555   # Don't start any new jobsteps on this node for 60 seconds
1556   my $slotid = shift;
1557   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1558   $slot[$slotid]->{node}->{hold_count}++;
1559   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1560 }
1561
1562 sub must_lock_now
1563 {
1564   my ($lockfile, $error_message) = @_;
1565   open L, ">", $lockfile or croak("$lockfile: $!");
1566   if (!flock L, LOCK_EX|LOCK_NB) {
1567     croak("Can't lock $lockfile: $error_message\n");
1568   }
1569 }
1570
1571 sub find_docker_image {
1572   # Given a Keep locator, check to see if it contains a Docker image.
1573   # If so, return its stream name and Docker hash.
1574   # If not, return undef for both values.
1575   my $locator = shift;
1576   my ($streamname, $filename);
1577   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1578     foreach my $line (split(/\n/, $image->{manifest_text})) {
1579       my @tokens = split(/\s+/, $line);
1580       next if (!@tokens);
1581       $streamname = shift(@tokens);
1582       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1583         if (defined($filename)) {
1584           return (undef, undef);  # More than one file in the Collection.
1585         } else {
1586           $filename = (split(/:/, $filedata, 3))[2];
1587         }
1588       }
1589     }
1590   }
1591   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1592     return ($streamname, $1);
1593   } else {
1594     return (undef, undef);
1595   }
1596 }
1597
1598 sub put_retry_count {
1599   # Calculate a --retries argument for arv-put that will have it try
1600   # approximately as long as this Job has been running.
1601   my $stoptime = shift || time;
1602   my $starttime = $jobstep[0]->{starttime};
1603   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1604   my $retries = 0;
1605   while ($timediff >= 2) {
1606     $retries++;
1607     $timediff /= 2;
1608   }
1609   return ($retries > 3) ? $retries : 3;
1610 }
1611
1612 sub exit_status_s {
1613   # Given a $?, return a human-readable exit code string like "0" or
1614   # "1" or "0 with signal 1" or "1 with signal 11".
1615   my $exitcode = shift;
1616   my $s = $exitcode >> 8;
1617   if ($exitcode & 0x7f) {
1618     $s .= " with signal " . ($exitcode & 0x7f);
1619   }
1620   if ($exitcode & 0x80) {
1621     $s .= " with core dump";
1622   }
1623   return $s;
1624 }
1625
1626 __DATA__
1627 #!/usr/bin/perl
1628
1629 # checkout-and-build
1630
1631 use Fcntl ':flock';
1632 use File::Path qw( make_path );
1633
1634 my $destdir = $ENV{"CRUNCH_SRC"};
1635 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1636 my $repo = $ENV{"CRUNCH_SRC_URL"};
1637 my $task_work = $ENV{"TASK_WORK"};
1638
1639 for my $dir ($destdir, $task_work) {
1640     if ($dir) {
1641         make_path $dir;
1642         -e $dir or die "Failed to create temporary directory ($dir): $!";
1643     }
1644 }
1645
1646 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1647 flock L, LOCK_EX;
1648 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1649     if (@ARGV) {
1650         exec(@ARGV);
1651         die "Cannot exec `@ARGV`: $!";
1652     } else {
1653         exit 0;
1654     }
1655 }
1656
1657 unlink "$destdir.commit";
1658 open STDOUT, ">", "$destdir.log";
1659 open STDERR, ">&STDOUT";
1660
1661 mkdir $destdir;
1662 my @git_archive_data = <DATA>;
1663 if (@git_archive_data) {
1664   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1665   print TARX @git_archive_data;
1666   if(!close(TARX)) {
1667     die "'tar -C $destdir -xf -' exited $?: $!";
1668   }
1669 }
1670
1671 my $pwd;
1672 chomp ($pwd = `pwd`);
1673 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1674 mkdir $install_dir;
1675
1676 for my $src_path ("$destdir/arvados/sdk/python") {
1677   if (-d $src_path) {
1678     shell_or_die ("virtualenv", $install_dir);
1679     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1680   }
1681 }
1682
1683 if (-e "$destdir/crunch_scripts/install") {
1684     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1685 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1686     # Old version
1687     shell_or_die ("./tests/autotests.sh", $install_dir);
1688 } elsif (-e "./install.sh") {
1689     shell_or_die ("./install.sh", $install_dir);
1690 }
1691
1692 if ($commit) {
1693     unlink "$destdir.commit.new";
1694     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1695     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1696 }
1697
1698 close L;
1699
1700 if (@ARGV) {
1701     exec(@ARGV);
1702     die "Cannot exec `@ARGV`: $!";
1703 } else {
1704     exit 0;
1705 }
1706
1707 sub shell_or_die
1708 {
1709   if ($ENV{"DEBUG"}) {
1710     print STDERR "@_\n";
1711   }
1712   system (@_) == 0
1713       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1714 }
1715
1716 __DATA__