Merge branch 'master' into 2060-edit-tags-in-workbench
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use Warehouse;
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
89 mkdir ($ENV{"JOB_WORK"});
90
91 my $force_unlock;
92 my $git_dir;
93 my $jobspec;
94 my $job_api_token;
95 my $resume_stash;
96 GetOptions('force-unlock' => \$force_unlock,
97            'git-dir=s' => \$git_dir,
98            'job=s' => \$jobspec,
99            'job-api-token=s' => \$job_api_token,
100            'resume-stash=s' => \$resume_stash,
101     );
102
103 if (defined $job_api_token) {
104   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
105 }
106
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
110
111
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new('apiVersion' => 'v1');
124 my $metastream;
125
126 my $User = $arv->{'users'}->{'current'}->execute;
127
128 my $Job = {};
129 my $job_id;
130 my $dbh;
131 my $sth;
132 if ($job_has_uuid)
133 {
134   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135   if (!$force_unlock) {
136     if ($Job->{'is_locked_by_uuid'}) {
137       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
138     }
139     if ($Job->{'success'} ne undef) {
140       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
141     }
142     if ($Job->{'running'}) {
143       croak("Job 'running' flag is already set");
144     }
145     if ($Job->{'started_at'}) {
146       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
147     }
148   }
149 }
150 else
151 {
152   $Job = JSON::decode_json($jobspec);
153
154   if (!$resume_stash)
155   {
156     map { croak ("No $_ specified") unless $Job->{$_} }
157     qw(script script_version script_parameters);
158   }
159
160   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161   $Job->{'started_at'} = gmtime;
162
163   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
164
165   $job_has_uuid = 1;
166 }
167 $job_id = $Job->{'uuid'};
168
169 $metastream = Warehouse::Stream->new(whc => new Warehouse);
170 $metastream->clear;
171 $metastream->name('.');
172 $metastream->write_start($job_id . '.log.txt');
173
174
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
178
179
180 Log (undef, "check slurm allocation");
181 my @slot;
182 my @node;
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
184 my @sinfo;
185 if (!$have_slurm)
186 {
187   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188   push @sinfo, "$localcpus localhost";
189 }
190 if (exists $ENV{SLURM_NODELIST})
191 {
192   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 }
194 foreach (@sinfo)
195 {
196   my ($ncpus, $slurm_nodelist) = split;
197   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
198
199   my @nodelist;
200   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
201   {
202     my $nodelist = $1;
203     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
204     {
205       my $ranges = $1;
206       foreach (split (",", $ranges))
207       {
208         my ($a, $b);
209         if (/(\d+)-(\d+)/)
210         {
211           $a = $1;
212           $b = $2;
213         }
214         else
215         {
216           $a = $_;
217           $b = $_;
218         }
219         push @nodelist, map {
220           my $n = $nodelist;
221           $n =~ s/\[[-,\d]+\]/$_/;
222           $n;
223         } ($a..$b);
224       }
225     }
226     else
227     {
228       push @nodelist, $nodelist;
229     }
230   }
231   foreach my $nodename (@nodelist)
232   {
233     Log (undef, "node $nodename - $ncpus slots");
234     my $node = { name => $nodename,
235                  ncpus => $ncpus,
236                  losing_streak => 0,
237                  hold_until => 0 };
238     foreach my $cpu (1..$ncpus)
239     {
240       push @slot, { node => $node,
241                     cpu => $cpu };
242     }
243   }
244   push @node, @nodelist;
245 }
246
247
248
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
251
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
253
254
255
256 my $jobmanager_id;
257 if ($job_has_uuid)
258 {
259   # Claim this job, and make sure nobody else does
260   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
261           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
262     croak("Error while updating / locking job");
263   }
264   $Job->update_attributes('started_at' => scalar gmtime,
265                           'running' => 1,
266                           'success' => undef,
267                           'tasks_summary' => { 'failed' => 0,
268                                                'todo' => 1,
269                                                'running' => 0,
270                                                'done' => 0 });
271 }
272
273
274 Log (undef, "start");
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
282
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
287 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
288
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
292
293
294 my @jobstep;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
299 my $squeue_checked;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
303
304
305
306 if (defined $Job->{thawedfromkey})
307 {
308   thaw ($Job->{thawedfromkey});
309 }
310 else
311 {
312   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313     'job_uuid' => $Job->{'uuid'},
314     'sequence' => 0,
315     'qsequence' => 0,
316     'parameters' => {},
317                                                           });
318   push @jobstep, { 'level' => 0,
319                    'failures' => 0,
320                    'arvados_task' => $first_task,
321                  };
322   push @jobstep_todo, 0;
323 }
324
325
326 my $build_script;
327
328
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330
331 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
332 if ($skip_install)
333 {
334   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
335   system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
336       or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
337   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python",
338                     "$ENV{CRUNCH_SRC}/sdk/python") {
339     if (-d $src_path) {
340       system ("cd $src_path && \$CRUNCH_TMP/opt/bin/python setup.py install")
341           == 0
342           or croak ("setup.py in $src_path failed: exit ".($?>>8));
343     }
344   }
345 }
346 else
347 {
348   do {
349     local $/ = undef;
350     $build_script = <DATA>;
351   };
352   Log (undef, "Install revision ".$Job->{script_version});
353   my $nodelist = join(",", @node);
354
355   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
361           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
362     exit (1);
363   }
364   while (1)
365   {
366     last if $cleanpid == waitpid (-1, WNOHANG);
367     freeze_if_want_freeze ($cleanpid);
368     select (undef, undef, undef, 0.1);
369   }
370   Log (undef, "Clean-work-dir exited $?");
371
372   # Install requested code version
373
374   my @execargs;
375   my @srunargs = ("srun",
376                   "--nodelist=$nodelist",
377                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
378
379   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
380   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
381
382   my $commit;
383   my $git_archive;
384   my $treeish = $Job->{'script_version'};
385   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
386   # Todo: let script_version specify repository instead of expecting
387   # parent process to figure it out.
388   $ENV{"CRUNCH_SRC_URL"} = $repo;
389
390   # Create/update our clone of the remote git repo
391
392   if (!-d $ENV{"CRUNCH_SRC"}) {
393     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
394         or croak ("git clone $repo failed: exit ".($?>>8));
395     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
396   }
397   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
398
399   # If this looks like a subversion r#, look for it in git-svn commit messages
400
401   if ($treeish =~ m{^\d{1,4}$}) {
402     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
403     chomp $gitlog;
404     if ($gitlog =~ /^[a-f0-9]{40}$/) {
405       $commit = $gitlog;
406       Log (undef, "Using commit $commit for script_version $treeish");
407     }
408   }
409
410   # If that didn't work, try asking git to look it up as a tree-ish.
411
412   if (!defined $commit) {
413
414     my $cooked_treeish = $treeish;
415     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
416       # Looks like a git branch name -- make sure git knows it's
417       # relative to the remote repo
418       $cooked_treeish = "origin/$treeish";
419     }
420
421     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
422     chomp $found;
423     if ($found =~ /^[0-9a-f]{40}$/s) {
424       $commit = $found;
425       if ($commit ne $treeish) {
426         # Make sure we record the real commit id in the database,
427         # frozentokey, logs, etc. -- instead of an abbreviation or a
428         # branch name which can become ambiguous or point to a
429         # different commit in the future.
430         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
431         Log (undef, "Using commit $commit for tree-ish $treeish");
432         if ($commit ne $treeish) {
433           $Job->{'script_version'} = $commit;
434           !$job_has_uuid or
435               $Job->update_attributes('script_version' => $commit) or
436               croak("Error while updating job");
437         }
438       }
439     }
440   }
441
442   if (defined $commit) {
443     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
444     @execargs = ("sh", "-c",
445                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
446     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
447   }
448   else {
449     croak ("could not figure out commit id for $treeish");
450   }
451
452   my $installpid = fork();
453   if ($installpid == 0)
454   {
455     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
456     exit (1);
457   }
458   while (1)
459   {
460     last if $installpid == waitpid (-1, WNOHANG);
461     freeze_if_want_freeze ($installpid);
462     select (undef, undef, undef, 0.1);
463   }
464   Log (undef, "Install exited $?");
465 }
466
467
468
469 foreach (qw (script script_version script_parameters runtime_constraints))
470 {
471   Log (undef,
472        "$_ " .
473        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
474 }
475 foreach (split (/\n/, $Job->{knobs}))
476 {
477   Log (undef, "knob " . $_);
478 }
479
480
481
482 $main::success = undef;
483
484
485
486 ONELEVEL:
487
488 my $thisround_succeeded = 0;
489 my $thisround_failed = 0;
490 my $thisround_failed_multiple = 0;
491
492 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
493                        or $a <=> $b } @jobstep_todo;
494 my $level = $jobstep[$jobstep_todo[0]]->{level};
495 Log (undef, "start level $level");
496
497
498
499 my %proc;
500 my @freeslot = (0..$#slot);
501 my @holdslot;
502 my %reader;
503 my $progress_is_dirty = 1;
504 my $progress_stats_updated = 0;
505
506 update_progress_stats();
507
508
509
510 THISROUND:
511 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
512 {
513   my $id = $jobstep_todo[$todo_ptr];
514   my $Jobstep = $jobstep[$id];
515   if ($Jobstep->{level} != $level)
516   {
517     next;
518   }
519
520   pipe $reader{$id}, "writer" or croak ($!);
521   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
522   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
523
524   my $childslot = $freeslot[0];
525   my $childnode = $slot[$childslot]->{node};
526   my $childslotname = join (".",
527                             $slot[$childslot]->{node}->{name},
528                             $slot[$childslot]->{cpu});
529   my $childpid = fork();
530   if ($childpid == 0)
531   {
532     $SIG{'INT'} = 'DEFAULT';
533     $SIG{'QUIT'} = 'DEFAULT';
534     $SIG{'TERM'} = 'DEFAULT';
535
536     foreach (values (%reader))
537     {
538       close($_);
539     }
540     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
541     open(STDOUT,">&writer");
542     open(STDERR,">&writer");
543
544     undef $dbh;
545     undef $sth;
546
547     delete $ENV{"GNUPGHOME"};
548     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
549     $ENV{"TASK_QSEQUENCE"} = $id;
550     $ENV{"TASK_SEQUENCE"} = $level;
551     $ENV{"JOB_SCRIPT"} = $Job->{script};
552     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
553       $param =~ tr/a-z/A-Z/;
554       $ENV{"JOB_PARAMETER_$param"} = $value;
555     }
556     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
557     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
558     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
559     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
560     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
561     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
562     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
563
564     $ENV{"GZIP"} = "-n";
565
566     my @srunargs = (
567       "srun",
568       "--nodelist=".$childnode->{name},
569       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
570       "--job-name=$job_id.$id.$$",
571         );
572     my @execargs = qw(sh);
573     my $build_script_to_send = "";
574     my $command =
575         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
576         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
577         ."&& cd $ENV{CRUNCH_TMP} ";
578     if ($build_script)
579     {
580       $build_script_to_send = $build_script;
581       $command .=
582           "&& perl -";
583     }
584     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
585     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
586     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
587     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
588     $command .=
589         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
590     my @execargs = ('bash', '-c', $command);
591     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
592     exit (111);
593   }
594   close("writer");
595   if (!defined $childpid)
596   {
597     close $reader{$id};
598     delete $reader{$id};
599     next;
600   }
601   shift @freeslot;
602   $proc{$childpid} = { jobstep => $id,
603                        time => time,
604                        slot => $childslot,
605                        jobstepname => "$job_id.$id.$childpid",
606                      };
607   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
608   $slot[$childslot]->{pid} = $childpid;
609
610   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
611   Log ($id, "child $childpid started on $childslotname");
612   $Jobstep->{starttime} = time;
613   $Jobstep->{node} = $childnode->{name};
614   $Jobstep->{slotindex} = $childslot;
615   delete $Jobstep->{stderr};
616   delete $Jobstep->{finishtime};
617
618   splice @jobstep_todo, $todo_ptr, 1;
619   --$todo_ptr;
620
621   $progress_is_dirty = 1;
622
623   while (!@freeslot
624          ||
625          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
626   {
627     last THISROUND if $main::please_freeze;
628     if ($main::please_info)
629     {
630       $main::please_info = 0;
631       freeze();
632       collate_output();
633       save_meta(1);
634       update_progress_stats();
635     }
636     my $gotsome
637         = readfrompipes ()
638         + reapchildren ();
639     if (!$gotsome)
640     {
641       check_refresh_wanted();
642       check_squeue();
643       update_progress_stats();
644       select (undef, undef, undef, 0.1);
645     }
646     elsif (time - $progress_stats_updated >= 30)
647     {
648       update_progress_stats();
649     }
650     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
651         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
652     {
653       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
654           .($thisround_failed+$thisround_succeeded)
655           .") -- giving up on this round";
656       Log (undef, $message);
657       last THISROUND;
658     }
659
660     # move slots from freeslot to holdslot (or back to freeslot) if necessary
661     for (my $i=$#freeslot; $i>=0; $i--) {
662       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
663         push @holdslot, (splice @freeslot, $i, 1);
664       }
665     }
666     for (my $i=$#holdslot; $i>=0; $i--) {
667       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
668         push @freeslot, (splice @holdslot, $i, 1);
669       }
670     }
671
672     # give up if no nodes are succeeding
673     if (!grep { $_->{node}->{losing_streak} == 0 &&
674                     $_->{node}->{hold_count} < 4 } @slot) {
675       my $message = "Every node has failed -- giving up on this round";
676       Log (undef, $message);
677       last THISROUND;
678     }
679   }
680 }
681
682
683 push @freeslot, splice @holdslot;
684 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
685
686
687 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
688 while (%proc)
689 {
690   if ($main::please_continue) {
691     $main::please_continue = 0;
692     goto THISROUND;
693   }
694   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
695   readfrompipes ();
696   if (!reapchildren())
697   {
698     check_refresh_wanted();
699     check_squeue();
700     update_progress_stats();
701     select (undef, undef, undef, 0.1);
702     killem (keys %proc) if $main::please_freeze;
703   }
704 }
705
706 update_progress_stats();
707 freeze_if_want_freeze();
708
709
710 if (!defined $main::success)
711 {
712   if (@jobstep_todo &&
713       $thisround_succeeded == 0 &&
714       ($thisround_failed == 0 || $thisround_failed > 4))
715   {
716     my $message = "stop because $thisround_failed tasks failed and none succeeded";
717     Log (undef, $message);
718     $main::success = 0;
719   }
720   if (!@jobstep_todo)
721   {
722     $main::success = 1;
723   }
724 }
725
726 goto ONELEVEL if !defined $main::success;
727
728
729 release_allocation();
730 freeze();
731 if ($job_has_uuid) {
732   $Job->update_attributes('output' => &collate_output(),
733                           'running' => 0,
734                           'success' => $Job->{'output'} && $main::success,
735                           'finished_at' => scalar gmtime)
736 }
737
738 if ($Job->{'output'})
739 {
740   eval {
741     my $manifest_text = capturex("whget", $Job->{'output'});
742     $arv->{'collections'}->{'create'}->execute('collection' => {
743       'uuid' => $Job->{'output'},
744       'manifest_text' => $manifest_text,
745     });
746   };
747   if ($@) {
748     Log (undef, "Failed to register output manifest: $@");
749   }
750 }
751
752 Log (undef, "finish");
753
754 save_meta();
755 exit 0;
756
757
758
759 sub update_progress_stats
760 {
761   $progress_stats_updated = time;
762   return if !$progress_is_dirty;
763   my ($todo, $done, $running) = (scalar @jobstep_todo,
764                                  scalar @jobstep_done,
765                                  scalar @slot - scalar @freeslot - scalar @holdslot);
766   $Job->{'tasks_summary'} ||= {};
767   $Job->{'tasks_summary'}->{'todo'} = $todo;
768   $Job->{'tasks_summary'}->{'done'} = $done;
769   $Job->{'tasks_summary'}->{'running'} = $running;
770   if ($job_has_uuid) {
771     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
772   }
773   Log (undef, "status: $done done, $running running, $todo todo");
774   $progress_is_dirty = 0;
775 }
776
777
778
779 sub reapchildren
780 {
781   my $pid = waitpid (-1, WNOHANG);
782   return 0 if $pid <= 0;
783
784   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
785                   . "."
786                   . $slot[$proc{$pid}->{slot}]->{cpu});
787   my $jobstepid = $proc{$pid}->{jobstep};
788   my $elapsed = time - $proc{$pid}->{time};
789   my $Jobstep = $jobstep[$jobstepid];
790
791   my $childstatus = $?;
792   my $exitvalue = $childstatus >> 8;
793   my $exitinfo = sprintf("exit %d signal %d%s",
794                          $exitvalue,
795                          $childstatus & 127,
796                          ($childstatus & 128 ? ' core dump' : ''));
797   $Jobstep->{'arvados_task'}->reload;
798   my $task_success = $Jobstep->{'arvados_task'}->{success};
799
800   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
801
802   if (!defined $task_success) {
803     # task did not indicate one way or the other --> fail
804     $Jobstep->{'arvados_task'}->{success} = 0;
805     $Jobstep->{'arvados_task'}->save;
806     $task_success = 0;
807   }
808
809   if (!$task_success)
810   {
811     my $temporary_fail;
812     $temporary_fail ||= $Jobstep->{node_fail};
813     $temporary_fail ||= ($exitvalue == 111);
814
815     ++$thisround_failed;
816     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
817
818     # Check for signs of a failed or misconfigured node
819     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
820         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
821       # Don't count this against jobstep failure thresholds if this
822       # node is already suspected faulty and srun exited quickly
823       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
824           $elapsed < 5) {
825         Log ($jobstepid, "blaming failure on suspect node " .
826              $slot[$proc{$pid}->{slot}]->{node}->{name});
827         $temporary_fail ||= 1;
828       }
829       ban_node_by_slot($proc{$pid}->{slot});
830     }
831
832     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
833                              ++$Jobstep->{'failures'},
834                              $temporary_fail ? 'temporary ' : 'permanent',
835                              $elapsed));
836
837     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
838       # Give up on this task, and the whole job
839       $main::success = 0;
840       $main::please_freeze = 1;
841     }
842     else {
843       # Put this task back on the todo queue
844       push @jobstep_todo, $jobstepid;
845     }
846     $Job->{'tasks_summary'}->{'failed'}++;
847   }
848   else
849   {
850     ++$thisround_succeeded;
851     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
852     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
853     push @jobstep_done, $jobstepid;
854     Log ($jobstepid, "success in $elapsed seconds");
855   }
856   $Jobstep->{exitcode} = $childstatus;
857   $Jobstep->{finishtime} = time;
858   process_stderr ($jobstepid, $task_success);
859   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
860
861   close $reader{$jobstepid};
862   delete $reader{$jobstepid};
863   delete $slot[$proc{$pid}->{slot}]->{pid};
864   push @freeslot, $proc{$pid}->{slot};
865   delete $proc{$pid};
866
867   # Load new tasks
868   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
869     'where' => {
870       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
871     },
872     'order' => 'qsequence'
873   );
874   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
875     my $jobstep = {
876       'level' => $arvados_task->{'sequence'},
877       'failures' => 0,
878       'arvados_task' => $arvados_task
879     };
880     push @jobstep, $jobstep;
881     push @jobstep_todo, $#jobstep;
882   }
883
884   $progress_is_dirty = 1;
885   1;
886 }
887
888 sub check_refresh_wanted
889 {
890   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
891   if (@stat && $stat[9] > $latest_refresh) {
892     $latest_refresh = scalar time;
893     if ($job_has_uuid) {
894       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
895       for my $attr ('cancelled_at',
896                     'cancelled_by_user_uuid',
897                     'cancelled_by_client_uuid') {
898         $Job->{$attr} = $Job2->{$attr};
899       }
900       if ($Job->{'cancelled_at'}) {
901         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
902              " by user " . $Job->{cancelled_by_user_uuid});
903         $main::success = 0;
904         $main::please_freeze = 1;
905       }
906     }
907   }
908 }
909
910 sub check_squeue
911 {
912   # return if the kill list was checked <4 seconds ago
913   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
914   {
915     return;
916   }
917   $squeue_kill_checked = time;
918
919   # use killem() on procs whose killtime is reached
920   for (keys %proc)
921   {
922     if (exists $proc{$_}->{killtime}
923         && $proc{$_}->{killtime} <= time)
924     {
925       killem ($_);
926     }
927   }
928
929   # return if the squeue was checked <60 seconds ago
930   if (defined $squeue_checked && $squeue_checked > time - 60)
931   {
932     return;
933   }
934   $squeue_checked = time;
935
936   if (!$have_slurm)
937   {
938     # here is an opportunity to check for mysterious problems with local procs
939     return;
940   }
941
942   # get a list of steps still running
943   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
944   chop @squeue;
945   if ($squeue[-1] ne "ok")
946   {
947     return;
948   }
949   pop @squeue;
950
951   # which of my jobsteps are running, according to squeue?
952   my %ok;
953   foreach (@squeue)
954   {
955     if (/^(\d+)\.(\d+) (\S+)/)
956     {
957       if ($1 eq $ENV{SLURM_JOBID})
958       {
959         $ok{$3} = 1;
960       }
961     }
962   }
963
964   # which of my active child procs (>60s old) were not mentioned by squeue?
965   foreach (keys %proc)
966   {
967     if ($proc{$_}->{time} < time - 60
968         && !exists $ok{$proc{$_}->{jobstepname}}
969         && !exists $proc{$_}->{killtime})
970     {
971       # kill this proc if it hasn't exited in 30 seconds
972       $proc{$_}->{killtime} = time + 30;
973     }
974   }
975 }
976
977
978 sub release_allocation
979 {
980   if ($have_slurm)
981   {
982     Log (undef, "release job allocation");
983     system "scancel $ENV{SLURM_JOBID}";
984   }
985 }
986
987
988 sub readfrompipes
989 {
990   my $gotsome = 0;
991   foreach my $job (keys %reader)
992   {
993     my $buf;
994     while (0 < sysread ($reader{$job}, $buf, 8192))
995     {
996       print STDERR $buf if $ENV{CRUNCH_DEBUG};
997       $jobstep[$job]->{stderr} .= $buf;
998       preprocess_stderr ($job);
999       if (length ($jobstep[$job]->{stderr}) > 16384)
1000       {
1001         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1002       }
1003       $gotsome = 1;
1004     }
1005   }
1006   return $gotsome;
1007 }
1008
1009
1010 sub preprocess_stderr
1011 {
1012   my $job = shift;
1013
1014   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1015     my $line = $1;
1016     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1017     Log ($job, "stderr $line");
1018     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1019       # whoa.
1020       $main::please_freeze = 1;
1021     }
1022     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1023       $jobstep[$job]->{node_fail} = 1;
1024       ban_node_by_slot($jobstep[$job]->{slotindex});
1025     }
1026   }
1027 }
1028
1029
1030 sub process_stderr
1031 {
1032   my $job = shift;
1033   my $task_success = shift;
1034   preprocess_stderr ($job);
1035
1036   map {
1037     Log ($job, "stderr $_");
1038   } split ("\n", $jobstep[$job]->{stderr});
1039 }
1040
1041
1042 sub collate_output
1043 {
1044   my $whc = Warehouse->new;
1045   Log (undef, "collate");
1046   $whc->write_start (1);
1047   my $joboutput;
1048   for (@jobstep)
1049   {
1050     next if (!exists $_->{'arvados_task'}->{output} ||
1051              !$_->{'arvados_task'}->{'success'} ||
1052              $_->{'exitcode'} != 0);
1053     my $output = $_->{'arvados_task'}->{output};
1054     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1055     {
1056       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1057       $whc->write_data ($output);
1058     }
1059     elsif (@jobstep == 1)
1060     {
1061       $joboutput = $output;
1062       $whc->write_finish;
1063     }
1064     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1065     {
1066       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1067       $whc->write_data ($outblock);
1068     }
1069     else
1070     {
1071       my $errstr = $whc->errstr;
1072       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1073       $main::success = 0;
1074     }
1075   }
1076   $joboutput = $whc->write_finish if !defined $joboutput;
1077   if ($joboutput)
1078   {
1079     Log (undef, "output $joboutput");
1080     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1081   }
1082   else
1083   {
1084     Log (undef, "output undef");
1085   }
1086   return $joboutput;
1087 }
1088
1089
1090 sub killem
1091 {
1092   foreach (@_)
1093   {
1094     my $sig = 2;                # SIGINT first
1095     if (exists $proc{$_}->{"sent_$sig"} &&
1096         time - $proc{$_}->{"sent_$sig"} > 4)
1097     {
1098       $sig = 15;                # SIGTERM if SIGINT doesn't work
1099     }
1100     if (exists $proc{$_}->{"sent_$sig"} &&
1101         time - $proc{$_}->{"sent_$sig"} > 4)
1102     {
1103       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1104     }
1105     if (!exists $proc{$_}->{"sent_$sig"})
1106     {
1107       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1108       kill $sig, $_;
1109       select (undef, undef, undef, 0.1);
1110       if ($sig == 2)
1111       {
1112         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1113       }
1114       $proc{$_}->{"sent_$sig"} = time;
1115       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1116     }
1117   }
1118 }
1119
1120
1121 sub fhbits
1122 {
1123   my($bits);
1124   for (@_) {
1125     vec($bits,fileno($_),1) = 1;
1126   }
1127   $bits;
1128 }
1129
1130
1131 sub Log                         # ($jobstep_id, $logmessage)
1132 {
1133   if ($_[1] =~ /\n/) {
1134     for my $line (split (/\n/, $_[1])) {
1135       Log ($_[0], $line);
1136     }
1137     return;
1138   }
1139   my $fh = select STDERR; $|=1; select $fh;
1140   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1141   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1142   $message .= "\n";
1143   my $datetime;
1144   if ($metastream || -t STDERR) {
1145     my @gmtime = gmtime;
1146     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1147                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1148   }
1149   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1150
1151   return if !$metastream;
1152   $metastream->write_data ($datetime . " " . $message);
1153 }
1154
1155
1156 sub croak
1157 {
1158   my ($package, $file, $line) = caller;
1159   my $message = "@_ at $file line $line\n";
1160   Log (undef, $message);
1161   freeze() if @jobstep_todo;
1162   collate_output() if @jobstep_todo;
1163   cleanup();
1164   save_meta() if $metastream;
1165   die;
1166 }
1167
1168
1169 sub cleanup
1170 {
1171   return if !$job_has_uuid;
1172   $Job->update_attributes('running' => 0,
1173                           'success' => 0,
1174                           'finished_at' => scalar gmtime);
1175 }
1176
1177
1178 sub save_meta
1179 {
1180   my $justcheckpoint = shift; # false if this will be the last meta saved
1181   my $m = $metastream;
1182   $m = $m->copy if $justcheckpoint;
1183   $m->write_finish;
1184   my $whc = Warehouse->new;
1185   my $loglocator = $whc->store_block ($m->as_string);
1186   $arv->{'collections'}->{'create'}->execute('collection' => {
1187     'uuid' => $loglocator,
1188     'manifest_text' => $m->as_string,
1189   });
1190   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1191   Log (undef, "log manifest is $loglocator");
1192   $Job->{'log'} = $loglocator;
1193   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1194 }
1195
1196
1197 sub freeze_if_want_freeze
1198 {
1199   if ($main::please_freeze)
1200   {
1201     release_allocation();
1202     if (@_)
1203     {
1204       # kill some srun procs before freeze+stop
1205       map { $proc{$_} = {} } @_;
1206       while (%proc)
1207       {
1208         killem (keys %proc);
1209         select (undef, undef, undef, 0.1);
1210         my $died;
1211         while (($died = waitpid (-1, WNOHANG)) > 0)
1212         {
1213           delete $proc{$died};
1214         }
1215       }
1216     }
1217     freeze();
1218     collate_output();
1219     cleanup();
1220     save_meta();
1221     exit 0;
1222   }
1223 }
1224
1225
1226 sub freeze
1227 {
1228   Log (undef, "Freeze not implemented");
1229   return;
1230 }
1231
1232
1233 sub thaw
1234 {
1235   croak ("Thaw not implemented");
1236
1237   my $whc;
1238   my $key = shift;
1239   Log (undef, "thaw from $key");
1240
1241   @jobstep = ();
1242   @jobstep_done = ();
1243   @jobstep_todo = ();
1244   @jobstep_tomerge = ();
1245   $jobstep_tomerge_level = 0;
1246   my $frozenjob = {};
1247
1248   my $stream = new Warehouse::Stream ( whc => $whc,
1249                                        hash => [split (",", $key)] );
1250   $stream->rewind;
1251   while (my $dataref = $stream->read_until (undef, "\n\n"))
1252   {
1253     if ($$dataref =~ /^job /)
1254     {
1255       foreach (split ("\n", $$dataref))
1256       {
1257         my ($k, $v) = split ("=", $_, 2);
1258         $frozenjob->{$k} = freezeunquote ($v);
1259       }
1260       next;
1261     }
1262
1263     if ($$dataref =~ /^merge (\d+) (.*)/)
1264     {
1265       $jobstep_tomerge_level = $1;
1266       @jobstep_tomerge
1267           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1268       next;
1269     }
1270
1271     my $Jobstep = { };
1272     foreach (split ("\n", $$dataref))
1273     {
1274       my ($k, $v) = split ("=", $_, 2);
1275       $Jobstep->{$k} = freezeunquote ($v) if $k;
1276     }
1277     $Jobstep->{'failures'} = 0;
1278     push @jobstep, $Jobstep;
1279
1280     if ($Jobstep->{exitcode} eq "0")
1281     {
1282       push @jobstep_done, $#jobstep;
1283     }
1284     else
1285     {
1286       push @jobstep_todo, $#jobstep;
1287     }
1288   }
1289
1290   foreach (qw (script script_version script_parameters))
1291   {
1292     $Job->{$_} = $frozenjob->{$_};
1293   }
1294   $Job->save if $job_has_uuid;
1295 }
1296
1297
1298 sub freezequote
1299 {
1300   my $s = shift;
1301   $s =~ s/\\/\\\\/g;
1302   $s =~ s/\n/\\n/g;
1303   return $s;
1304 }
1305
1306
1307 sub freezeunquote
1308 {
1309   my $s = shift;
1310   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1311   return $s;
1312 }
1313
1314
1315 sub srun
1316 {
1317   my $srunargs = shift;
1318   my $execargs = shift;
1319   my $opts = shift || {};
1320   my $stdin = shift;
1321   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1322   print STDERR (join (" ",
1323                       map { / / ? "'$_'" : $_ }
1324                       (@$args)),
1325                 "\n")
1326       if $ENV{CRUNCH_DEBUG};
1327
1328   if (defined $stdin) {
1329     my $child = open STDIN, "-|";
1330     defined $child or die "no fork: $!";
1331     if ($child == 0) {
1332       print $stdin or die $!;
1333       close STDOUT or die $!;
1334       exit 0;
1335     }
1336   }
1337
1338   return system (@$args) if $opts->{fork};
1339
1340   exec @$args;
1341   warn "ENV size is ".length(join(" ",%ENV));
1342   die "exec failed: $!: @$args";
1343 }
1344
1345
1346 sub ban_node_by_slot {
1347   # Don't start any new jobsteps on this node for 60 seconds
1348   my $slotid = shift;
1349   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1350   $slot[$slotid]->{node}->{hold_count}++;
1351   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1352 }
1353
1354 __DATA__
1355 #!/usr/bin/perl
1356
1357 # checkout-and-build
1358
1359 use Fcntl ':flock';
1360
1361 my $destdir = $ENV{"CRUNCH_SRC"};
1362 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1363 my $repo = $ENV{"CRUNCH_SRC_URL"};
1364
1365 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1366 flock L, LOCK_EX;
1367 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1368     exit 0;
1369 }
1370
1371 unlink "$destdir.commit";
1372 open STDOUT, ">", "$destdir.log";
1373 open STDERR, ">&STDOUT";
1374
1375 mkdir $destdir;
1376 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1377 print TARX <DATA>;
1378 if(!close(TARX)) {
1379   die "'tar -C $destdir -xf -' exited $?: $!";
1380 }
1381
1382 my $pwd;
1383 chomp ($pwd = `pwd`);
1384 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1385 mkdir $install_dir;
1386
1387 shell_or_die ("virtualenv", $install_dir);
1388 for my $src_path ("$destdir/arvados/sdk/python", "$destdir/sdk/python") {
1389   if (-d $src_path) {
1390     shell_or_die ("cd $src_path && $install_dir/bin/python setup.py install");
1391   }
1392 }
1393
1394 if (-e "$destdir/crunch_scripts/install") {
1395     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1396 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1397     # Old version
1398     shell_or_die ("./tests/autotests.sh", $install_dir);
1399 } elsif (-e "./install.sh") {
1400     shell_or_die ("./install.sh", $install_dir);
1401 }
1402
1403 if ($commit) {
1404     unlink "$destdir.commit.new";
1405     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1406     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1407 }
1408
1409 close L;
1410
1411 exit 0;
1412
1413 sub shell_or_die
1414 {
1415   if ($ENV{"DEBUG"}) {
1416     print STDERR "@_\n";
1417   }
1418   system (@_) == 0
1419       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1420 }
1421
1422 __DATA__