Added arv-mount
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use Warehouse;
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
88 mkdir ($ENV{"JOB_WORK"});
89
90 my $force_unlock;
91 my $git_dir;
92 my $jobspec;
93 my $job_api_token;
94 my $resume_stash;
95 GetOptions('force-unlock' => \$force_unlock,
96            'git-dir=s' => \$git_dir,
97            'job=s' => \$jobspec,
98            'job-api-token=s' => \$job_api_token,
99            'resume-stash=s' => \$resume_stash,
100     );
101
102 if (defined $job_api_token) {
103   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
104 }
105
106 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
107 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 my $local_job = !$job_has_uuid;
109
110
111 $SIG{'USR1'} = sub
112 {
113   $main::ENV{CRUNCH_DEBUG} = 1;
114 };
115 $SIG{'USR2'} = sub
116 {
117   $main::ENV{CRUNCH_DEBUG} = 0;
118 };
119
120
121
122 my $arv = Arvados->new('apiVersion' => 'v1');
123 my $metastream;
124
125 my $User = $arv->{'users'}->{'current'}->execute;
126
127 my $Job = {};
128 my $job_id;
129 my $dbh;
130 my $sth;
131 if ($job_has_uuid)
132 {
133   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
134   if (!$force_unlock) {
135     if ($Job->{'is_locked_by_uuid'}) {
136       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
137     }
138     if ($Job->{'success'} ne undef) {
139       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
140     }
141     if ($Job->{'running'}) {
142       croak("Job 'running' flag is already set");
143     }
144     if ($Job->{'started_at'}) {
145       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
146     }
147   }
148 }
149 else
150 {
151   $Job = JSON::decode_json($jobspec);
152
153   if (!$resume_stash)
154   {
155     map { croak ("No $_ specified") unless $Job->{$_} }
156     qw(script script_version script_parameters);
157   }
158
159   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
160   $Job->{'started_at'} = gmtime;
161
162   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
163
164   $job_has_uuid = 1;
165 }
166 $job_id = $Job->{'uuid'};
167
168 $metastream = Warehouse::Stream->new(whc => new Warehouse);
169 $metastream->clear;
170 $metastream->name('.');
171 $metastream->write_start($job_id . '.log.txt');
172
173
174 $Job->{'runtime_constraints'} ||= {};
175 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
176 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
177
178
179 Log (undef, "check slurm allocation");
180 my @slot;
181 my @node;
182 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
183 my @sinfo;
184 if (!$have_slurm)
185 {
186   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
187   push @sinfo, "$localcpus localhost";
188 }
189 if (exists $ENV{SLURM_NODELIST})
190 {
191   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
192 }
193 foreach (@sinfo)
194 {
195   my ($ncpus, $slurm_nodelist) = split;
196   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
197
198   my @nodelist;
199   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
200   {
201     my $nodelist = $1;
202     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
203     {
204       my $ranges = $1;
205       foreach (split (",", $ranges))
206       {
207         my ($a, $b);
208         if (/(\d+)-(\d+)/)
209         {
210           $a = $1;
211           $b = $2;
212         }
213         else
214         {
215           $a = $_;
216           $b = $_;
217         }
218         push @nodelist, map {
219           my $n = $nodelist;
220           $n =~ s/\[[-,\d]+\]/$_/;
221           $n;
222         } ($a..$b);
223       }
224     }
225     else
226     {
227       push @nodelist, $nodelist;
228     }
229   }
230   foreach my $nodename (@nodelist)
231   {
232     Log (undef, "node $nodename - $ncpus slots");
233     my $node = { name => $nodename,
234                  ncpus => $ncpus,
235                  losing_streak => 0,
236                  hold_until => 0 };
237     foreach my $cpu (1..$ncpus)
238     {
239       push @slot, { node => $node,
240                     cpu => $cpu };
241     }
242   }
243   push @node, @nodelist;
244 }
245
246
247
248 # Ensure that we get one jobstep running on each allocated node before
249 # we start overloading nodes with concurrent steps
250
251 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
252
253
254
255 my $jobmanager_id;
256 if ($job_has_uuid)
257 {
258   # Claim this job, and make sure nobody else does
259   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
260           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
261     croak("Error while updating / locking job");
262   }
263   $Job->update_attributes('started_at' => scalar gmtime,
264                           'running' => 1,
265                           'success' => undef,
266                           'tasks_summary' => { 'failed' => 0,
267                                                'todo' => 1,
268                                                'running' => 0,
269                                                'done' => 0 });
270 }
271
272
273 Log (undef, "start");
274 $SIG{'INT'} = sub { $main::please_freeze = 1; };
275 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
276 $SIG{'TERM'} = \&croak;
277 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
278 $SIG{'ALRM'} = sub { $main::please_info = 1; };
279 $SIG{'CONT'} = sub { $main::please_continue = 1; };
280 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
281
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 $main::please_refresh = 0;
286 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
287
288 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
289 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
290 $ENV{"JOB_UUID"} = $job_id;
291
292
293 my @jobstep;
294 my @jobstep_todo = ();
295 my @jobstep_done = ();
296 my @jobstep_tomerge = ();
297 my $jobstep_tomerge_level = 0;
298 my $squeue_checked;
299 my $squeue_kill_checked;
300 my $output_in_keep = 0;
301 my $latest_refresh = scalar time;
302
303
304
305 if (defined $Job->{thawedfromkey})
306 {
307   thaw ($Job->{thawedfromkey});
308 }
309 else
310 {
311   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312     'job_uuid' => $Job->{'uuid'},
313     'sequence' => 0,
314     'qsequence' => 0,
315     'parameters' => {},
316                                                           });
317   push @jobstep, { 'level' => 0,
318                    'failures' => 0,
319                    'arvados_task' => $first_task,
320                  };
321   push @jobstep_todo, 0;
322 }
323
324
325 my $build_script;
326
327
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
329
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
331 if ($skip_install)
332 {
333   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
334 }
335 else
336 {
337   do {
338     local $/ = undef;
339     $build_script = <DATA>;
340   };
341   Log (undef, "Install revision ".$Job->{script_version});
342   my $nodelist = join(",", @node);
343
344   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
345
346   my $cleanpid = fork();
347   if ($cleanpid == 0)
348   {
349     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
351     exit (1);
352   }
353   while (1)
354   {
355     last if $cleanpid == waitpid (-1, WNOHANG);
356     freeze_if_want_freeze ($cleanpid);
357     select (undef, undef, undef, 0.1);
358   }
359   Log (undef, "Clean-work-dir exited $?");
360
361   # Install requested code version
362
363   my @execargs;
364   my @srunargs = ("srun",
365                   "--nodelist=$nodelist",
366                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
367
368   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
369   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
371
372   my $commit;
373   my $git_archive;
374   my $treeish = $Job->{'script_version'};
375   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
376   # Todo: let script_version specify repository instead of expecting
377   # parent process to figure it out.
378   $ENV{"CRUNCH_SRC_URL"} = $repo;
379
380   # Create/update our clone of the remote git repo
381
382   if (!-d $ENV{"CRUNCH_SRC"}) {
383     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
384         or croak ("git clone $repo failed: exit ".($?>>8));
385     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
386   }
387   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
388
389   # If this looks like a subversion r#, look for it in git-svn commit messages
390
391   if ($treeish =~ m{^\d{1,4}$}) {
392     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
393     chomp $gitlog;
394     if ($gitlog =~ /^[a-f0-9]{40}$/) {
395       $commit = $gitlog;
396       Log (undef, "Using commit $commit for script_version $treeish");
397     }
398   }
399
400   # If that didn't work, try asking git to look it up as a tree-ish.
401
402   if (!defined $commit) {
403
404     my $cooked_treeish = $treeish;
405     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
406       # Looks like a git branch name -- make sure git knows it's
407       # relative to the remote repo
408       $cooked_treeish = "origin/$treeish";
409     }
410
411     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
412     chomp $found;
413     if ($found =~ /^[0-9a-f]{40}$/s) {
414       $commit = $found;
415       if ($commit ne $treeish) {
416         # Make sure we record the real commit id in the database,
417         # frozentokey, logs, etc. -- instead of an abbreviation or a
418         # branch name which can become ambiguous or point to a
419         # different commit in the future.
420         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
421         Log (undef, "Using commit $commit for tree-ish $treeish");
422         if ($commit ne $treeish) {
423           $Job->{'script_version'} = $commit;
424           !$job_has_uuid or
425               $Job->update_attributes('script_version' => $commit) or
426               croak("Error while updating job");
427         }
428       }
429     }
430   }
431
432   if (defined $commit) {
433     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
434     @execargs = ("sh", "-c",
435                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
436     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
437   }
438   else {
439     croak ("could not figure out commit id for $treeish");
440   }
441
442   my $installpid = fork();
443   if ($installpid == 0)
444   {
445     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
446     exit (1);
447   }
448   while (1)
449   {
450     last if $installpid == waitpid (-1, WNOHANG);
451     freeze_if_want_freeze ($installpid);
452     select (undef, undef, undef, 0.1);
453   }
454   Log (undef, "Install exited $?");
455 }
456
457
458
459 foreach (qw (script script_version script_parameters runtime_constraints))
460 {
461   Log (undef,
462        "$_ " .
463        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
464 }
465 foreach (split (/\n/, $Job->{knobs}))
466 {
467   Log (undef, "knob " . $_);
468 }
469
470
471
472 $main::success = undef;
473
474
475
476 ONELEVEL:
477
478 my $thisround_succeeded = 0;
479 my $thisround_failed = 0;
480 my $thisround_failed_multiple = 0;
481
482 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
483                        or $a <=> $b } @jobstep_todo;
484 my $level = $jobstep[$jobstep_todo[0]]->{level};
485 Log (undef, "start level $level");
486
487
488
489 my %proc;
490 my @freeslot = (0..$#slot);
491 my @holdslot;
492 my %reader;
493 my $progress_is_dirty = 1;
494 my $progress_stats_updated = 0;
495
496 update_progress_stats();
497
498
499
500 THISROUND:
501 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
502 {
503   my $id = $jobstep_todo[$todo_ptr];
504   my $Jobstep = $jobstep[$id];
505   if ($Jobstep->{level} != $level)
506   {
507     next;
508   }
509
510   pipe $reader{$id}, "writer" or croak ($!);
511   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
512   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
513
514   my $childslot = $freeslot[0];
515   my $childnode = $slot[$childslot]->{node};
516   my $childslotname = join (".",
517                             $slot[$childslot]->{node}->{name},
518                             $slot[$childslot]->{cpu});
519   my $childpid = fork();
520   if ($childpid == 0)
521   {
522     $SIG{'INT'} = 'DEFAULT';
523     $SIG{'QUIT'} = 'DEFAULT';
524     $SIG{'TERM'} = 'DEFAULT';
525
526     foreach (values (%reader))
527     {
528       close($_);
529     }
530     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
531     open(STDOUT,">&writer");
532     open(STDERR,">&writer");
533
534     undef $dbh;
535     undef $sth;
536
537     delete $ENV{"GNUPGHOME"};
538     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
539     $ENV{"TASK_QSEQUENCE"} = $id;
540     $ENV{"TASK_SEQUENCE"} = $level;
541     $ENV{"JOB_SCRIPT"} = $Job->{script};
542     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
543       $param =~ tr/a-z/A-Z/;
544       $ENV{"JOB_PARAMETER_$param"} = $value;
545     }
546     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
547     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
548     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
549     $ENV{"TASK_KEEPMOUNT"} = $ENV{"JOB_WORK"}."/keep";
550     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
551     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
552
553     $ENV{"GZIP"} = "-n";
554
555     my @srunargs = (
556       "srun",
557       "--nodelist=".$childnode->{name},
558       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
559       "--job-name=$job_id.$id.$$",
560         );
561     my @execargs = qw(sh);
562     my $build_script_to_send = "";
563     my $command =
564         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
565         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
566         ."&& cd $ENV{CRUNCH_TMP} ";
567     if ($build_script)
568     {
569       $build_script_to_send = $build_script;
570       $command .=
571           "&& perl -";
572     }
573     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
574     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
575     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
576     $command .=
577         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
578     my @execargs = ('bash', '-c', $command);
579     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
580     exit (111);
581   }
582   close("writer");
583   if (!defined $childpid)
584   {
585     close $reader{$id};
586     delete $reader{$id};
587     next;
588   }
589   shift @freeslot;
590   $proc{$childpid} = { jobstep => $id,
591                        time => time,
592                        slot => $childslot,
593                        jobstepname => "$job_id.$id.$childpid",
594                      };
595   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
596   $slot[$childslot]->{pid} = $childpid;
597
598   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
599   Log ($id, "child $childpid started on $childslotname");
600   $Jobstep->{starttime} = time;
601   $Jobstep->{node} = $childnode->{name};
602   $Jobstep->{slotindex} = $childslot;
603   delete $Jobstep->{stderr};
604   delete $Jobstep->{finishtime};
605
606   splice @jobstep_todo, $todo_ptr, 1;
607   --$todo_ptr;
608
609   $progress_is_dirty = 1;
610
611   while (!@freeslot
612          ||
613          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
614   {
615     last THISROUND if $main::please_freeze;
616     if ($main::please_info)
617     {
618       $main::please_info = 0;
619       freeze();
620       collate_output();
621       save_meta(1);
622       update_progress_stats();
623     }
624     my $gotsome
625         = readfrompipes ()
626         + reapchildren ();
627     if (!$gotsome)
628     {
629       check_refresh_wanted();
630       check_squeue();
631       update_progress_stats();
632       select (undef, undef, undef, 0.1);
633     }
634     elsif (time - $progress_stats_updated >= 30)
635     {
636       update_progress_stats();
637     }
638     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
639         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
640     {
641       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
642           .($thisround_failed+$thisround_succeeded)
643           .") -- giving up on this round";
644       Log (undef, $message);
645       last THISROUND;
646     }
647
648     # move slots from freeslot to holdslot (or back to freeslot) if necessary
649     for (my $i=$#freeslot; $i>=0; $i--) {
650       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
651         push @holdslot, (splice @freeslot, $i, 1);
652       }
653     }
654     for (my $i=$#holdslot; $i>=0; $i--) {
655       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
656         push @freeslot, (splice @holdslot, $i, 1);
657       }
658     }
659
660     # give up if no nodes are succeeding
661     if (!grep { $_->{node}->{losing_streak} == 0 &&
662                     $_->{node}->{hold_count} < 4 } @slot) {
663       my $message = "Every node has failed -- giving up on this round";
664       Log (undef, $message);
665       last THISROUND;
666     }
667   }
668 }
669
670
671 push @freeslot, splice @holdslot;
672 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
673
674
675 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
676 while (%proc)
677 {
678   if ($main::please_continue) {
679     $main::please_continue = 0;
680     goto THISROUND;
681   }
682   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
683   readfrompipes ();
684   if (!reapchildren())
685   {
686     check_refresh_wanted();
687     check_squeue();
688     update_progress_stats();
689     select (undef, undef, undef, 0.1);
690     killem (keys %proc) if $main::please_freeze;
691   }
692 }
693
694 update_progress_stats();
695 freeze_if_want_freeze();
696
697
698 if (!defined $main::success)
699 {
700   if (@jobstep_todo &&
701       $thisround_succeeded == 0 &&
702       ($thisround_failed == 0 || $thisround_failed > 4))
703   {
704     my $message = "stop because $thisround_failed tasks failed and none succeeded";
705     Log (undef, $message);
706     $main::success = 0;
707   }
708   if (!@jobstep_todo)
709   {
710     $main::success = 1;
711   }
712 }
713
714 goto ONELEVEL if !defined $main::success;
715
716
717 release_allocation();
718 freeze();
719 if ($job_has_uuid) {
720   $Job->update_attributes('output' => &collate_output(),
721                           'running' => 0,
722                           'success' => $Job->{'output'} && $main::success,
723                           'finished_at' => scalar gmtime)
724 }
725
726 if ($Job->{'output'})
727 {
728   eval {
729     my $manifest_text = capturex("whget", $Job->{'output'});
730     $arv->{'collections'}->{'create'}->execute('collection' => {
731       'uuid' => $Job->{'output'},
732       'manifest_text' => $manifest_text,
733     });
734   };
735   if ($@) {
736     Log (undef, "Failed to register output manifest: $@");
737   }
738 }
739
740 Log (undef, "finish");
741
742 save_meta();
743 exit 0;
744
745
746
747 sub update_progress_stats
748 {
749   $progress_stats_updated = time;
750   return if !$progress_is_dirty;
751   my ($todo, $done, $running) = (scalar @jobstep_todo,
752                                  scalar @jobstep_done,
753                                  scalar @slot - scalar @freeslot - scalar @holdslot);
754   $Job->{'tasks_summary'} ||= {};
755   $Job->{'tasks_summary'}->{'todo'} = $todo;
756   $Job->{'tasks_summary'}->{'done'} = $done;
757   $Job->{'tasks_summary'}->{'running'} = $running;
758   if ($job_has_uuid) {
759     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
760   }
761   Log (undef, "status: $done done, $running running, $todo todo");
762   $progress_is_dirty = 0;
763 }
764
765
766
767 sub reapchildren
768 {
769   my $pid = waitpid (-1, WNOHANG);
770   return 0 if $pid <= 0;
771
772   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
773                   . "."
774                   . $slot[$proc{$pid}->{slot}]->{cpu});
775   my $jobstepid = $proc{$pid}->{jobstep};
776   my $elapsed = time - $proc{$pid}->{time};
777   my $Jobstep = $jobstep[$jobstepid];
778
779   my $childstatus = $?;
780   my $exitvalue = $childstatus >> 8;
781   my $exitinfo = sprintf("exit %d signal %d%s",
782                          $exitvalue,
783                          $childstatus & 127,
784                          ($childstatus & 128 ? ' core dump' : ''));
785   $Jobstep->{'arvados_task'}->reload;
786   my $task_success = $Jobstep->{'arvados_task'}->{success};
787
788   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
789
790   if (!defined $task_success) {
791     # task did not indicate one way or the other --> fail
792     $Jobstep->{'arvados_task'}->{success} = 0;
793     $Jobstep->{'arvados_task'}->save;
794     $task_success = 0;
795   }
796
797   if (!$task_success)
798   {
799     my $temporary_fail;
800     $temporary_fail ||= $Jobstep->{node_fail};
801     $temporary_fail ||= ($exitvalue == 111);
802
803     ++$thisround_failed;
804     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
805
806     # Check for signs of a failed or misconfigured node
807     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
808         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
809       # Don't count this against jobstep failure thresholds if this
810       # node is already suspected faulty and srun exited quickly
811       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
812           $elapsed < 5) {
813         Log ($jobstepid, "blaming failure on suspect node " .
814              $slot[$proc{$pid}->{slot}]->{node}->{name});
815         $temporary_fail ||= 1;
816       }
817       ban_node_by_slot($proc{$pid}->{slot});
818     }
819
820     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
821                              ++$Jobstep->{'failures'},
822                              $temporary_fail ? 'temporary ' : 'permanent',
823                              $elapsed));
824
825     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
826       # Give up on this task, and the whole job
827       $main::success = 0;
828       $main::please_freeze = 1;
829     }
830     else {
831       # Put this task back on the todo queue
832       push @jobstep_todo, $jobstepid;
833     }
834     $Job->{'tasks_summary'}->{'failed'}++;
835   }
836   else
837   {
838     ++$thisround_succeeded;
839     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
840     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
841     push @jobstep_done, $jobstepid;
842     Log ($jobstepid, "success in $elapsed seconds");
843   }
844   $Jobstep->{exitcode} = $childstatus;
845   $Jobstep->{finishtime} = time;
846   process_stderr ($jobstepid, $task_success);
847   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
848
849   close $reader{$jobstepid};
850   delete $reader{$jobstepid};
851   delete $slot[$proc{$pid}->{slot}]->{pid};
852   push @freeslot, $proc{$pid}->{slot};
853   delete $proc{$pid};
854
855   # Load new tasks
856   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
857     'where' => {
858       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
859     },
860     'order' => 'qsequence'
861   );
862   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
863     my $jobstep = {
864       'level' => $arvados_task->{'sequence'},
865       'failures' => 0,
866       'arvados_task' => $arvados_task
867     };
868     push @jobstep, $jobstep;
869     push @jobstep_todo, $#jobstep;
870   }
871
872   $progress_is_dirty = 1;
873   1;
874 }
875
876 sub check_refresh_wanted
877 {
878   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
879   if (@stat && $stat[9] > $latest_refresh) {
880     $latest_refresh = scalar time;
881     if ($job_has_uuid) {
882       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
883       for my $attr ('cancelled_at',
884                     'cancelled_by_user_uuid',
885                     'cancelled_by_client_uuid') {
886         $Job->{$attr} = $Job2->{$attr};
887       }
888       if ($Job->{'cancelled_at'}) {
889         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
890              " by user " . $Job->{cancelled_by_user_uuid});
891         $main::success = 0;
892         $main::please_freeze = 1;
893       }
894     }
895   }
896 }
897
898 sub check_squeue
899 {
900   # return if the kill list was checked <4 seconds ago
901   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
902   {
903     return;
904   }
905   $squeue_kill_checked = time;
906
907   # use killem() on procs whose killtime is reached
908   for (keys %proc)
909   {
910     if (exists $proc{$_}->{killtime}
911         && $proc{$_}->{killtime} <= time)
912     {
913       killem ($_);
914     }
915   }
916
917   # return if the squeue was checked <60 seconds ago
918   if (defined $squeue_checked && $squeue_checked > time - 60)
919   {
920     return;
921   }
922   $squeue_checked = time;
923
924   if (!$have_slurm)
925   {
926     # here is an opportunity to check for mysterious problems with local procs
927     return;
928   }
929
930   # get a list of steps still running
931   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
932   chop @squeue;
933   if ($squeue[-1] ne "ok")
934   {
935     return;
936   }
937   pop @squeue;
938
939   # which of my jobsteps are running, according to squeue?
940   my %ok;
941   foreach (@squeue)
942   {
943     if (/^(\d+)\.(\d+) (\S+)/)
944     {
945       if ($1 eq $ENV{SLURM_JOBID})
946       {
947         $ok{$3} = 1;
948       }
949     }
950   }
951
952   # which of my active child procs (>60s old) were not mentioned by squeue?
953   foreach (keys %proc)
954   {
955     if ($proc{$_}->{time} < time - 60
956         && !exists $ok{$proc{$_}->{jobstepname}}
957         && !exists $proc{$_}->{killtime})
958     {
959       # kill this proc if it hasn't exited in 30 seconds
960       $proc{$_}->{killtime} = time + 30;
961     }
962   }
963 }
964
965
966 sub release_allocation
967 {
968   if ($have_slurm)
969   {
970     Log (undef, "release job allocation");
971     system "scancel $ENV{SLURM_JOBID}";
972   }
973 }
974
975
976 sub readfrompipes
977 {
978   my $gotsome = 0;
979   foreach my $job (keys %reader)
980   {
981     my $buf;
982     while (0 < sysread ($reader{$job}, $buf, 8192))
983     {
984       print STDERR $buf if $ENV{CRUNCH_DEBUG};
985       $jobstep[$job]->{stderr} .= $buf;
986       preprocess_stderr ($job);
987       if (length ($jobstep[$job]->{stderr}) > 16384)
988       {
989         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
990       }
991       $gotsome = 1;
992     }
993   }
994   return $gotsome;
995 }
996
997
998 sub preprocess_stderr
999 {
1000   my $job = shift;
1001
1002   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1003     my $line = $1;
1004     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1005     Log ($job, "stderr $line");
1006     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1007       # whoa.
1008       $main::please_freeze = 1;
1009     }
1010     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1011       $jobstep[$job]->{node_fail} = 1;
1012       ban_node_by_slot($jobstep[$job]->{slotindex});
1013     }
1014   }
1015 }
1016
1017
1018 sub process_stderr
1019 {
1020   my $job = shift;
1021   my $task_success = shift;
1022   preprocess_stderr ($job);
1023
1024   map {
1025     Log ($job, "stderr $_");
1026   } split ("\n", $jobstep[$job]->{stderr});
1027 }
1028
1029
1030 sub collate_output
1031 {
1032   my $whc = Warehouse->new;
1033   Log (undef, "collate");
1034   $whc->write_start (1);
1035   my $joboutput;
1036   for (@jobstep)
1037   {
1038     next if (!exists $_->{'arvados_task'}->{output} ||
1039              !$_->{'arvados_task'}->{'success'} ||
1040              $_->{'exitcode'} != 0);
1041     my $output = $_->{'arvados_task'}->{output};
1042     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1043     {
1044       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1045       $whc->write_data ($output);
1046     }
1047     elsif (@jobstep == 1)
1048     {
1049       $joboutput = $output;
1050       $whc->write_finish;
1051     }
1052     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1053     {
1054       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1055       $whc->write_data ($outblock);
1056     }
1057     else
1058     {
1059       my $errstr = $whc->errstr;
1060       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1061       $main::success = 0;
1062     }
1063   }
1064   $joboutput = $whc->write_finish if !defined $joboutput;
1065   if ($joboutput)
1066   {
1067     Log (undef, "output $joboutput");
1068     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1069   }
1070   else
1071   {
1072     Log (undef, "output undef");
1073   }
1074   return $joboutput;
1075 }
1076
1077
1078 sub killem
1079 {
1080   foreach (@_)
1081   {
1082     my $sig = 2;                # SIGINT first
1083     if (exists $proc{$_}->{"sent_$sig"} &&
1084         time - $proc{$_}->{"sent_$sig"} > 4)
1085     {
1086       $sig = 15;                # SIGTERM if SIGINT doesn't work
1087     }
1088     if (exists $proc{$_}->{"sent_$sig"} &&
1089         time - $proc{$_}->{"sent_$sig"} > 4)
1090     {
1091       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1092     }
1093     if (!exists $proc{$_}->{"sent_$sig"})
1094     {
1095       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1096       kill $sig, $_;
1097       select (undef, undef, undef, 0.1);
1098       if ($sig == 2)
1099       {
1100         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1101       }
1102       $proc{$_}->{"sent_$sig"} = time;
1103       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1104     }
1105   }
1106 }
1107
1108
1109 sub fhbits
1110 {
1111   my($bits);
1112   for (@_) {
1113     vec($bits,fileno($_),1) = 1;
1114   }
1115   $bits;
1116 }
1117
1118
1119 sub Log                         # ($jobstep_id, $logmessage)
1120 {
1121   if ($_[1] =~ /\n/) {
1122     for my $line (split (/\n/, $_[1])) {
1123       Log ($_[0], $line);
1124     }
1125     return;
1126   }
1127   my $fh = select STDERR; $|=1; select $fh;
1128   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1129   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1130   $message .= "\n";
1131   my $datetime;
1132   if ($metastream || -t STDERR) {
1133     my @gmtime = gmtime;
1134     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1135                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1136   }
1137   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1138
1139   return if !$metastream;
1140   $metastream->write_data ($datetime . " " . $message);
1141 }
1142
1143
1144 sub croak
1145 {
1146   my ($package, $file, $line) = caller;
1147   my $message = "@_ at $file line $line\n";
1148   Log (undef, $message);
1149   freeze() if @jobstep_todo;
1150   collate_output() if @jobstep_todo;
1151   cleanup();
1152   save_meta() if $metastream;
1153   die;
1154 }
1155
1156
1157 sub cleanup
1158 {
1159   return if !$job_has_uuid;
1160   $Job->update_attributes('running' => 0,
1161                           'success' => 0,
1162                           'finished_at' => scalar gmtime);
1163 }
1164
1165
1166 sub save_meta
1167 {
1168   my $justcheckpoint = shift; # false if this will be the last meta saved
1169   my $m = $metastream;
1170   $m = $m->copy if $justcheckpoint;
1171   $m->write_finish;
1172   my $whc = Warehouse->new;
1173   my $loglocator = $whc->store_block ($m->as_string);
1174   $arv->{'collections'}->{'create'}->execute('collection' => {
1175     'uuid' => $loglocator,
1176     'manifest_text' => $m->as_string,
1177   });
1178   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1179   Log (undef, "log manifest is $loglocator");
1180   $Job->{'log'} = $loglocator;
1181   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1182 }
1183
1184
1185 sub freeze_if_want_freeze
1186 {
1187   if ($main::please_freeze)
1188   {
1189     release_allocation();
1190     if (@_)
1191     {
1192       # kill some srun procs before freeze+stop
1193       map { $proc{$_} = {} } @_;
1194       while (%proc)
1195       {
1196         killem (keys %proc);
1197         select (undef, undef, undef, 0.1);
1198         my $died;
1199         while (($died = waitpid (-1, WNOHANG)) > 0)
1200         {
1201           delete $proc{$died};
1202         }
1203       }
1204     }
1205     freeze();
1206     collate_output();
1207     cleanup();
1208     save_meta();
1209     exit 0;
1210   }
1211 }
1212
1213
1214 sub freeze
1215 {
1216   Log (undef, "Freeze not implemented");
1217   return;
1218 }
1219
1220
1221 sub thaw
1222 {
1223   croak ("Thaw not implemented");
1224
1225   my $whc;
1226   my $key = shift;
1227   Log (undef, "thaw from $key");
1228
1229   @jobstep = ();
1230   @jobstep_done = ();
1231   @jobstep_todo = ();
1232   @jobstep_tomerge = ();
1233   $jobstep_tomerge_level = 0;
1234   my $frozenjob = {};
1235
1236   my $stream = new Warehouse::Stream ( whc => $whc,
1237                                        hash => [split (",", $key)] );
1238   $stream->rewind;
1239   while (my $dataref = $stream->read_until (undef, "\n\n"))
1240   {
1241     if ($$dataref =~ /^job /)
1242     {
1243       foreach (split ("\n", $$dataref))
1244       {
1245         my ($k, $v) = split ("=", $_, 2);
1246         $frozenjob->{$k} = freezeunquote ($v);
1247       }
1248       next;
1249     }
1250
1251     if ($$dataref =~ /^merge (\d+) (.*)/)
1252     {
1253       $jobstep_tomerge_level = $1;
1254       @jobstep_tomerge
1255           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1256       next;
1257     }
1258
1259     my $Jobstep = { };
1260     foreach (split ("\n", $$dataref))
1261     {
1262       my ($k, $v) = split ("=", $_, 2);
1263       $Jobstep->{$k} = freezeunquote ($v) if $k;
1264     }
1265     $Jobstep->{'failures'} = 0;
1266     push @jobstep, $Jobstep;
1267
1268     if ($Jobstep->{exitcode} eq "0")
1269     {
1270       push @jobstep_done, $#jobstep;
1271     }
1272     else
1273     {
1274       push @jobstep_todo, $#jobstep;
1275     }
1276   }
1277
1278   foreach (qw (script script_version script_parameters))
1279   {
1280     $Job->{$_} = $frozenjob->{$_};
1281   }
1282   $Job->save if $job_has_uuid;
1283 }
1284
1285
1286 sub freezequote
1287 {
1288   my $s = shift;
1289   $s =~ s/\\/\\\\/g;
1290   $s =~ s/\n/\\n/g;
1291   return $s;
1292 }
1293
1294
1295 sub freezeunquote
1296 {
1297   my $s = shift;
1298   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1299   return $s;
1300 }
1301
1302
1303 sub srun
1304 {
1305   my $srunargs = shift;
1306   my $execargs = shift;
1307   my $opts = shift || {};
1308   my $stdin = shift;
1309   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1310   print STDERR (join (" ",
1311                       map { / / ? "'$_'" : $_ }
1312                       (@$args)),
1313                 "\n")
1314       if $ENV{CRUNCH_DEBUG};
1315
1316   if (defined $stdin) {
1317     my $child = open STDIN, "-|";
1318     defined $child or die "no fork: $!";
1319     if ($child == 0) {
1320       print $stdin or die $!;
1321       close STDOUT or die $!;
1322       exit 0;
1323     }
1324   }
1325
1326   return system (@$args) if $opts->{fork};
1327
1328   exec @$args;
1329   warn "ENV size is ".length(join(" ",%ENV));
1330   die "exec failed: $!: @$args";
1331 }
1332
1333
1334 sub ban_node_by_slot {
1335   # Don't start any new jobsteps on this node for 60 seconds
1336   my $slotid = shift;
1337   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1338   $slot[$slotid]->{node}->{hold_count}++;
1339   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1340 }
1341
1342 __DATA__
1343 #!/usr/bin/perl
1344
1345 # checkout-and-build
1346
1347 use Fcntl ':flock';
1348
1349 my $destdir = $ENV{"CRUNCH_SRC"};
1350 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1351 my $repo = $ENV{"CRUNCH_SRC_URL"};
1352
1353 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1354 flock L, LOCK_EX;
1355 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1356     exit 0;
1357 }
1358
1359 unlink "$destdir.commit";
1360 open STDOUT, ">", "$destdir.log";
1361 open STDERR, ">&STDOUT";
1362
1363 mkdir $destdir;
1364 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1365 print TARX <DATA>;
1366 if(!close(TARX)) {
1367   die "'tar -C $destdir -xf -' exited $?: $!";
1368 }
1369
1370 my $pwd;
1371 chomp ($pwd = `pwd`);
1372 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1373 mkdir $install_dir;
1374 if (-e "$destdir/crunch_scripts/install") {
1375     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1376 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1377     # Old version
1378     shell_or_die ("./tests/autotests.sh", $install_dir);
1379 } elsif (-e "./install.sh") {
1380     shell_or_die ("./install.sh", $install_dir);
1381 }
1382
1383 if ($commit) {
1384     unlink "$destdir.commit.new";
1385     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1386     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1387 }
1388
1389 close L;
1390
1391 exit 0;
1392
1393 sub shell_or_die
1394 {
1395   if ($ENV{"DEBUG"}) {
1396     print STDERR "@_\n";
1397   }
1398   system (@_) == 0
1399       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1400 }
1401
1402 __DATA__