Merge branch '2221-complete-docker' (closes #2325, closes #2221)
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
180
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
184
185
186 Log (undef, "check slurm allocation");
187 my @slot;
188 my @node;
189 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
190 my @sinfo;
191 if (!$have_slurm)
192 {
193   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194   push @sinfo, "$localcpus localhost";
195 }
196 if (exists $ENV{SLURM_NODELIST})
197 {
198   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
199 }
200 foreach (@sinfo)
201 {
202   my ($ncpus, $slurm_nodelist) = split;
203   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
204
205   my @nodelist;
206   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
207   {
208     my $nodelist = $1;
209     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
210     {
211       my $ranges = $1;
212       foreach (split (",", $ranges))
213       {
214         my ($a, $b);
215         if (/(\d+)-(\d+)/)
216         {
217           $a = $1;
218           $b = $2;
219         }
220         else
221         {
222           $a = $_;
223           $b = $_;
224         }
225         push @nodelist, map {
226           my $n = $nodelist;
227           $n =~ s/\[[-,\d]+\]/$_/;
228           $n;
229         } ($a..$b);
230       }
231     }
232     else
233     {
234       push @nodelist, $nodelist;
235     }
236   }
237   foreach my $nodename (@nodelist)
238   {
239     Log (undef, "node $nodename - $ncpus slots");
240     my $node = { name => $nodename,
241                  ncpus => $ncpus,
242                  losing_streak => 0,
243                  hold_until => 0 };
244     foreach my $cpu (1..$ncpus)
245     {
246       push @slot, { node => $node,
247                     cpu => $cpu };
248     }
249   }
250   push @node, @nodelist;
251 }
252
253
254
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
257
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259
260
261
262 my $jobmanager_id;
263 if ($job_has_uuid)
264 {
265   # Claim this job, and make sure nobody else does
266   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
267           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
268     croak("Error while updating / locking job");
269   }
270   $Job->update_attributes('started_at' => scalar gmtime,
271                           'running' => 1,
272                           'success' => undef,
273                           'tasks_summary' => { 'failed' => 0,
274                                                'todo' => 1,
275                                                'running' => 0,
276                                                'done' => 0 });
277 }
278
279
280 Log (undef, "start");
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
288
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
294
295 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
298
299
300 my @jobstep;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
305 my $squeue_checked;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
309
310
311
312 if (defined $Job->{thawedfromkey})
313 {
314   thaw ($Job->{thawedfromkey});
315 }
316 else
317 {
318   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319     'job_uuid' => $Job->{'uuid'},
320     'sequence' => 0,
321     'qsequence' => 0,
322     'parameters' => {},
323                                                           });
324   push @jobstep, { 'level' => 0,
325                    'failures' => 0,
326                    'arvados_task' => $first_task,
327                  };
328   push @jobstep_todo, 0;
329 }
330
331
332 if (!$have_slurm)
333 {
334   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 }
336
337
338 my $build_script;
339
340
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
342
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
344 if ($skip_install)
345 {
346   if (!defined $no_clear_tmp) {
347     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348     system($clear_tmp_cmd) == 0
349         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
350   }
351   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
353     if (-d $src_path) {
354       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
357           == 0
358           or croak ("setup.py in $src_path failed: exit ".($?>>8));
359     }
360   }
361 }
362 else
363 {
364   do {
365     local $/ = undef;
366     $build_script = <DATA>;
367   };
368   Log (undef, "Install revision ".$Job->{script_version});
369   my $nodelist = join(",", @node);
370
371   if (!defined $no_clear_tmp) {
372     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
373
374     my $cleanpid = fork();
375     if ($cleanpid == 0)
376     {
377       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
379       exit (1);
380     }
381     while (1)
382     {
383       last if $cleanpid == waitpid (-1, WNOHANG);
384       freeze_if_want_freeze ($cleanpid);
385       select (undef, undef, undef, 0.1);
386     }
387     Log (undef, "Clean-work-dir exited $?");
388   }
389
390   # Install requested code version
391
392   my @execargs;
393   my @srunargs = ("srun",
394                   "--nodelist=$nodelist",
395                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
396
397   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
399
400   my $commit;
401   my $git_archive;
402   my $treeish = $Job->{'script_version'};
403   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
404   # Todo: let script_version specify repository instead of expecting
405   # parent process to figure it out.
406   $ENV{"CRUNCH_SRC_URL"} = $repo;
407
408   # Create/update our clone of the remote git repo
409
410   if (!-d $ENV{"CRUNCH_SRC"}) {
411     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
412         or croak ("git clone $repo failed: exit ".($?>>8));
413     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
414   }
415   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q --tags origin`;
416
417   # If this looks like a subversion r#, look for it in git-svn commit messages
418
419   if ($treeish =~ m{^\d{1,4}$}) {
420     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
421     chomp $gitlog;
422     if ($gitlog =~ /^[a-f0-9]{40}$/) {
423       $commit = $gitlog;
424       Log (undef, "Using commit $commit for script_version $treeish");
425     }
426   }
427
428   # If that didn't work, try asking git to look it up as a tree-ish.
429
430   if (!defined $commit) {
431
432     my $cooked_treeish = $treeish;
433     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
434       # Looks like a git branch name -- make sure git knows it's
435       # relative to the remote repo
436       $cooked_treeish = "origin/$treeish";
437     }
438
439     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
440     chomp $found;
441     if ($found =~ /^[0-9a-f]{40}$/s) {
442       $commit = $found;
443       if ($commit ne $treeish) {
444         # Make sure we record the real commit id in the database,
445         # frozentokey, logs, etc. -- instead of an abbreviation or a
446         # branch name which can become ambiguous or point to a
447         # different commit in the future.
448         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
449         Log (undef, "Using commit $commit for tree-ish $treeish");
450         if ($commit ne $treeish) {
451           $Job->{'script_version'} = $commit;
452           !$job_has_uuid or
453               $Job->update_attributes('script_version' => $commit) or
454               croak("Error while updating job");
455         }
456       }
457     }
458   }
459
460   if (defined $commit) {
461     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
462     @execargs = ("sh", "-c",
463                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
464     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
465   }
466   else {
467     croak ("could not figure out commit id for $treeish");
468   }
469
470   my $installpid = fork();
471   if ($installpid == 0)
472   {
473     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
474     exit (1);
475   }
476   while (1)
477   {
478     last if $installpid == waitpid (-1, WNOHANG);
479     freeze_if_want_freeze ($installpid);
480     select (undef, undef, undef, 0.1);
481   }
482   Log (undef, "Install exited $?");
483 }
484
485 if (!$have_slurm)
486 {
487   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
488   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
489 }
490
491
492
493 foreach (qw (script script_version script_parameters runtime_constraints))
494 {
495   Log (undef,
496        "$_ " .
497        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
498 }
499 foreach (split (/\n/, $Job->{knobs}))
500 {
501   Log (undef, "knob " . $_);
502 }
503
504
505
506 $main::success = undef;
507
508
509
510 ONELEVEL:
511
512 my $thisround_succeeded = 0;
513 my $thisround_failed = 0;
514 my $thisround_failed_multiple = 0;
515
516 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
517                        or $a <=> $b } @jobstep_todo;
518 my $level = $jobstep[$jobstep_todo[0]]->{level};
519 Log (undef, "start level $level");
520
521
522
523 my %proc;
524 my @freeslot = (0..$#slot);
525 my @holdslot;
526 my %reader;
527 my $progress_is_dirty = 1;
528 my $progress_stats_updated = 0;
529
530 update_progress_stats();
531
532
533
534 THISROUND:
535 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
536 {
537   my $id = $jobstep_todo[$todo_ptr];
538   my $Jobstep = $jobstep[$id];
539   if ($Jobstep->{level} != $level)
540   {
541     next;
542   }
543
544   pipe $reader{$id}, "writer" or croak ($!);
545   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
546   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
547
548   my $childslot = $freeslot[0];
549   my $childnode = $slot[$childslot]->{node};
550   my $childslotname = join (".",
551                             $slot[$childslot]->{node}->{name},
552                             $slot[$childslot]->{cpu});
553   my $childpid = fork();
554   if ($childpid == 0)
555   {
556     $SIG{'INT'} = 'DEFAULT';
557     $SIG{'QUIT'} = 'DEFAULT';
558     $SIG{'TERM'} = 'DEFAULT';
559
560     foreach (values (%reader))
561     {
562       close($_);
563     }
564     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
565     open(STDOUT,">&writer");
566     open(STDERR,">&writer");
567
568     undef $dbh;
569     undef $sth;
570
571     delete $ENV{"GNUPGHOME"};
572     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
573     $ENV{"TASK_QSEQUENCE"} = $id;
574     $ENV{"TASK_SEQUENCE"} = $level;
575     $ENV{"JOB_SCRIPT"} = $Job->{script};
576     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
577       $param =~ tr/a-z/A-Z/;
578       $ENV{"JOB_PARAMETER_$param"} = $value;
579     }
580     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
581     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
582     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
583     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
584     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
585     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
586     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
587
588     $ENV{"GZIP"} = "-n";
589
590     my @srunargs = (
591       "srun",
592       "--nodelist=".$childnode->{name},
593       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
594       "--job-name=$job_id.$id.$$",
595         );
596     my @execargs = qw(sh);
597     my $build_script_to_send = "";
598     my $command =
599         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
600         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
601         ."&& cd $ENV{CRUNCH_TMP} ";
602     if ($build_script)
603     {
604       $build_script_to_send = $build_script;
605       $command .=
606           "&& perl -";
607     }
608     $command .=
609         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
610     my @execargs = ('bash', '-c', $command);
611     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
612     exit (111);
613   }
614   close("writer");
615   if (!defined $childpid)
616   {
617     close $reader{$id};
618     delete $reader{$id};
619     next;
620   }
621   shift @freeslot;
622   $proc{$childpid} = { jobstep => $id,
623                        time => time,
624                        slot => $childslot,
625                        jobstepname => "$job_id.$id.$childpid",
626                      };
627   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
628   $slot[$childslot]->{pid} = $childpid;
629
630   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
631   Log ($id, "child $childpid started on $childslotname");
632   $Jobstep->{starttime} = time;
633   $Jobstep->{node} = $childnode->{name};
634   $Jobstep->{slotindex} = $childslot;
635   delete $Jobstep->{stderr};
636   delete $Jobstep->{finishtime};
637
638   splice @jobstep_todo, $todo_ptr, 1;
639   --$todo_ptr;
640
641   $progress_is_dirty = 1;
642
643   while (!@freeslot
644          ||
645          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
646   {
647     last THISROUND if $main::please_freeze;
648     if ($main::please_info)
649     {
650       $main::please_info = 0;
651       freeze();
652       collate_output();
653       save_meta(1);
654       update_progress_stats();
655     }
656     my $gotsome
657         = readfrompipes ()
658         + reapchildren ();
659     if (!$gotsome)
660     {
661       check_refresh_wanted();
662       check_squeue();
663       update_progress_stats();
664       select (undef, undef, undef, 0.1);
665     }
666     elsif (time - $progress_stats_updated >= 30)
667     {
668       update_progress_stats();
669     }
670     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
671         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
672     {
673       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
674           .($thisround_failed+$thisround_succeeded)
675           .") -- giving up on this round";
676       Log (undef, $message);
677       last THISROUND;
678     }
679
680     # move slots from freeslot to holdslot (or back to freeslot) if necessary
681     for (my $i=$#freeslot; $i>=0; $i--) {
682       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
683         push @holdslot, (splice @freeslot, $i, 1);
684       }
685     }
686     for (my $i=$#holdslot; $i>=0; $i--) {
687       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
688         push @freeslot, (splice @holdslot, $i, 1);
689       }
690     }
691
692     # give up if no nodes are succeeding
693     if (!grep { $_->{node}->{losing_streak} == 0 &&
694                     $_->{node}->{hold_count} < 4 } @slot) {
695       my $message = "Every node has failed -- giving up on this round";
696       Log (undef, $message);
697       last THISROUND;
698     }
699   }
700 }
701
702
703 push @freeslot, splice @holdslot;
704 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
705
706
707 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
708 while (%proc)
709 {
710   if ($main::please_continue) {
711     $main::please_continue = 0;
712     goto THISROUND;
713   }
714   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
715   readfrompipes ();
716   if (!reapchildren())
717   {
718     check_refresh_wanted();
719     check_squeue();
720     update_progress_stats();
721     select (undef, undef, undef, 0.1);
722     killem (keys %proc) if $main::please_freeze;
723   }
724 }
725
726 update_progress_stats();
727 freeze_if_want_freeze();
728
729
730 if (!defined $main::success)
731 {
732   if (@jobstep_todo &&
733       $thisround_succeeded == 0 &&
734       ($thisround_failed == 0 || $thisround_failed > 4))
735   {
736     my $message = "stop because $thisround_failed tasks failed and none succeeded";
737     Log (undef, $message);
738     $main::success = 0;
739   }
740   if (!@jobstep_todo)
741   {
742     $main::success = 1;
743   }
744 }
745
746 goto ONELEVEL if !defined $main::success;
747
748
749 release_allocation();
750 freeze();
751 if ($job_has_uuid) {
752   $Job->update_attributes('output' => &collate_output(),
753                           'running' => 0,
754                           'success' => $Job->{'output'} && $main::success,
755                           'finished_at' => scalar gmtime)
756 }
757
758 if ($Job->{'output'})
759 {
760   eval {
761     my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
762     $arv->{'collections'}->{'create'}->execute('collection' => {
763       'uuid' => $Job->{'output'},
764       'manifest_text' => $manifest_text,
765     });
766     if ($Job->{'output_is_persistent'}) {
767       $arv->{'links'}->{'create'}->execute('link' => {
768         'tail_kind' => 'arvados#user',
769         'tail_uuid' => $User->{'uuid'},
770         'head_kind' => 'arvados#collection',
771         'head_uuid' => $Job->{'output'},
772         'link_class' => 'resources',
773         'name' => 'wants',
774       });
775     }
776   };
777   if ($@) {
778     Log (undef, "Failed to register output manifest: $@");
779   }
780 }
781
782 Log (undef, "finish");
783
784 save_meta();
785 exit 0;
786
787
788
789 sub update_progress_stats
790 {
791   $progress_stats_updated = time;
792   return if !$progress_is_dirty;
793   my ($todo, $done, $running) = (scalar @jobstep_todo,
794                                  scalar @jobstep_done,
795                                  scalar @slot - scalar @freeslot - scalar @holdslot);
796   $Job->{'tasks_summary'} ||= {};
797   $Job->{'tasks_summary'}->{'todo'} = $todo;
798   $Job->{'tasks_summary'}->{'done'} = $done;
799   $Job->{'tasks_summary'}->{'running'} = $running;
800   if ($job_has_uuid) {
801     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
802   }
803   Log (undef, "status: $done done, $running running, $todo todo");
804   $progress_is_dirty = 0;
805 }
806
807
808
809 sub reapchildren
810 {
811   my $pid = waitpid (-1, WNOHANG);
812   return 0 if $pid <= 0;
813
814   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
815                   . "."
816                   . $slot[$proc{$pid}->{slot}]->{cpu});
817   my $jobstepid = $proc{$pid}->{jobstep};
818   my $elapsed = time - $proc{$pid}->{time};
819   my $Jobstep = $jobstep[$jobstepid];
820
821   my $childstatus = $?;
822   my $exitvalue = $childstatus >> 8;
823   my $exitinfo = sprintf("exit %d signal %d%s",
824                          $exitvalue,
825                          $childstatus & 127,
826                          ($childstatus & 128 ? ' core dump' : ''));
827   $Jobstep->{'arvados_task'}->reload;
828   my $task_success = $Jobstep->{'arvados_task'}->{success};
829
830   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
831
832   if (!defined $task_success) {
833     # task did not indicate one way or the other --> fail
834     $Jobstep->{'arvados_task'}->{success} = 0;
835     $Jobstep->{'arvados_task'}->save;
836     $task_success = 0;
837   }
838
839   if (!$task_success)
840   {
841     my $temporary_fail;
842     $temporary_fail ||= $Jobstep->{node_fail};
843     $temporary_fail ||= ($exitvalue == 111);
844
845     ++$thisround_failed;
846     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
847
848     # Check for signs of a failed or misconfigured node
849     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
850         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
851       # Don't count this against jobstep failure thresholds if this
852       # node is already suspected faulty and srun exited quickly
853       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
854           $elapsed < 5) {
855         Log ($jobstepid, "blaming failure on suspect node " .
856              $slot[$proc{$pid}->{slot}]->{node}->{name});
857         $temporary_fail ||= 1;
858       }
859       ban_node_by_slot($proc{$pid}->{slot});
860     }
861
862     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
863                              ++$Jobstep->{'failures'},
864                              $temporary_fail ? 'temporary ' : 'permanent',
865                              $elapsed));
866
867     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
868       # Give up on this task, and the whole job
869       $main::success = 0;
870       $main::please_freeze = 1;
871     }
872     else {
873       # Put this task back on the todo queue
874       push @jobstep_todo, $jobstepid;
875     }
876     $Job->{'tasks_summary'}->{'failed'}++;
877   }
878   else
879   {
880     ++$thisround_succeeded;
881     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
882     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
883     push @jobstep_done, $jobstepid;
884     Log ($jobstepid, "success in $elapsed seconds");
885   }
886   $Jobstep->{exitcode} = $childstatus;
887   $Jobstep->{finishtime} = time;
888   process_stderr ($jobstepid, $task_success);
889   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
890
891   close $reader{$jobstepid};
892   delete $reader{$jobstepid};
893   delete $slot[$proc{$pid}->{slot}]->{pid};
894   push @freeslot, $proc{$pid}->{slot};
895   delete $proc{$pid};
896
897   # Load new tasks
898   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
899     'where' => {
900       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
901     },
902     'order' => 'qsequence'
903   );
904   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
905     my $jobstep = {
906       'level' => $arvados_task->{'sequence'},
907       'failures' => 0,
908       'arvados_task' => $arvados_task
909     };
910     push @jobstep, $jobstep;
911     push @jobstep_todo, $#jobstep;
912   }
913
914   $progress_is_dirty = 1;
915   1;
916 }
917
918 sub check_refresh_wanted
919 {
920   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
921   if (@stat && $stat[9] > $latest_refresh) {
922     $latest_refresh = scalar time;
923     if ($job_has_uuid) {
924       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
925       for my $attr ('cancelled_at',
926                     'cancelled_by_user_uuid',
927                     'cancelled_by_client_uuid') {
928         $Job->{$attr} = $Job2->{$attr};
929       }
930       if ($Job->{'cancelled_at'}) {
931         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
932              " by user " . $Job->{cancelled_by_user_uuid});
933         $main::success = 0;
934         $main::please_freeze = 1;
935       }
936     }
937   }
938 }
939
940 sub check_squeue
941 {
942   # return if the kill list was checked <4 seconds ago
943   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
944   {
945     return;
946   }
947   $squeue_kill_checked = time;
948
949   # use killem() on procs whose killtime is reached
950   for (keys %proc)
951   {
952     if (exists $proc{$_}->{killtime}
953         && $proc{$_}->{killtime} <= time)
954     {
955       killem ($_);
956     }
957   }
958
959   # return if the squeue was checked <60 seconds ago
960   if (defined $squeue_checked && $squeue_checked > time - 60)
961   {
962     return;
963   }
964   $squeue_checked = time;
965
966   if (!$have_slurm)
967   {
968     # here is an opportunity to check for mysterious problems with local procs
969     return;
970   }
971
972   # get a list of steps still running
973   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
974   chop @squeue;
975   if ($squeue[-1] ne "ok")
976   {
977     return;
978   }
979   pop @squeue;
980
981   # which of my jobsteps are running, according to squeue?
982   my %ok;
983   foreach (@squeue)
984   {
985     if (/^(\d+)\.(\d+) (\S+)/)
986     {
987       if ($1 eq $ENV{SLURM_JOBID})
988       {
989         $ok{$3} = 1;
990       }
991     }
992   }
993
994   # which of my active child procs (>60s old) were not mentioned by squeue?
995   foreach (keys %proc)
996   {
997     if ($proc{$_}->{time} < time - 60
998         && !exists $ok{$proc{$_}->{jobstepname}}
999         && !exists $proc{$_}->{killtime})
1000     {
1001       # kill this proc if it hasn't exited in 30 seconds
1002       $proc{$_}->{killtime} = time + 30;
1003     }
1004   }
1005 }
1006
1007
1008 sub release_allocation
1009 {
1010   if ($have_slurm)
1011   {
1012     Log (undef, "release job allocation");
1013     system "scancel $ENV{SLURM_JOBID}";
1014   }
1015 }
1016
1017
1018 sub readfrompipes
1019 {
1020   my $gotsome = 0;
1021   foreach my $job (keys %reader)
1022   {
1023     my $buf;
1024     while (0 < sysread ($reader{$job}, $buf, 8192))
1025     {
1026       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1027       $jobstep[$job]->{stderr} .= $buf;
1028       preprocess_stderr ($job);
1029       if (length ($jobstep[$job]->{stderr}) > 16384)
1030       {
1031         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1032       }
1033       $gotsome = 1;
1034     }
1035   }
1036   return $gotsome;
1037 }
1038
1039
1040 sub preprocess_stderr
1041 {
1042   my $job = shift;
1043
1044   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1045     my $line = $1;
1046     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1047     Log ($job, "stderr $line");
1048     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1049       # whoa.
1050       $main::please_freeze = 1;
1051     }
1052     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1053       $jobstep[$job]->{node_fail} = 1;
1054       ban_node_by_slot($jobstep[$job]->{slotindex});
1055     }
1056   }
1057 }
1058
1059
1060 sub process_stderr
1061 {
1062   my $job = shift;
1063   my $task_success = shift;
1064   preprocess_stderr ($job);
1065
1066   map {
1067     Log ($job, "stderr $_");
1068   } split ("\n", $jobstep[$job]->{stderr});
1069 }
1070
1071 sub fetch_block
1072 {
1073   my $hash = shift;
1074   my ($child_out, $output_block);
1075
1076   my $cmd = "arv keep get \Q$hash\E";
1077   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1078   sysread($keep, $output_block, 64 * 1024 * 1024);
1079   close $keep;
1080   return $output_block;
1081 }
1082
1083 sub collate_output
1084 {
1085   Log (undef, "collate");
1086
1087   my ($child_out, $child_in);
1088   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1089   my $joboutput;
1090   for (@jobstep)
1091   {
1092     next if (!exists $_->{'arvados_task'}->{output} ||
1093              !$_->{'arvados_task'}->{'success'} ||
1094              $_->{'exitcode'} != 0);
1095     my $output = $_->{'arvados_task'}->{output};
1096     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1097     {
1098       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1099       print $child_in $output;
1100     }
1101     elsif (@jobstep == 1)
1102     {
1103       $joboutput = $output;
1104       last;
1105     }
1106     elsif (defined (my $outblock = fetch_block ($output)))
1107     {
1108       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1109       print $child_in $outblock;
1110     }
1111     else
1112     {
1113       Log (undef, "XXX fetch_block($output) failed XXX");
1114       $main::success = 0;
1115     }
1116   }
1117   $child_in->close;
1118
1119   if (!defined $joboutput) {
1120     my $s = IO::Select->new($child_out);
1121     if ($s->can_read(120)) {
1122       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1123     } else {
1124       Log (undef, "timed out reading from 'arv keep put'");
1125     }
1126   }
1127   waitpid($pid, 0);
1128
1129   if ($joboutput)
1130   {
1131     Log (undef, "output $joboutput");
1132     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1133   }
1134   else
1135   {
1136     Log (undef, "output undef");
1137   }
1138   return $joboutput;
1139 }
1140
1141
1142 sub killem
1143 {
1144   foreach (@_)
1145   {
1146     my $sig = 2;                # SIGINT first
1147     if (exists $proc{$_}->{"sent_$sig"} &&
1148         time - $proc{$_}->{"sent_$sig"} > 4)
1149     {
1150       $sig = 15;                # SIGTERM if SIGINT doesn't work
1151     }
1152     if (exists $proc{$_}->{"sent_$sig"} &&
1153         time - $proc{$_}->{"sent_$sig"} > 4)
1154     {
1155       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1156     }
1157     if (!exists $proc{$_}->{"sent_$sig"})
1158     {
1159       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1160       kill $sig, $_;
1161       select (undef, undef, undef, 0.1);
1162       if ($sig == 2)
1163       {
1164         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1165       }
1166       $proc{$_}->{"sent_$sig"} = time;
1167       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1168     }
1169   }
1170 }
1171
1172
1173 sub fhbits
1174 {
1175   my($bits);
1176   for (@_) {
1177     vec($bits,fileno($_),1) = 1;
1178   }
1179   $bits;
1180 }
1181
1182
1183 sub Log                         # ($jobstep_id, $logmessage)
1184 {
1185   if ($_[1] =~ /\n/) {
1186     for my $line (split (/\n/, $_[1])) {
1187       Log ($_[0], $line);
1188     }
1189     return;
1190   }
1191   my $fh = select STDERR; $|=1; select $fh;
1192   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1193   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1194   $message .= "\n";
1195   my $datetime;
1196   if ($metastream || -t STDERR) {
1197     my @gmtime = gmtime;
1198     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1199                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1200   }
1201   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1202
1203   if ($metastream) {
1204     print $metastream $datetime . " " . $message;
1205   }
1206 }
1207
1208
1209 sub croak
1210 {
1211   my ($package, $file, $line) = caller;
1212   my $message = "@_ at $file line $line\n";
1213   Log (undef, $message);
1214   freeze() if @jobstep_todo;
1215   collate_output() if @jobstep_todo;
1216   cleanup();
1217   save_meta() if $metastream;
1218   die;
1219 }
1220
1221
1222 sub cleanup
1223 {
1224   return if !$job_has_uuid;
1225   $Job->update_attributes('running' => 0,
1226                           'success' => 0,
1227                           'finished_at' => scalar gmtime);
1228 }
1229
1230
1231 sub save_meta
1232 {
1233   my $justcheckpoint = shift; # false if this will be the last meta saved
1234   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1235
1236   $local_logfile->flush;
1237   my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1238       . quotemeta($local_logfile->filename);
1239   my $loglocator = `$cmd`;
1240   die "system $cmd failed: $?" if $?;
1241
1242   $local_logfile = undef;   # the temp file is automatically deleted
1243   Log (undef, "log manifest is $loglocator");
1244   $Job->{'log'} = $loglocator;
1245   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1246 }
1247
1248
1249 sub freeze_if_want_freeze
1250 {
1251   if ($main::please_freeze)
1252   {
1253     release_allocation();
1254     if (@_)
1255     {
1256       # kill some srun procs before freeze+stop
1257       map { $proc{$_} = {} } @_;
1258       while (%proc)
1259       {
1260         killem (keys %proc);
1261         select (undef, undef, undef, 0.1);
1262         my $died;
1263         while (($died = waitpid (-1, WNOHANG)) > 0)
1264         {
1265           delete $proc{$died};
1266         }
1267       }
1268     }
1269     freeze();
1270     collate_output();
1271     cleanup();
1272     save_meta();
1273     exit 0;
1274   }
1275 }
1276
1277
1278 sub freeze
1279 {
1280   Log (undef, "Freeze not implemented");
1281   return;
1282 }
1283
1284
1285 sub thaw
1286 {
1287   croak ("Thaw not implemented");
1288 }
1289
1290
1291 sub freezequote
1292 {
1293   my $s = shift;
1294   $s =~ s/\\/\\\\/g;
1295   $s =~ s/\n/\\n/g;
1296   return $s;
1297 }
1298
1299
1300 sub freezeunquote
1301 {
1302   my $s = shift;
1303   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1304   return $s;
1305 }
1306
1307
1308 sub srun
1309 {
1310   my $srunargs = shift;
1311   my $execargs = shift;
1312   my $opts = shift || {};
1313   my $stdin = shift;
1314   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1315   print STDERR (join (" ",
1316                       map { / / ? "'$_'" : $_ }
1317                       (@$args)),
1318                 "\n")
1319       if $ENV{CRUNCH_DEBUG};
1320
1321   if (defined $stdin) {
1322     my $child = open STDIN, "-|";
1323     defined $child or die "no fork: $!";
1324     if ($child == 0) {
1325       print $stdin or die $!;
1326       close STDOUT or die $!;
1327       exit 0;
1328     }
1329   }
1330
1331   return system (@$args) if $opts->{fork};
1332
1333   exec @$args;
1334   warn "ENV size is ".length(join(" ",%ENV));
1335   die "exec failed: $!: @$args";
1336 }
1337
1338
1339 sub ban_node_by_slot {
1340   # Don't start any new jobsteps on this node for 60 seconds
1341   my $slotid = shift;
1342   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1343   $slot[$slotid]->{node}->{hold_count}++;
1344   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1345 }
1346
1347 sub must_lock_now
1348 {
1349   my ($lockfile, $error_message) = @_;
1350   open L, ">", $lockfile or croak("$lockfile: $!");
1351   if (!flock L, LOCK_EX|LOCK_NB) {
1352     croak("Can't lock $lockfile: $error_message\n");
1353   }
1354 }
1355
1356 __DATA__
1357 #!/usr/bin/perl
1358
1359 # checkout-and-build
1360
1361 use Fcntl ':flock';
1362
1363 my $destdir = $ENV{"CRUNCH_SRC"};
1364 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1365 my $repo = $ENV{"CRUNCH_SRC_URL"};
1366
1367 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1368 flock L, LOCK_EX;
1369 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1370     exit 0;
1371 }
1372
1373 unlink "$destdir.commit";
1374 open STDOUT, ">", "$destdir.log";
1375 open STDERR, ">&STDOUT";
1376
1377 mkdir $destdir;
1378 my @git_archive_data = <DATA>;
1379 if (@git_archive_data) {
1380   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1381   print TARX @git_archive_data;
1382   if(!close(TARX)) {
1383     die "'tar -C $destdir -xf -' exited $?: $!";
1384   }
1385 }
1386
1387 my $pwd;
1388 chomp ($pwd = `pwd`);
1389 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1390 mkdir $install_dir;
1391
1392 for my $src_path ("$destdir/arvados/sdk/python") {
1393   if (-d $src_path) {
1394     shell_or_die ("virtualenv", $install_dir);
1395     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1396   }
1397 }
1398
1399 if (-e "$destdir/crunch_scripts/install") {
1400     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1401 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1402     # Old version
1403     shell_or_die ("./tests/autotests.sh", $install_dir);
1404 } elsif (-e "./install.sh") {
1405     shell_or_die ("./install.sh", $install_dir);
1406 }
1407
1408 if ($commit) {
1409     unlink "$destdir.commit.new";
1410     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1411     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1412 }
1413
1414 close L;
1415
1416 exit 0;
1417
1418 sub shell_or_die
1419 {
1420   if ($ENV{"DEBUG"}) {
1421     print STDERR "@_\n";
1422   }
1423   system (@_) == 0
1424       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1425 }
1426
1427 __DATA__