Merge branch '2449-keep-write-blocks' into 2449-keep-index-status-handlers
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
180
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
184
185
186 Log (undef, "check slurm allocation");
187 my @slot;
188 my @node;
189 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
190 my @sinfo;
191 if (!$have_slurm)
192 {
193   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194   push @sinfo, "$localcpus localhost";
195 }
196 if (exists $ENV{SLURM_NODELIST})
197 {
198   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
199 }
200 foreach (@sinfo)
201 {
202   my ($ncpus, $slurm_nodelist) = split;
203   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
204
205   my @nodelist;
206   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
207   {
208     my $nodelist = $1;
209     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
210     {
211       my $ranges = $1;
212       foreach (split (",", $ranges))
213       {
214         my ($a, $b);
215         if (/(\d+)-(\d+)/)
216         {
217           $a = $1;
218           $b = $2;
219         }
220         else
221         {
222           $a = $_;
223           $b = $_;
224         }
225         push @nodelist, map {
226           my $n = $nodelist;
227           $n =~ s/\[[-,\d]+\]/$_/;
228           $n;
229         } ($a..$b);
230       }
231     }
232     else
233     {
234       push @nodelist, $nodelist;
235     }
236   }
237   foreach my $nodename (@nodelist)
238   {
239     Log (undef, "node $nodename - $ncpus slots");
240     my $node = { name => $nodename,
241                  ncpus => $ncpus,
242                  losing_streak => 0,
243                  hold_until => 0 };
244     foreach my $cpu (1..$ncpus)
245     {
246       push @slot, { node => $node,
247                     cpu => $cpu };
248     }
249   }
250   push @node, @nodelist;
251 }
252
253
254
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
257
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259
260
261
262 my $jobmanager_id;
263 if ($job_has_uuid)
264 {
265   # Claim this job, and make sure nobody else does
266   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
267           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
268     croak("Error while updating / locking job");
269   }
270   $Job->update_attributes('started_at' => scalar gmtime,
271                           'running' => 1,
272                           'success' => undef,
273                           'tasks_summary' => { 'failed' => 0,
274                                                'todo' => 1,
275                                                'running' => 0,
276                                                'done' => 0 });
277 }
278
279
280 Log (undef, "start");
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
288
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
294
295 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
298
299
300 my @jobstep;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
305 my $squeue_checked;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
309
310
311
312 if (defined $Job->{thawedfromkey})
313 {
314   thaw ($Job->{thawedfromkey});
315 }
316 else
317 {
318   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319     'job_uuid' => $Job->{'uuid'},
320     'sequence' => 0,
321     'qsequence' => 0,
322     'parameters' => {},
323                                                           });
324   push @jobstep, { 'level' => 0,
325                    'failures' => 0,
326                    'arvados_task' => $first_task,
327                  };
328   push @jobstep_todo, 0;
329 }
330
331
332 if (!$have_slurm)
333 {
334   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 }
336
337
338 my $build_script;
339
340
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
342
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
344 if ($skip_install)
345 {
346   if (!defined $no_clear_tmp) {
347     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348     system($clear_tmp_cmd) == 0
349         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
350   }
351   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
353     if (-d $src_path) {
354       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
357           == 0
358           or croak ("setup.py in $src_path failed: exit ".($?>>8));
359     }
360   }
361 }
362 else
363 {
364   do {
365     local $/ = undef;
366     $build_script = <DATA>;
367   };
368   Log (undef, "Install revision ".$Job->{script_version});
369   my $nodelist = join(",", @node);
370
371   if (!defined $no_clear_tmp) {
372     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
373
374     my $cleanpid = fork();
375     if ($cleanpid == 0)
376     {
377       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
379       exit (1);
380     }
381     while (1)
382     {
383       last if $cleanpid == waitpid (-1, WNOHANG);
384       freeze_if_want_freeze ($cleanpid);
385       select (undef, undef, undef, 0.1);
386     }
387     Log (undef, "Clean-work-dir exited $?");
388   }
389
390   # Install requested code version
391
392   my @execargs;
393   my @srunargs = ("srun",
394                   "--nodelist=$nodelist",
395                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
396
397   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
399
400   my $commit;
401   my $git_archive;
402   my $treeish = $Job->{'script_version'};
403   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
404   # Todo: let script_version specify repository instead of expecting
405   # parent process to figure it out.
406   $ENV{"CRUNCH_SRC_URL"} = $repo;
407
408   # Create/update our clone of the remote git repo
409
410   if (!-d $ENV{"CRUNCH_SRC"}) {
411     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
412         or croak ("git clone $repo failed: exit ".($?>>8));
413     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
414   }
415   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q --tags origin`;
416
417   # If this looks like a subversion r#, look for it in git-svn commit messages
418
419   if ($treeish =~ m{^\d{1,4}$}) {
420     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
421     chomp $gitlog;
422     if ($gitlog =~ /^[a-f0-9]{40}$/) {
423       $commit = $gitlog;
424       Log (undef, "Using commit $commit for script_version $treeish");
425     }
426   }
427
428   # If that didn't work, try asking git to look it up as a tree-ish.
429
430   if (!defined $commit) {
431
432     my $cooked_treeish = $treeish;
433     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
434       # Looks like a git branch name -- make sure git knows it's
435       # relative to the remote repo
436       $cooked_treeish = "origin/$treeish";
437     }
438
439     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
440     chomp $found;
441     if ($found =~ /^[0-9a-f]{40}$/s) {
442       $commit = $found;
443       if ($commit ne $treeish) {
444         # Make sure we record the real commit id in the database,
445         # frozentokey, logs, etc. -- instead of an abbreviation or a
446         # branch name which can become ambiguous or point to a
447         # different commit in the future.
448         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
449         Log (undef, "Using commit $commit for tree-ish $treeish");
450         if ($commit ne $treeish) {
451           $Job->{'script_version'} = $commit;
452           !$job_has_uuid or
453               $Job->update_attributes('script_version' => $commit) or
454               croak("Error while updating job");
455         }
456       }
457     }
458   }
459
460   if (defined $commit) {
461     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
462     @execargs = ("sh", "-c",
463                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
464     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
465   }
466   else {
467     croak ("could not figure out commit id for $treeish");
468   }
469
470   my $installpid = fork();
471   if ($installpid == 0)
472   {
473     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
474     exit (1);
475   }
476   while (1)
477   {
478     last if $installpid == waitpid (-1, WNOHANG);
479     freeze_if_want_freeze ($installpid);
480     select (undef, undef, undef, 0.1);
481   }
482   Log (undef, "Install exited $?");
483 }
484
485 if (!$have_slurm)
486 {
487   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
488   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
489 }
490
491
492
493 foreach (qw (script script_version script_parameters runtime_constraints))
494 {
495   Log (undef,
496        "$_ " .
497        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
498 }
499 foreach (split (/\n/, $Job->{knobs}))
500 {
501   Log (undef, "knob " . $_);
502 }
503
504
505
506 $main::success = undef;
507
508
509
510 ONELEVEL:
511
512 my $thisround_succeeded = 0;
513 my $thisround_failed = 0;
514 my $thisround_failed_multiple = 0;
515
516 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
517                        or $a <=> $b } @jobstep_todo;
518 my $level = $jobstep[$jobstep_todo[0]]->{level};
519 Log (undef, "start level $level");
520
521
522
523 my %proc;
524 my @freeslot = (0..$#slot);
525 my @holdslot;
526 my %reader;
527 my $progress_is_dirty = 1;
528 my $progress_stats_updated = 0;
529
530 update_progress_stats();
531
532
533
534 THISROUND:
535 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
536 {
537   my $id = $jobstep_todo[$todo_ptr];
538   my $Jobstep = $jobstep[$id];
539   if ($Jobstep->{level} != $level)
540   {
541     next;
542   }
543
544   pipe $reader{$id}, "writer" or croak ($!);
545   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
546   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
547
548   my $childslot = $freeslot[0];
549   my $childnode = $slot[$childslot]->{node};
550   my $childslotname = join (".",
551                             $slot[$childslot]->{node}->{name},
552                             $slot[$childslot]->{cpu});
553   my $childpid = fork();
554   if ($childpid == 0)
555   {
556     $SIG{'INT'} = 'DEFAULT';
557     $SIG{'QUIT'} = 'DEFAULT';
558     $SIG{'TERM'} = 'DEFAULT';
559
560     foreach (values (%reader))
561     {
562       close($_);
563     }
564     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
565     open(STDOUT,">&writer");
566     open(STDERR,">&writer");
567
568     undef $dbh;
569     undef $sth;
570
571     delete $ENV{"GNUPGHOME"};
572     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
573     $ENV{"TASK_QSEQUENCE"} = $id;
574     $ENV{"TASK_SEQUENCE"} = $level;
575     $ENV{"JOB_SCRIPT"} = $Job->{script};
576     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
577       $param =~ tr/a-z/A-Z/;
578       $ENV{"JOB_PARAMETER_$param"} = $value;
579     }
580     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
581     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
582     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
583     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
584     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
585     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
586     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
587
588     $ENV{"GZIP"} = "-n";
589
590     my @srunargs = (
591       "srun",
592       "--nodelist=".$childnode->{name},
593       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
594       "--job-name=$job_id.$id.$$",
595         );
596     my @execargs = qw(sh);
597     my $build_script_to_send = "";
598     my $command =
599         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
600         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
601         ."&& cd $ENV{CRUNCH_TMP} ";
602     if ($build_script)
603     {
604       $build_script_to_send = $build_script;
605       $command .=
606           "&& perl -";
607     }
608     $command .=
609         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
610     my @execargs = ('bash', '-c', $command);
611     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
612     exit (111);
613   }
614   close("writer");
615   if (!defined $childpid)
616   {
617     close $reader{$id};
618     delete $reader{$id};
619     next;
620   }
621   shift @freeslot;
622   $proc{$childpid} = { jobstep => $id,
623                        time => time,
624                        slot => $childslot,
625                        jobstepname => "$job_id.$id.$childpid",
626                      };
627   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
628   $slot[$childslot]->{pid} = $childpid;
629
630   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
631   Log ($id, "child $childpid started on $childslotname");
632   $Jobstep->{starttime} = time;
633   $Jobstep->{node} = $childnode->{name};
634   $Jobstep->{slotindex} = $childslot;
635   delete $Jobstep->{stderr};
636   delete $Jobstep->{finishtime};
637
638   splice @jobstep_todo, $todo_ptr, 1;
639   --$todo_ptr;
640
641   $progress_is_dirty = 1;
642
643   while (!@freeslot
644          ||
645          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
646   {
647     last THISROUND if $main::please_freeze;
648     if ($main::please_info)
649     {
650       $main::please_info = 0;
651       freeze();
652       collate_output();
653       save_meta(1);
654       update_progress_stats();
655     }
656     my $gotsome
657         = readfrompipes ()
658         + reapchildren ();
659     if (!$gotsome)
660     {
661       check_refresh_wanted();
662       check_squeue();
663       update_progress_stats();
664       select (undef, undef, undef, 0.1);
665     }
666     elsif (time - $progress_stats_updated >= 30)
667     {
668       update_progress_stats();
669     }
670     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
671         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
672     {
673       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
674           .($thisround_failed+$thisround_succeeded)
675           .") -- giving up on this round";
676       Log (undef, $message);
677       last THISROUND;
678     }
679
680     # move slots from freeslot to holdslot (or back to freeslot) if necessary
681     for (my $i=$#freeslot; $i>=0; $i--) {
682       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
683         push @holdslot, (splice @freeslot, $i, 1);
684       }
685     }
686     for (my $i=$#holdslot; $i>=0; $i--) {
687       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
688         push @freeslot, (splice @holdslot, $i, 1);
689       }
690     }
691
692     # give up if no nodes are succeeding
693     if (!grep { $_->{node}->{losing_streak} == 0 &&
694                     $_->{node}->{hold_count} < 4 } @slot) {
695       my $message = "Every node has failed -- giving up on this round";
696       Log (undef, $message);
697       last THISROUND;
698     }
699   }
700 }
701
702
703 push @freeslot, splice @holdslot;
704 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
705
706
707 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
708 while (%proc)
709 {
710   if ($main::please_continue) {
711     $main::please_continue = 0;
712     goto THISROUND;
713   }
714   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
715   readfrompipes ();
716   if (!reapchildren())
717   {
718     check_refresh_wanted();
719     check_squeue();
720     update_progress_stats();
721     select (undef, undef, undef, 0.1);
722     killem (keys %proc) if $main::please_freeze;
723   }
724 }
725
726 update_progress_stats();
727 freeze_if_want_freeze();
728
729
730 if (!defined $main::success)
731 {
732   if (@jobstep_todo &&
733       $thisround_succeeded == 0 &&
734       ($thisround_failed == 0 || $thisround_failed > 4))
735   {
736     my $message = "stop because $thisround_failed tasks failed and none succeeded";
737     Log (undef, $message);
738     $main::success = 0;
739   }
740   if (!@jobstep_todo)
741   {
742     $main::success = 1;
743   }
744 }
745
746 goto ONELEVEL if !defined $main::success;
747
748
749 release_allocation();
750 freeze();
751 if ($job_has_uuid) {
752   $Job->update_attributes('output' => &collate_output(),
753                           'running' => 0,
754                           'success' => $Job->{'output'} && $main::success,
755                           'finished_at' => scalar gmtime)
756 }
757
758 if ($Job->{'output'})
759 {
760   eval {
761     my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
762     $arv->{'collections'}->{'create'}->execute('collection' => {
763       'uuid' => $Job->{'output'},
764       'manifest_text' => $manifest_text,
765     });
766     if ($Job->{'output_is_persistent'}) {
767       $arv->{'links'}->{'create'}->execute('link' => {
768         'tail_kind' => 'arvados#user',
769         'tail_uuid' => $User->{'uuid'},
770         'head_kind' => 'arvados#collection',
771         'head_uuid' => $Job->{'output'},
772         'link_class' => 'resources',
773         'name' => 'wants',
774       });
775     }
776   };
777   if ($@) {
778     Log (undef, "Failed to register output manifest: $@");
779   }
780 }
781
782 Log (undef, "finish");
783
784 save_meta();
785 exit 0;
786
787
788
789 sub update_progress_stats
790 {
791   $progress_stats_updated = time;
792   return if !$progress_is_dirty;
793   my ($todo, $done, $running) = (scalar @jobstep_todo,
794                                  scalar @jobstep_done,
795                                  scalar @slot - scalar @freeslot - scalar @holdslot);
796   $Job->{'tasks_summary'} ||= {};
797   $Job->{'tasks_summary'}->{'todo'} = $todo;
798   $Job->{'tasks_summary'}->{'done'} = $done;
799   $Job->{'tasks_summary'}->{'running'} = $running;
800   if ($job_has_uuid) {
801     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
802   }
803   Log (undef, "status: $done done, $running running, $todo todo");
804   $progress_is_dirty = 0;
805 }
806
807
808
809 sub reapchildren
810 {
811   my $pid = waitpid (-1, WNOHANG);
812   return 0 if $pid <= 0;
813
814   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
815                   . "."
816                   . $slot[$proc{$pid}->{slot}]->{cpu});
817   my $jobstepid = $proc{$pid}->{jobstep};
818   my $elapsed = time - $proc{$pid}->{time};
819   my $Jobstep = $jobstep[$jobstepid];
820
821   my $childstatus = $?;
822   my $exitvalue = $childstatus >> 8;
823   my $exitinfo = sprintf("exit %d signal %d%s",
824                          $exitvalue,
825                          $childstatus & 127,
826                          ($childstatus & 128 ? ' core dump' : ''));
827   $Jobstep->{'arvados_task'}->reload;
828   my $task_success = $Jobstep->{'arvados_task'}->{success};
829
830   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
831
832   if (!defined $task_success) {
833     # task did not indicate one way or the other --> fail
834     $Jobstep->{'arvados_task'}->{success} = 0;
835     $Jobstep->{'arvados_task'}->save;
836     $task_success = 0;
837   }
838
839   if (!$task_success)
840   {
841     my $temporary_fail;
842     $temporary_fail ||= $Jobstep->{node_fail};
843     $temporary_fail ||= ($exitvalue == 111);
844
845     ++$thisround_failed;
846     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
847
848     # Check for signs of a failed or misconfigured node
849     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
850         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
851       # Don't count this against jobstep failure thresholds if this
852       # node is already suspected faulty and srun exited quickly
853       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
854           $elapsed < 5) {
855         Log ($jobstepid, "blaming failure on suspect node " .
856              $slot[$proc{$pid}->{slot}]->{node}->{name});
857         $temporary_fail ||= 1;
858       }
859       ban_node_by_slot($proc{$pid}->{slot});
860     }
861
862     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
863                              ++$Jobstep->{'failures'},
864                              $temporary_fail ? 'temporary ' : 'permanent',
865                              $elapsed));
866
867     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
868       # Give up on this task, and the whole job
869       $main::success = 0;
870       $main::please_freeze = 1;
871     }
872     else {
873       # Put this task back on the todo queue
874       push @jobstep_todo, $jobstepid;
875     }
876     $Job->{'tasks_summary'}->{'failed'}++;
877   }
878   else
879   {
880     ++$thisround_succeeded;
881     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
882     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
883     push @jobstep_done, $jobstepid;
884     Log ($jobstepid, "success in $elapsed seconds");
885   }
886   $Jobstep->{exitcode} = $childstatus;
887   $Jobstep->{finishtime} = time;
888   process_stderr ($jobstepid, $task_success);
889   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
890
891   close $reader{$jobstepid};
892   delete $reader{$jobstepid};
893   delete $slot[$proc{$pid}->{slot}]->{pid};
894   push @freeslot, $proc{$pid}->{slot};
895   delete $proc{$pid};
896
897   # Load new tasks
898   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
899     'where' => {
900       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
901     },
902     'order' => 'qsequence'
903   );
904   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
905     my $jobstep = {
906       'level' => $arvados_task->{'sequence'},
907       'failures' => 0,
908       'arvados_task' => $arvados_task
909     };
910     push @jobstep, $jobstep;
911     push @jobstep_todo, $#jobstep;
912   }
913
914   $progress_is_dirty = 1;
915   1;
916 }
917
918 sub check_refresh_wanted
919 {
920   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
921   if (@stat && $stat[9] > $latest_refresh) {
922     $latest_refresh = scalar time;
923     if ($job_has_uuid) {
924       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
925       for my $attr ('cancelled_at',
926                     'cancelled_by_user_uuid',
927                     'cancelled_by_client_uuid') {
928         $Job->{$attr} = $Job2->{$attr};
929       }
930       if ($Job->{'cancelled_at'}) {
931         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
932              " by user " . $Job->{cancelled_by_user_uuid});
933         $main::success = 0;
934         $main::please_freeze = 1;
935       }
936     }
937   }
938 }
939
940 sub check_squeue
941 {
942   # return if the kill list was checked <4 seconds ago
943   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
944   {
945     return;
946   }
947   $squeue_kill_checked = time;
948
949   # use killem() on procs whose killtime is reached
950   for (keys %proc)
951   {
952     if (exists $proc{$_}->{killtime}
953         && $proc{$_}->{killtime} <= time)
954     {
955       killem ($_);
956     }
957   }
958
959   # return if the squeue was checked <60 seconds ago
960   if (defined $squeue_checked && $squeue_checked > time - 60)
961   {
962     return;
963   }
964   $squeue_checked = time;
965
966   if (!$have_slurm)
967   {
968     # here is an opportunity to check for mysterious problems with local procs
969     return;
970   }
971
972   # get a list of steps still running
973   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
974   chop @squeue;
975   if ($squeue[-1] ne "ok")
976   {
977     return;
978   }
979   pop @squeue;
980
981   # which of my jobsteps are running, according to squeue?
982   my %ok;
983   foreach (@squeue)
984   {
985     if (/^(\d+)\.(\d+) (\S+)/)
986     {
987       if ($1 eq $ENV{SLURM_JOBID})
988       {
989         $ok{$3} = 1;
990       }
991     }
992   }
993
994   # which of my active child procs (>60s old) were not mentioned by squeue?
995   foreach (keys %proc)
996   {
997     if ($proc{$_}->{time} < time - 60
998         && !exists $ok{$proc{$_}->{jobstepname}}
999         && !exists $proc{$_}->{killtime})
1000     {
1001       # kill this proc if it hasn't exited in 30 seconds
1002       $proc{$_}->{killtime} = time + 30;
1003     }
1004   }
1005 }
1006
1007
1008 sub release_allocation
1009 {
1010   if ($have_slurm)
1011   {
1012     Log (undef, "release job allocation");
1013     system "scancel $ENV{SLURM_JOBID}";
1014   }
1015 }
1016
1017
1018 sub readfrompipes
1019 {
1020   my $gotsome = 0;
1021   foreach my $job (keys %reader)
1022   {
1023     my $buf;
1024     while (0 < sysread ($reader{$job}, $buf, 8192))
1025     {
1026       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1027       $jobstep[$job]->{stderr} .= $buf;
1028       preprocess_stderr ($job);
1029       if (length ($jobstep[$job]->{stderr}) > 16384)
1030       {
1031         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1032       }
1033       $gotsome = 1;
1034     }
1035   }
1036   return $gotsome;
1037 }
1038
1039
1040 sub preprocess_stderr
1041 {
1042   my $job = shift;
1043
1044   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1045     my $line = $1;
1046     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1047     Log ($job, "stderr $line");
1048     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1049       # whoa.
1050       $main::please_freeze = 1;
1051     }
1052     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1053       $jobstep[$job]->{node_fail} = 1;
1054       ban_node_by_slot($jobstep[$job]->{slotindex});
1055     }
1056   }
1057 }
1058
1059
1060 sub process_stderr
1061 {
1062   my $job = shift;
1063   my $task_success = shift;
1064   preprocess_stderr ($job);
1065
1066   map {
1067     Log ($job, "stderr $_");
1068   } split ("\n", $jobstep[$job]->{stderr});
1069 }
1070
1071 sub fetch_block
1072 {
1073   my $hash = shift;
1074   my ($keep, $child_out, $output_block);
1075
1076   my $cmd = "arv keep get \Q$hash\E";
1077   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1078   sysread($keep, $output_block, 64 * 1024 * 1024);
1079   close $keep;
1080   return $output_block;
1081 }
1082
1083 sub collate_output
1084 {
1085   Log (undef, "collate");
1086
1087   my ($child_out, $child_in);
1088   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1089   my $joboutput;
1090   for (@jobstep)
1091   {
1092     next if (!exists $_->{'arvados_task'}->{output} ||
1093              !$_->{'arvados_task'}->{'success'} ||
1094              $_->{'exitcode'} != 0);
1095     my $output = $_->{'arvados_task'}->{output};
1096     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1097     {
1098       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1099       print $child_in $output;
1100     }
1101     elsif (@jobstep == 1)
1102     {
1103       $joboutput = $output;
1104       last;
1105     }
1106     elsif (defined (my $outblock = fetch_block ($output)))
1107     {
1108       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1109       print $child_in $outblock;
1110     }
1111     else
1112     {
1113       Log (undef, "XXX fetch_block($output) failed XXX");
1114       $main::success = 0;
1115     }
1116   }
1117   $child_in->close;
1118
1119   if (!defined $joboutput) {
1120     my $s = IO::Select->new($child_out);
1121     if ($s->can_read(120)) {
1122       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1123       chomp($joboutput);
1124     } else {
1125       Log (undef, "timed out reading from 'arv keep put'");
1126     }
1127   }
1128   waitpid($pid, 0);
1129
1130   if ($joboutput)
1131   {
1132     Log (undef, "output $joboutput");
1133     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1134   }
1135   else
1136   {
1137     Log (undef, "output undef");
1138   }
1139   return $joboutput;
1140 }
1141
1142
1143 sub killem
1144 {
1145   foreach (@_)
1146   {
1147     my $sig = 2;                # SIGINT first
1148     if (exists $proc{$_}->{"sent_$sig"} &&
1149         time - $proc{$_}->{"sent_$sig"} > 4)
1150     {
1151       $sig = 15;                # SIGTERM if SIGINT doesn't work
1152     }
1153     if (exists $proc{$_}->{"sent_$sig"} &&
1154         time - $proc{$_}->{"sent_$sig"} > 4)
1155     {
1156       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1157     }
1158     if (!exists $proc{$_}->{"sent_$sig"})
1159     {
1160       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1161       kill $sig, $_;
1162       select (undef, undef, undef, 0.1);
1163       if ($sig == 2)
1164       {
1165         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1166       }
1167       $proc{$_}->{"sent_$sig"} = time;
1168       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1169     }
1170   }
1171 }
1172
1173
1174 sub fhbits
1175 {
1176   my($bits);
1177   for (@_) {
1178     vec($bits,fileno($_),1) = 1;
1179   }
1180   $bits;
1181 }
1182
1183
1184 sub Log                         # ($jobstep_id, $logmessage)
1185 {
1186   if ($_[1] =~ /\n/) {
1187     for my $line (split (/\n/, $_[1])) {
1188       Log ($_[0], $line);
1189     }
1190     return;
1191   }
1192   my $fh = select STDERR; $|=1; select $fh;
1193   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1194   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1195   $message .= "\n";
1196   my $datetime;
1197   if ($metastream || -t STDERR) {
1198     my @gmtime = gmtime;
1199     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1200                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1201   }
1202   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1203
1204   if ($metastream) {
1205     print $metastream $datetime . " " . $message;
1206   }
1207 }
1208
1209
1210 sub croak
1211 {
1212   my ($package, $file, $line) = caller;
1213   my $message = "@_ at $file line $line\n";
1214   Log (undef, $message);
1215   freeze() if @jobstep_todo;
1216   collate_output() if @jobstep_todo;
1217   cleanup();
1218   save_meta() if $metastream;
1219   die;
1220 }
1221
1222
1223 sub cleanup
1224 {
1225   return if !$job_has_uuid;
1226   $Job->update_attributes('running' => 0,
1227                           'success' => 0,
1228                           'finished_at' => scalar gmtime);
1229 }
1230
1231
1232 sub save_meta
1233 {
1234   my $justcheckpoint = shift; # false if this will be the last meta saved
1235   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1236
1237   $local_logfile->flush;
1238   my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1239       . quotemeta($local_logfile->filename);
1240   my $loglocator = `$cmd`;
1241   die "system $cmd failed: $?" if $?;
1242
1243   $local_logfile = undef;   # the temp file is automatically deleted
1244   Log (undef, "log manifest is $loglocator");
1245   $Job->{'log'} = $loglocator;
1246   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1247 }
1248
1249
1250 sub freeze_if_want_freeze
1251 {
1252   if ($main::please_freeze)
1253   {
1254     release_allocation();
1255     if (@_)
1256     {
1257       # kill some srun procs before freeze+stop
1258       map { $proc{$_} = {} } @_;
1259       while (%proc)
1260       {
1261         killem (keys %proc);
1262         select (undef, undef, undef, 0.1);
1263         my $died;
1264         while (($died = waitpid (-1, WNOHANG)) > 0)
1265         {
1266           delete $proc{$died};
1267         }
1268       }
1269     }
1270     freeze();
1271     collate_output();
1272     cleanup();
1273     save_meta();
1274     exit 0;
1275   }
1276 }
1277
1278
1279 sub freeze
1280 {
1281   Log (undef, "Freeze not implemented");
1282   return;
1283 }
1284
1285
1286 sub thaw
1287 {
1288   croak ("Thaw not implemented");
1289 }
1290
1291
1292 sub freezequote
1293 {
1294   my $s = shift;
1295   $s =~ s/\\/\\\\/g;
1296   $s =~ s/\n/\\n/g;
1297   return $s;
1298 }
1299
1300
1301 sub freezeunquote
1302 {
1303   my $s = shift;
1304   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1305   return $s;
1306 }
1307
1308
1309 sub srun
1310 {
1311   my $srunargs = shift;
1312   my $execargs = shift;
1313   my $opts = shift || {};
1314   my $stdin = shift;
1315   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1316   print STDERR (join (" ",
1317                       map { / / ? "'$_'" : $_ }
1318                       (@$args)),
1319                 "\n")
1320       if $ENV{CRUNCH_DEBUG};
1321
1322   if (defined $stdin) {
1323     my $child = open STDIN, "-|";
1324     defined $child or die "no fork: $!";
1325     if ($child == 0) {
1326       print $stdin or die $!;
1327       close STDOUT or die $!;
1328       exit 0;
1329     }
1330   }
1331
1332   return system (@$args) if $opts->{fork};
1333
1334   exec @$args;
1335   warn "ENV size is ".length(join(" ",%ENV));
1336   die "exec failed: $!: @$args";
1337 }
1338
1339
1340 sub ban_node_by_slot {
1341   # Don't start any new jobsteps on this node for 60 seconds
1342   my $slotid = shift;
1343   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1344   $slot[$slotid]->{node}->{hold_count}++;
1345   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1346 }
1347
1348 sub must_lock_now
1349 {
1350   my ($lockfile, $error_message) = @_;
1351   open L, ">", $lockfile or croak("$lockfile: $!");
1352   if (!flock L, LOCK_EX|LOCK_NB) {
1353     croak("Can't lock $lockfile: $error_message\n");
1354   }
1355 }
1356
1357 __DATA__
1358 #!/usr/bin/perl
1359
1360 # checkout-and-build
1361
1362 use Fcntl ':flock';
1363
1364 my $destdir = $ENV{"CRUNCH_SRC"};
1365 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1366 my $repo = $ENV{"CRUNCH_SRC_URL"};
1367
1368 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1369 flock L, LOCK_EX;
1370 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1371     exit 0;
1372 }
1373
1374 unlink "$destdir.commit";
1375 open STDOUT, ">", "$destdir.log";
1376 open STDERR, ">&STDOUT";
1377
1378 mkdir $destdir;
1379 my @git_archive_data = <DATA>;
1380 if (@git_archive_data) {
1381   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1382   print TARX @git_archive_data;
1383   if(!close(TARX)) {
1384     die "'tar -C $destdir -xf -' exited $?: $!";
1385   }
1386 }
1387
1388 my $pwd;
1389 chomp ($pwd = `pwd`);
1390 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1391 mkdir $install_dir;
1392
1393 for my $src_path ("$destdir/arvados/sdk/python") {
1394   if (-d $src_path) {
1395     shell_or_die ("virtualenv", $install_dir);
1396     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1397   }
1398 }
1399
1400 if (-e "$destdir/crunch_scripts/install") {
1401     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1402 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1403     # Old version
1404     shell_or_die ("./tests/autotests.sh", $install_dir);
1405 } elsif (-e "./install.sh") {
1406     shell_or_die ("./install.sh", $install_dir);
1407 }
1408
1409 if ($commit) {
1410     unlink "$destdir.commit.new";
1411     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1412     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1413 }
1414
1415 close L;
1416
1417 exit 0;
1418
1419 sub shell_or_die
1420 {
1421   if ($ENV{"DEBUG"}) {
1422     print STDERR "@_\n";
1423   }
1424   system (@_) == 0
1425       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1426 }
1427
1428 __DATA__