Use --no-clear-tmp flag to decide whether to remove of old temporary
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use Warehouse;
81 use Warehouse::Stream;
82 use IPC::System::Simple qw(capturex);
83
84 $ENV{"TMPDIR"} ||= "/tmp";
85 unless (defined $ENV{"CRUNCH_TMP"}) {
86   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
87   if ($ENV{"USER"} ne "crunch" && $< != 0) {
88     # use a tmp dir unique for my uid
89     $ENV{"CRUNCH_TMP"} .= "-$<";
90   }
91 }
92 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
93 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
94 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
95 mkdir ($ENV{"JOB_WORK"});
96
97 my $force_unlock;
98 my $git_dir;
99 my $jobspec;
100 my $job_api_token;
101 my $no_clear_tmp;
102 my $resume_stash;
103 GetOptions('force-unlock' => \$force_unlock,
104            'git-dir=s' => \$git_dir,
105            'job=s' => \$jobspec,
106            'job-api-token=s' => \$job_api_token,
107            'no-clear-tmp' => \$no_clear_tmp,
108            'resume-stash=s' => \$resume_stash,
109     );
110
111 if (defined $job_api_token) {
112   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
113 }
114
115 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
116 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
117 my $local_job = !$job_has_uuid;
118
119
120 $SIG{'USR1'} = sub
121 {
122   $main::ENV{CRUNCH_DEBUG} = 1;
123 };
124 $SIG{'USR2'} = sub
125 {
126   $main::ENV{CRUNCH_DEBUG} = 0;
127 };
128
129
130
131 my $arv = Arvados->new('apiVersion' => 'v1');
132 my $metastream;
133
134 my $User = $arv->{'users'}->{'current'}->execute;
135
136 my $Job = {};
137 my $job_id;
138 my $dbh;
139 my $sth;
140 if ($job_has_uuid)
141 {
142   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
143   if (!$force_unlock) {
144     if ($Job->{'is_locked_by_uuid'}) {
145       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
146     }
147     if ($Job->{'success'} ne undef) {
148       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
149     }
150     if ($Job->{'running'}) {
151       croak("Job 'running' flag is already set");
152     }
153     if ($Job->{'started_at'}) {
154       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
155     }
156   }
157 }
158 else
159 {
160   $Job = JSON::decode_json($jobspec);
161
162   if (!$resume_stash)
163   {
164     map { croak ("No $_ specified") unless $Job->{$_} }
165     qw(script script_version script_parameters);
166   }
167
168   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
169   $Job->{'started_at'} = gmtime;
170
171   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
172
173   $job_has_uuid = 1;
174 }
175 $job_id = $Job->{'uuid'};
176
177 $metastream = Warehouse::Stream->new(whc => new Warehouse);
178 $metastream->clear;
179 $metastream->name('.');
180 $metastream->write_start($job_id . '.log.txt');
181
182
183 $Job->{'runtime_constraints'} ||= {};
184 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
185 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
186
187
188 Log (undef, "check slurm allocation");
189 my @slot;
190 my @node;
191 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
192 my @sinfo;
193 if (!$have_slurm)
194 {
195   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
196   push @sinfo, "$localcpus localhost";
197 }
198 if (exists $ENV{SLURM_NODELIST})
199 {
200   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
201 }
202 foreach (@sinfo)
203 {
204   my ($ncpus, $slurm_nodelist) = split;
205   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
206
207   my @nodelist;
208   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
209   {
210     my $nodelist = $1;
211     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
212     {
213       my $ranges = $1;
214       foreach (split (",", $ranges))
215       {
216         my ($a, $b);
217         if (/(\d+)-(\d+)/)
218         {
219           $a = $1;
220           $b = $2;
221         }
222         else
223         {
224           $a = $_;
225           $b = $_;
226         }
227         push @nodelist, map {
228           my $n = $nodelist;
229           $n =~ s/\[[-,\d]+\]/$_/;
230           $n;
231         } ($a..$b);
232       }
233     }
234     else
235     {
236       push @nodelist, $nodelist;
237     }
238   }
239   foreach my $nodename (@nodelist)
240   {
241     Log (undef, "node $nodename - $ncpus slots");
242     my $node = { name => $nodename,
243                  ncpus => $ncpus,
244                  losing_streak => 0,
245                  hold_until => 0 };
246     foreach my $cpu (1..$ncpus)
247     {
248       push @slot, { node => $node,
249                     cpu => $cpu };
250     }
251   }
252   push @node, @nodelist;
253 }
254
255
256
257 # Ensure that we get one jobstep running on each allocated node before
258 # we start overloading nodes with concurrent steps
259
260 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
261
262
263
264 my $jobmanager_id;
265 if ($job_has_uuid)
266 {
267   # Claim this job, and make sure nobody else does
268   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
269           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
270     croak("Error while updating / locking job");
271   }
272   $Job->update_attributes('started_at' => scalar gmtime,
273                           'running' => 1,
274                           'success' => undef,
275                           'tasks_summary' => { 'failed' => 0,
276                                                'todo' => 1,
277                                                'running' => 0,
278                                                'done' => 0 });
279 }
280
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep;
303 my @jobstep_todo = ();
304 my @jobstep_done = ();
305 my @jobstep_tomerge = ();
306 my $jobstep_tomerge_level = 0;
307 my $squeue_checked;
308 my $squeue_kill_checked;
309 my $output_in_keep = 0;
310 my $latest_refresh = scalar time;
311
312
313
314 if (defined $Job->{thawedfromkey})
315 {
316   thaw ($Job->{thawedfromkey});
317 }
318 else
319 {
320   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
321     'job_uuid' => $Job->{'uuid'},
322     'sequence' => 0,
323     'qsequence' => 0,
324     'parameters' => {},
325                                                           });
326   push @jobstep, { 'level' => 0,
327                    'failures' => 0,
328                    'arvados_task' => $first_task,
329                  };
330   push @jobstep_todo, 0;
331 }
332
333
334 my $build_script;
335
336
337 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
338
339 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
340 if ($skip_install)
341 {
342   if (!defined $no_clear_tmp) {
343     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
344     system($clear_tmp_cmd) == 0
345         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
346   }
347   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
348   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
349     if (-d $src_path) {
350       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
351           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
352       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
353           == 0
354           or croak ("setup.py in $src_path failed: exit ".($?>>8));
355     }
356   }
357 }
358 else
359 {
360   do {
361     local $/ = undef;
362     $build_script = <DATA>;
363   };
364   Log (undef, "Install revision ".$Job->{script_version});
365   my $nodelist = join(",", @node);
366
367   if (!defined $no_clear_tmp) {
368     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
369
370     my $cleanpid = fork();
371     if ($cleanpid == 0)
372     {
373       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
374             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
375       exit (1);
376     }
377     while (1)
378     {
379       last if $cleanpid == waitpid (-1, WNOHANG);
380       freeze_if_want_freeze ($cleanpid);
381       select (undef, undef, undef, 0.1);
382     }
383     Log (undef, "Clean-work-dir exited $?");
384   }
385
386   # Install requested code version
387
388   my @execargs;
389   my @srunargs = ("srun",
390                   "--nodelist=$nodelist",
391                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
392
393   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
394   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
395
396   my $commit;
397   my $git_archive;
398   my $treeish = $Job->{'script_version'};
399   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
400   # Todo: let script_version specify repository instead of expecting
401   # parent process to figure it out.
402   $ENV{"CRUNCH_SRC_URL"} = $repo;
403
404   # Create/update our clone of the remote git repo
405
406   if (!-d $ENV{"CRUNCH_SRC"}) {
407     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
408         or croak ("git clone $repo failed: exit ".($?>>8));
409     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
410   }
411   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
412
413   # If this looks like a subversion r#, look for it in git-svn commit messages
414
415   if ($treeish =~ m{^\d{1,4}$}) {
416     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
417     chomp $gitlog;
418     if ($gitlog =~ /^[a-f0-9]{40}$/) {
419       $commit = $gitlog;
420       Log (undef, "Using commit $commit for script_version $treeish");
421     }
422   }
423
424   # If that didn't work, try asking git to look it up as a tree-ish.
425
426   if (!defined $commit) {
427
428     my $cooked_treeish = $treeish;
429     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
430       # Looks like a git branch name -- make sure git knows it's
431       # relative to the remote repo
432       $cooked_treeish = "origin/$treeish";
433     }
434
435     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
436     chomp $found;
437     if ($found =~ /^[0-9a-f]{40}$/s) {
438       $commit = $found;
439       if ($commit ne $treeish) {
440         # Make sure we record the real commit id in the database,
441         # frozentokey, logs, etc. -- instead of an abbreviation or a
442         # branch name which can become ambiguous or point to a
443         # different commit in the future.
444         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
445         Log (undef, "Using commit $commit for tree-ish $treeish");
446         if ($commit ne $treeish) {
447           $Job->{'script_version'} = $commit;
448           !$job_has_uuid or
449               $Job->update_attributes('script_version' => $commit) or
450               croak("Error while updating job");
451         }
452       }
453     }
454   }
455
456   if (defined $commit) {
457     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
458     @execargs = ("sh", "-c",
459                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
460     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
461   }
462   else {
463     croak ("could not figure out commit id for $treeish");
464   }
465
466   my $installpid = fork();
467   if ($installpid == 0)
468   {
469     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
470     exit (1);
471   }
472   while (1)
473   {
474     last if $installpid == waitpid (-1, WNOHANG);
475     freeze_if_want_freeze ($installpid);
476     select (undef, undef, undef, 0.1);
477   }
478   Log (undef, "Install exited $?");
479 }
480
481
482
483 foreach (qw (script script_version script_parameters runtime_constraints))
484 {
485   Log (undef,
486        "$_ " .
487        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
488 }
489 foreach (split (/\n/, $Job->{knobs}))
490 {
491   Log (undef, "knob " . $_);
492 }
493
494
495
496 $main::success = undef;
497
498
499
500 ONELEVEL:
501
502 my $thisround_succeeded = 0;
503 my $thisround_failed = 0;
504 my $thisround_failed_multiple = 0;
505
506 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
507                        or $a <=> $b } @jobstep_todo;
508 my $level = $jobstep[$jobstep_todo[0]]->{level};
509 Log (undef, "start level $level");
510
511
512
513 my %proc;
514 my @freeslot = (0..$#slot);
515 my @holdslot;
516 my %reader;
517 my $progress_is_dirty = 1;
518 my $progress_stats_updated = 0;
519
520 update_progress_stats();
521
522
523
524 THISROUND:
525 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
526 {
527   my $id = $jobstep_todo[$todo_ptr];
528   my $Jobstep = $jobstep[$id];
529   if ($Jobstep->{level} != $level)
530   {
531     next;
532   }
533
534   pipe $reader{$id}, "writer" or croak ($!);
535   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
536   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
537
538   my $childslot = $freeslot[0];
539   my $childnode = $slot[$childslot]->{node};
540   my $childslotname = join (".",
541                             $slot[$childslot]->{node}->{name},
542                             $slot[$childslot]->{cpu});
543   my $childpid = fork();
544   if ($childpid == 0)
545   {
546     $SIG{'INT'} = 'DEFAULT';
547     $SIG{'QUIT'} = 'DEFAULT';
548     $SIG{'TERM'} = 'DEFAULT';
549
550     foreach (values (%reader))
551     {
552       close($_);
553     }
554     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
555     open(STDOUT,">&writer");
556     open(STDERR,">&writer");
557
558     undef $dbh;
559     undef $sth;
560
561     delete $ENV{"GNUPGHOME"};
562     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
563     $ENV{"TASK_QSEQUENCE"} = $id;
564     $ENV{"TASK_SEQUENCE"} = $level;
565     $ENV{"JOB_SCRIPT"} = $Job->{script};
566     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
567       $param =~ tr/a-z/A-Z/;
568       $ENV{"JOB_PARAMETER_$param"} = $value;
569     }
570     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
571     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
572     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
573     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
574     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
575     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
576     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
577
578     $ENV{"GZIP"} = "-n";
579
580     my @srunargs = (
581       "srun",
582       "--nodelist=".$childnode->{name},
583       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
584       "--job-name=$job_id.$id.$$",
585         );
586     my @execargs = qw(sh);
587     my $build_script_to_send = "";
588     my $command =
589         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
590         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
591         ."&& cd $ENV{CRUNCH_TMP} ";
592     if ($build_script)
593     {
594       $build_script_to_send = $build_script;
595       $command .=
596           "&& perl -";
597     }
598     $command .=
599         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
600     my @execargs = ('bash', '-c', $command);
601     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
602     exit (111);
603   }
604   close("writer");
605   if (!defined $childpid)
606   {
607     close $reader{$id};
608     delete $reader{$id};
609     next;
610   }
611   shift @freeslot;
612   $proc{$childpid} = { jobstep => $id,
613                        time => time,
614                        slot => $childslot,
615                        jobstepname => "$job_id.$id.$childpid",
616                      };
617   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
618   $slot[$childslot]->{pid} = $childpid;
619
620   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
621   Log ($id, "child $childpid started on $childslotname");
622   $Jobstep->{starttime} = time;
623   $Jobstep->{node} = $childnode->{name};
624   $Jobstep->{slotindex} = $childslot;
625   delete $Jobstep->{stderr};
626   delete $Jobstep->{finishtime};
627
628   splice @jobstep_todo, $todo_ptr, 1;
629   --$todo_ptr;
630
631   $progress_is_dirty = 1;
632
633   while (!@freeslot
634          ||
635          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
636   {
637     last THISROUND if $main::please_freeze;
638     if ($main::please_info)
639     {
640       $main::please_info = 0;
641       freeze();
642       collate_output();
643       save_meta(1);
644       update_progress_stats();
645     }
646     my $gotsome
647         = readfrompipes ()
648         + reapchildren ();
649     if (!$gotsome)
650     {
651       check_refresh_wanted();
652       check_squeue();
653       update_progress_stats();
654       select (undef, undef, undef, 0.1);
655     }
656     elsif (time - $progress_stats_updated >= 30)
657     {
658       update_progress_stats();
659     }
660     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
661         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
662     {
663       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
664           .($thisround_failed+$thisround_succeeded)
665           .") -- giving up on this round";
666       Log (undef, $message);
667       last THISROUND;
668     }
669
670     # move slots from freeslot to holdslot (or back to freeslot) if necessary
671     for (my $i=$#freeslot; $i>=0; $i--) {
672       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
673         push @holdslot, (splice @freeslot, $i, 1);
674       }
675     }
676     for (my $i=$#holdslot; $i>=0; $i--) {
677       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
678         push @freeslot, (splice @holdslot, $i, 1);
679       }
680     }
681
682     # give up if no nodes are succeeding
683     if (!grep { $_->{node}->{losing_streak} == 0 &&
684                     $_->{node}->{hold_count} < 4 } @slot) {
685       my $message = "Every node has failed -- giving up on this round";
686       Log (undef, $message);
687       last THISROUND;
688     }
689   }
690 }
691
692
693 push @freeslot, splice @holdslot;
694 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
695
696
697 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
698 while (%proc)
699 {
700   if ($main::please_continue) {
701     $main::please_continue = 0;
702     goto THISROUND;
703   }
704   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
705   readfrompipes ();
706   if (!reapchildren())
707   {
708     check_refresh_wanted();
709     check_squeue();
710     update_progress_stats();
711     select (undef, undef, undef, 0.1);
712     killem (keys %proc) if $main::please_freeze;
713   }
714 }
715
716 update_progress_stats();
717 freeze_if_want_freeze();
718
719
720 if (!defined $main::success)
721 {
722   if (@jobstep_todo &&
723       $thisround_succeeded == 0 &&
724       ($thisround_failed == 0 || $thisround_failed > 4))
725   {
726     my $message = "stop because $thisround_failed tasks failed and none succeeded";
727     Log (undef, $message);
728     $main::success = 0;
729   }
730   if (!@jobstep_todo)
731   {
732     $main::success = 1;
733   }
734 }
735
736 goto ONELEVEL if !defined $main::success;
737
738
739 release_allocation();
740 freeze();
741 if ($job_has_uuid) {
742   $Job->update_attributes('output' => &collate_output(),
743                           'running' => 0,
744                           'success' => $Job->{'output'} && $main::success,
745                           'finished_at' => scalar gmtime)
746 }
747
748 if ($Job->{'output'})
749 {
750   eval {
751     my $manifest_text = capturex("whget", $Job->{'output'});
752     $arv->{'collections'}->{'create'}->execute('collection' => {
753       'uuid' => $Job->{'output'},
754       'manifest_text' => $manifest_text,
755     });
756   };
757   if ($@) {
758     Log (undef, "Failed to register output manifest: $@");
759   }
760 }
761
762 Log (undef, "finish");
763
764 save_meta();
765 exit 0;
766
767
768
769 sub update_progress_stats
770 {
771   $progress_stats_updated = time;
772   return if !$progress_is_dirty;
773   my ($todo, $done, $running) = (scalar @jobstep_todo,
774                                  scalar @jobstep_done,
775                                  scalar @slot - scalar @freeslot - scalar @holdslot);
776   $Job->{'tasks_summary'} ||= {};
777   $Job->{'tasks_summary'}->{'todo'} = $todo;
778   $Job->{'tasks_summary'}->{'done'} = $done;
779   $Job->{'tasks_summary'}->{'running'} = $running;
780   if ($job_has_uuid) {
781     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
782   }
783   Log (undef, "status: $done done, $running running, $todo todo");
784   $progress_is_dirty = 0;
785 }
786
787
788
789 sub reapchildren
790 {
791   my $pid = waitpid (-1, WNOHANG);
792   return 0 if $pid <= 0;
793
794   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
795                   . "."
796                   . $slot[$proc{$pid}->{slot}]->{cpu});
797   my $jobstepid = $proc{$pid}->{jobstep};
798   my $elapsed = time - $proc{$pid}->{time};
799   my $Jobstep = $jobstep[$jobstepid];
800
801   my $childstatus = $?;
802   my $exitvalue = $childstatus >> 8;
803   my $exitinfo = sprintf("exit %d signal %d%s",
804                          $exitvalue,
805                          $childstatus & 127,
806                          ($childstatus & 128 ? ' core dump' : ''));
807   $Jobstep->{'arvados_task'}->reload;
808   my $task_success = $Jobstep->{'arvados_task'}->{success};
809
810   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
811
812   if (!defined $task_success) {
813     # task did not indicate one way or the other --> fail
814     $Jobstep->{'arvados_task'}->{success} = 0;
815     $Jobstep->{'arvados_task'}->save;
816     $task_success = 0;
817   }
818
819   if (!$task_success)
820   {
821     my $temporary_fail;
822     $temporary_fail ||= $Jobstep->{node_fail};
823     $temporary_fail ||= ($exitvalue == 111);
824
825     ++$thisround_failed;
826     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
827
828     # Check for signs of a failed or misconfigured node
829     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
830         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
831       # Don't count this against jobstep failure thresholds if this
832       # node is already suspected faulty and srun exited quickly
833       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
834           $elapsed < 5) {
835         Log ($jobstepid, "blaming failure on suspect node " .
836              $slot[$proc{$pid}->{slot}]->{node}->{name});
837         $temporary_fail ||= 1;
838       }
839       ban_node_by_slot($proc{$pid}->{slot});
840     }
841
842     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
843                              ++$Jobstep->{'failures'},
844                              $temporary_fail ? 'temporary ' : 'permanent',
845                              $elapsed));
846
847     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
848       # Give up on this task, and the whole job
849       $main::success = 0;
850       $main::please_freeze = 1;
851     }
852     else {
853       # Put this task back on the todo queue
854       push @jobstep_todo, $jobstepid;
855     }
856     $Job->{'tasks_summary'}->{'failed'}++;
857   }
858   else
859   {
860     ++$thisround_succeeded;
861     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
862     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
863     push @jobstep_done, $jobstepid;
864     Log ($jobstepid, "success in $elapsed seconds");
865   }
866   $Jobstep->{exitcode} = $childstatus;
867   $Jobstep->{finishtime} = time;
868   process_stderr ($jobstepid, $task_success);
869   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
870
871   close $reader{$jobstepid};
872   delete $reader{$jobstepid};
873   delete $slot[$proc{$pid}->{slot}]->{pid};
874   push @freeslot, $proc{$pid}->{slot};
875   delete $proc{$pid};
876
877   # Load new tasks
878   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
879     'where' => {
880       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
881     },
882     'order' => 'qsequence'
883   );
884   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
885     my $jobstep = {
886       'level' => $arvados_task->{'sequence'},
887       'failures' => 0,
888       'arvados_task' => $arvados_task
889     };
890     push @jobstep, $jobstep;
891     push @jobstep_todo, $#jobstep;
892   }
893
894   $progress_is_dirty = 1;
895   1;
896 }
897
898 sub check_refresh_wanted
899 {
900   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
901   if (@stat && $stat[9] > $latest_refresh) {
902     $latest_refresh = scalar time;
903     if ($job_has_uuid) {
904       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
905       for my $attr ('cancelled_at',
906                     'cancelled_by_user_uuid',
907                     'cancelled_by_client_uuid') {
908         $Job->{$attr} = $Job2->{$attr};
909       }
910       if ($Job->{'cancelled_at'}) {
911         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
912              " by user " . $Job->{cancelled_by_user_uuid});
913         $main::success = 0;
914         $main::please_freeze = 1;
915       }
916     }
917   }
918 }
919
920 sub check_squeue
921 {
922   # return if the kill list was checked <4 seconds ago
923   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
924   {
925     return;
926   }
927   $squeue_kill_checked = time;
928
929   # use killem() on procs whose killtime is reached
930   for (keys %proc)
931   {
932     if (exists $proc{$_}->{killtime}
933         && $proc{$_}->{killtime} <= time)
934     {
935       killem ($_);
936     }
937   }
938
939   # return if the squeue was checked <60 seconds ago
940   if (defined $squeue_checked && $squeue_checked > time - 60)
941   {
942     return;
943   }
944   $squeue_checked = time;
945
946   if (!$have_slurm)
947   {
948     # here is an opportunity to check for mysterious problems with local procs
949     return;
950   }
951
952   # get a list of steps still running
953   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
954   chop @squeue;
955   if ($squeue[-1] ne "ok")
956   {
957     return;
958   }
959   pop @squeue;
960
961   # which of my jobsteps are running, according to squeue?
962   my %ok;
963   foreach (@squeue)
964   {
965     if (/^(\d+)\.(\d+) (\S+)/)
966     {
967       if ($1 eq $ENV{SLURM_JOBID})
968       {
969         $ok{$3} = 1;
970       }
971     }
972   }
973
974   # which of my active child procs (>60s old) were not mentioned by squeue?
975   foreach (keys %proc)
976   {
977     if ($proc{$_}->{time} < time - 60
978         && !exists $ok{$proc{$_}->{jobstepname}}
979         && !exists $proc{$_}->{killtime})
980     {
981       # kill this proc if it hasn't exited in 30 seconds
982       $proc{$_}->{killtime} = time + 30;
983     }
984   }
985 }
986
987
988 sub release_allocation
989 {
990   if ($have_slurm)
991   {
992     Log (undef, "release job allocation");
993     system "scancel $ENV{SLURM_JOBID}";
994   }
995 }
996
997
998 sub readfrompipes
999 {
1000   my $gotsome = 0;
1001   foreach my $job (keys %reader)
1002   {
1003     my $buf;
1004     while (0 < sysread ($reader{$job}, $buf, 8192))
1005     {
1006       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1007       $jobstep[$job]->{stderr} .= $buf;
1008       preprocess_stderr ($job);
1009       if (length ($jobstep[$job]->{stderr}) > 16384)
1010       {
1011         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1012       }
1013       $gotsome = 1;
1014     }
1015   }
1016   return $gotsome;
1017 }
1018
1019
1020 sub preprocess_stderr
1021 {
1022   my $job = shift;
1023
1024   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1025     my $line = $1;
1026     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1027     Log ($job, "stderr $line");
1028     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1029       # whoa.
1030       $main::please_freeze = 1;
1031     }
1032     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1033       $jobstep[$job]->{node_fail} = 1;
1034       ban_node_by_slot($jobstep[$job]->{slotindex});
1035     }
1036   }
1037 }
1038
1039
1040 sub process_stderr
1041 {
1042   my $job = shift;
1043   my $task_success = shift;
1044   preprocess_stderr ($job);
1045
1046   map {
1047     Log ($job, "stderr $_");
1048   } split ("\n", $jobstep[$job]->{stderr});
1049 }
1050
1051
1052 sub collate_output
1053 {
1054   my $whc = Warehouse->new;
1055   Log (undef, "collate");
1056   $whc->write_start (1);
1057   my $joboutput;
1058   for (@jobstep)
1059   {
1060     next if (!exists $_->{'arvados_task'}->{output} ||
1061              !$_->{'arvados_task'}->{'success'} ||
1062              $_->{'exitcode'} != 0);
1063     my $output = $_->{'arvados_task'}->{output};
1064     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1065     {
1066       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1067       $whc->write_data ($output);
1068     }
1069     elsif (@jobstep == 1)
1070     {
1071       $joboutput = $output;
1072       $whc->write_finish;
1073     }
1074     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1075     {
1076       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1077       $whc->write_data ($outblock);
1078     }
1079     else
1080     {
1081       my $errstr = $whc->errstr;
1082       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1083       $main::success = 0;
1084     }
1085   }
1086   $joboutput = $whc->write_finish if !defined $joboutput;
1087   if ($joboutput)
1088   {
1089     Log (undef, "output $joboutput");
1090     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1091   }
1092   else
1093   {
1094     Log (undef, "output undef");
1095   }
1096   return $joboutput;
1097 }
1098
1099
1100 sub killem
1101 {
1102   foreach (@_)
1103   {
1104     my $sig = 2;                # SIGINT first
1105     if (exists $proc{$_}->{"sent_$sig"} &&
1106         time - $proc{$_}->{"sent_$sig"} > 4)
1107     {
1108       $sig = 15;                # SIGTERM if SIGINT doesn't work
1109     }
1110     if (exists $proc{$_}->{"sent_$sig"} &&
1111         time - $proc{$_}->{"sent_$sig"} > 4)
1112     {
1113       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1114     }
1115     if (!exists $proc{$_}->{"sent_$sig"})
1116     {
1117       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1118       kill $sig, $_;
1119       select (undef, undef, undef, 0.1);
1120       if ($sig == 2)
1121       {
1122         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1123       }
1124       $proc{$_}->{"sent_$sig"} = time;
1125       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1126     }
1127   }
1128 }
1129
1130
1131 sub fhbits
1132 {
1133   my($bits);
1134   for (@_) {
1135     vec($bits,fileno($_),1) = 1;
1136   }
1137   $bits;
1138 }
1139
1140
1141 sub Log                         # ($jobstep_id, $logmessage)
1142 {
1143   if ($_[1] =~ /\n/) {
1144     for my $line (split (/\n/, $_[1])) {
1145       Log ($_[0], $line);
1146     }
1147     return;
1148   }
1149   my $fh = select STDERR; $|=1; select $fh;
1150   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1151   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1152   $message .= "\n";
1153   my $datetime;
1154   if ($metastream || -t STDERR) {
1155     my @gmtime = gmtime;
1156     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1157                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1158   }
1159   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1160
1161   return if !$metastream;
1162   $metastream->write_data ($datetime . " " . $message);
1163 }
1164
1165
1166 sub croak
1167 {
1168   my ($package, $file, $line) = caller;
1169   my $message = "@_ at $file line $line\n";
1170   Log (undef, $message);
1171   freeze() if @jobstep_todo;
1172   collate_output() if @jobstep_todo;
1173   cleanup();
1174   save_meta() if $metastream;
1175   die;
1176 }
1177
1178
1179 sub cleanup
1180 {
1181   return if !$job_has_uuid;
1182   $Job->update_attributes('running' => 0,
1183                           'success' => 0,
1184                           'finished_at' => scalar gmtime);
1185 }
1186
1187
1188 sub save_meta
1189 {
1190   my $justcheckpoint = shift; # false if this will be the last meta saved
1191   my $m = $metastream;
1192   $m = $m->copy if $justcheckpoint;
1193   $m->write_finish;
1194   my $whc = Warehouse->new;
1195   my $loglocator = $whc->store_block ($m->as_string);
1196   $arv->{'collections'}->{'create'}->execute('collection' => {
1197     'uuid' => $loglocator,
1198     'manifest_text' => $m->as_string,
1199   });
1200   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1201   Log (undef, "log manifest is $loglocator");
1202   $Job->{'log'} = $loglocator;
1203   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1204 }
1205
1206
1207 sub freeze_if_want_freeze
1208 {
1209   if ($main::please_freeze)
1210   {
1211     release_allocation();
1212     if (@_)
1213     {
1214       # kill some srun procs before freeze+stop
1215       map { $proc{$_} = {} } @_;
1216       while (%proc)
1217       {
1218         killem (keys %proc);
1219         select (undef, undef, undef, 0.1);
1220         my $died;
1221         while (($died = waitpid (-1, WNOHANG)) > 0)
1222         {
1223           delete $proc{$died};
1224         }
1225       }
1226     }
1227     freeze();
1228     collate_output();
1229     cleanup();
1230     save_meta();
1231     exit 0;
1232   }
1233 }
1234
1235
1236 sub freeze
1237 {
1238   Log (undef, "Freeze not implemented");
1239   return;
1240 }
1241
1242
1243 sub thaw
1244 {
1245   croak ("Thaw not implemented");
1246
1247   my $whc;
1248   my $key = shift;
1249   Log (undef, "thaw from $key");
1250
1251   @jobstep = ();
1252   @jobstep_done = ();
1253   @jobstep_todo = ();
1254   @jobstep_tomerge = ();
1255   $jobstep_tomerge_level = 0;
1256   my $frozenjob = {};
1257
1258   my $stream = new Warehouse::Stream ( whc => $whc,
1259                                        hash => [split (",", $key)] );
1260   $stream->rewind;
1261   while (my $dataref = $stream->read_until (undef, "\n\n"))
1262   {
1263     if ($$dataref =~ /^job /)
1264     {
1265       foreach (split ("\n", $$dataref))
1266       {
1267         my ($k, $v) = split ("=", $_, 2);
1268         $frozenjob->{$k} = freezeunquote ($v);
1269       }
1270       next;
1271     }
1272
1273     if ($$dataref =~ /^merge (\d+) (.*)/)
1274     {
1275       $jobstep_tomerge_level = $1;
1276       @jobstep_tomerge
1277           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1278       next;
1279     }
1280
1281     my $Jobstep = { };
1282     foreach (split ("\n", $$dataref))
1283     {
1284       my ($k, $v) = split ("=", $_, 2);
1285       $Jobstep->{$k} = freezeunquote ($v) if $k;
1286     }
1287     $Jobstep->{'failures'} = 0;
1288     push @jobstep, $Jobstep;
1289
1290     if ($Jobstep->{exitcode} eq "0")
1291     {
1292       push @jobstep_done, $#jobstep;
1293     }
1294     else
1295     {
1296       push @jobstep_todo, $#jobstep;
1297     }
1298   }
1299
1300   foreach (qw (script script_version script_parameters))
1301   {
1302     $Job->{$_} = $frozenjob->{$_};
1303   }
1304   $Job->save if $job_has_uuid;
1305 }
1306
1307
1308 sub freezequote
1309 {
1310   my $s = shift;
1311   $s =~ s/\\/\\\\/g;
1312   $s =~ s/\n/\\n/g;
1313   return $s;
1314 }
1315
1316
1317 sub freezeunquote
1318 {
1319   my $s = shift;
1320   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1321   return $s;
1322 }
1323
1324
1325 sub srun
1326 {
1327   my $srunargs = shift;
1328   my $execargs = shift;
1329   my $opts = shift || {};
1330   my $stdin = shift;
1331   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1332   print STDERR (join (" ",
1333                       map { / / ? "'$_'" : $_ }
1334                       (@$args)),
1335                 "\n")
1336       if $ENV{CRUNCH_DEBUG};
1337
1338   if (defined $stdin) {
1339     my $child = open STDIN, "-|";
1340     defined $child or die "no fork: $!";
1341     if ($child == 0) {
1342       print $stdin or die $!;
1343       close STDOUT or die $!;
1344       exit 0;
1345     }
1346   }
1347
1348   return system (@$args) if $opts->{fork};
1349
1350   exec @$args;
1351   warn "ENV size is ".length(join(" ",%ENV));
1352   die "exec failed: $!: @$args";
1353 }
1354
1355
1356 sub ban_node_by_slot {
1357   # Don't start any new jobsteps on this node for 60 seconds
1358   my $slotid = shift;
1359   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1360   $slot[$slotid]->{node}->{hold_count}++;
1361   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1362 }
1363
1364 __DATA__
1365 #!/usr/bin/perl
1366
1367 # checkout-and-build
1368
1369 use Fcntl ':flock';
1370
1371 my $destdir = $ENV{"CRUNCH_SRC"};
1372 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1373 my $repo = $ENV{"CRUNCH_SRC_URL"};
1374
1375 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1376 flock L, LOCK_EX;
1377 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1378     exit 0;
1379 }
1380
1381 unlink "$destdir.commit";
1382 open STDOUT, ">", "$destdir.log";
1383 open STDERR, ">&STDOUT";
1384
1385 mkdir $destdir;
1386 my @git_archive_data = <DATA>;
1387 if (@git_archive_data) {
1388   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1389   print TARX @git_archive_data;
1390   if(!close(TARX)) {
1391     die "'tar -C $destdir -xf -' exited $?: $!";
1392   }
1393 }
1394
1395 my $pwd;
1396 chomp ($pwd = `pwd`);
1397 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1398 mkdir $install_dir;
1399
1400 for my $src_path ("$destdir/arvados/sdk/python") {
1401   if (-d $src_path) {
1402     shell_or_die ("virtualenv", $install_dir);
1403     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1404   }
1405 }
1406
1407 if (-e "$destdir/crunch_scripts/install") {
1408     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1409 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1410     # Old version
1411     shell_or_die ("./tests/autotests.sh", $install_dir);
1412 } elsif (-e "./install.sh") {
1413     shell_or_die ("./install.sh", $install_dir);
1414 }
1415
1416 if ($commit) {
1417     unlink "$destdir.commit.new";
1418     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1419     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1420 }
1421
1422 close L;
1423
1424 exit 0;
1425
1426 sub shell_or_die
1427 {
1428   if ($ENV{"DEBUG"}) {
1429     print STDERR "@_\n";
1430   }
1431   system (@_) == 0
1432       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1433 }
1434
1435 __DATA__