Add comments to explain different ways we choose $repo.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
180
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
184
185
186 Log (undef, "check slurm allocation");
187 my @slot;
188 my @node;
189 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
190 my @sinfo;
191 if (!$have_slurm)
192 {
193   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194   push @sinfo, "$localcpus localhost";
195 }
196 if (exists $ENV{SLURM_NODELIST})
197 {
198   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
199 }
200 foreach (@sinfo)
201 {
202   my ($ncpus, $slurm_nodelist) = split;
203   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
204
205   my @nodelist;
206   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
207   {
208     my $nodelist = $1;
209     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
210     {
211       my $ranges = $1;
212       foreach (split (",", $ranges))
213       {
214         my ($a, $b);
215         if (/(\d+)-(\d+)/)
216         {
217           $a = $1;
218           $b = $2;
219         }
220         else
221         {
222           $a = $_;
223           $b = $_;
224         }
225         push @nodelist, map {
226           my $n = $nodelist;
227           $n =~ s/\[[-,\d]+\]/$_/;
228           $n;
229         } ($a..$b);
230       }
231     }
232     else
233     {
234       push @nodelist, $nodelist;
235     }
236   }
237   foreach my $nodename (@nodelist)
238   {
239     Log (undef, "node $nodename - $ncpus slots");
240     my $node = { name => $nodename,
241                  ncpus => $ncpus,
242                  losing_streak => 0,
243                  hold_until => 0 };
244     foreach my $cpu (1..$ncpus)
245     {
246       push @slot, { node => $node,
247                     cpu => $cpu };
248     }
249   }
250   push @node, @nodelist;
251 }
252
253
254
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
257
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259
260
261
262 my $jobmanager_id;
263 if ($job_has_uuid)
264 {
265   # Claim this job, and make sure nobody else does
266   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
267           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
268     croak("Error while updating / locking job");
269   }
270   $Job->update_attributes('started_at' => scalar gmtime,
271                           'running' => 1,
272                           'success' => undef,
273                           'tasks_summary' => { 'failed' => 0,
274                                                'todo' => 1,
275                                                'running' => 0,
276                                                'done' => 0 });
277 }
278
279
280 Log (undef, "start");
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
288
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
294
295 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
298
299
300 my @jobstep;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
305 my $squeue_checked;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
309
310
311
312 if (defined $Job->{thawedfromkey})
313 {
314   thaw ($Job->{thawedfromkey});
315 }
316 else
317 {
318   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319     'job_uuid' => $Job->{'uuid'},
320     'sequence' => 0,
321     'qsequence' => 0,
322     'parameters' => {},
323                                                           });
324   push @jobstep, { 'level' => 0,
325                    'failures' => 0,
326                    'arvados_task' => $first_task,
327                  };
328   push @jobstep_todo, 0;
329 }
330
331
332 if (!$have_slurm)
333 {
334   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 }
336
337
338 my $build_script;
339
340
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
342
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
344 if ($skip_install)
345 {
346   if (!defined $no_clear_tmp) {
347     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348     system($clear_tmp_cmd) == 0
349         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
350   }
351   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
353     if (-d $src_path) {
354       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
357           == 0
358           or croak ("setup.py in $src_path failed: exit ".($?>>8));
359     }
360   }
361 }
362 else
363 {
364   do {
365     local $/ = undef;
366     $build_script = <DATA>;
367   };
368   Log (undef, "Install revision ".$Job->{script_version});
369   my $nodelist = join(",", @node);
370
371   if (!defined $no_clear_tmp) {
372     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
373
374     my $cleanpid = fork();
375     if ($cleanpid == 0)
376     {
377       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
379       exit (1);
380     }
381     while (1)
382     {
383       last if $cleanpid == waitpid (-1, WNOHANG);
384       freeze_if_want_freeze ($cleanpid);
385       select (undef, undef, undef, 0.1);
386     }
387     Log (undef, "Clean-work-dir exited $?");
388   }
389
390   # Install requested code version
391
392   my @execargs;
393   my @srunargs = ("srun",
394                   "--nodelist=$nodelist",
395                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
396
397   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
399
400   my $commit;
401   my $git_archive;
402   my $treeish = $Job->{'script_version'};
403
404   # If we're running under crunch-dispatch, it will have pulled the
405   # appropriate source tree into its own repository, and given us that
406   # repo's path as $git_dir. If we're running a "local" job, and a
407   # script_version was specified, it's up to the user to provide the
408   # full path to a local repository in Job->{repository}.
409   #
410   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
411   # git-archive --remote where appropriate.
412   #
413   # TODO: Accept a locally-hosted Arvados repository by name or
414   # UUID. Use arvados.v1.repositories.list or .get to figure out the
415   # appropriate fetch-url.
416   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
417
418   $ENV{"CRUNCH_SRC_URL"} = $repo;
419
420   if (-d "$repo/.git") {
421     # We were given a working directory, but we are only interested in
422     # the index.
423     $repo = "$repo/.git";
424   }
425
426   # If this looks like a subversion r#, look for it in git-svn commit messages
427
428   if ($treeish =~ m{^\d{1,4}$}) {
429     my $gitlog = `git --git-dir="$repo" log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " master`;
430     chomp $gitlog;
431     if ($gitlog =~ /^[a-f0-9]{40}$/) {
432       $commit = $gitlog;
433       Log (undef, "Using commit $commit for script_version $treeish");
434     }
435   }
436
437   # If that didn't work, try asking git to look it up as a tree-ish.
438
439   if (!defined $commit) {
440     my $found = `git --git-dir="$repo" rev-list -1 "$treeish"`;
441     chomp $found;
442     if ($found =~ /^[0-9a-f]{40}$/s) {
443       $commit = $found;
444       if ($commit ne $treeish) {
445         # Make sure we record the real commit id in the database,
446         # frozentokey, logs, etc. -- instead of an abbreviation or a
447         # branch name which can become ambiguous or point to a
448         # different commit in the future.
449         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
450         Log (undef, "Using commit $commit for tree-ish $treeish");
451         if ($commit ne $treeish) {
452           $Job->{'script_version'} = $commit;
453           !$job_has_uuid or
454               $Job->update_attributes('script_version' => $commit) or
455               croak("Error while updating job");
456         }
457       }
458     }
459   }
460
461   if (defined $commit) {
462     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
463     @execargs = ("sh", "-c",
464                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
465     $git_archive = `git --git-dir="$repo" archive "$commit"`;
466   }
467   else {
468     croak ("could not figure out commit id for $treeish");
469   }
470
471   my $installpid = fork();
472   if ($installpid == 0)
473   {
474     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
475     exit (1);
476   }
477   while (1)
478   {
479     last if $installpid == waitpid (-1, WNOHANG);
480     freeze_if_want_freeze ($installpid);
481     select (undef, undef, undef, 0.1);
482   }
483   Log (undef, "Install exited $?");
484 }
485
486 if (!$have_slurm)
487 {
488   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
489   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
490 }
491
492
493
494 foreach (qw (script script_version script_parameters runtime_constraints))
495 {
496   Log (undef,
497        "$_ " .
498        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
499 }
500 foreach (split (/\n/, $Job->{knobs}))
501 {
502   Log (undef, "knob " . $_);
503 }
504
505
506
507 $main::success = undef;
508
509
510
511 ONELEVEL:
512
513 my $thisround_succeeded = 0;
514 my $thisround_failed = 0;
515 my $thisround_failed_multiple = 0;
516
517 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
518                        or $a <=> $b } @jobstep_todo;
519 my $level = $jobstep[$jobstep_todo[0]]->{level};
520 Log (undef, "start level $level");
521
522
523
524 my %proc;
525 my @freeslot = (0..$#slot);
526 my @holdslot;
527 my %reader;
528 my $progress_is_dirty = 1;
529 my $progress_stats_updated = 0;
530
531 update_progress_stats();
532
533
534
535 THISROUND:
536 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
537 {
538   my $id = $jobstep_todo[$todo_ptr];
539   my $Jobstep = $jobstep[$id];
540   if ($Jobstep->{level} != $level)
541   {
542     next;
543   }
544
545   pipe $reader{$id}, "writer" or croak ($!);
546   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
547   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
548
549   my $childslot = $freeslot[0];
550   my $childnode = $slot[$childslot]->{node};
551   my $childslotname = join (".",
552                             $slot[$childslot]->{node}->{name},
553                             $slot[$childslot]->{cpu});
554   my $childpid = fork();
555   if ($childpid == 0)
556   {
557     $SIG{'INT'} = 'DEFAULT';
558     $SIG{'QUIT'} = 'DEFAULT';
559     $SIG{'TERM'} = 'DEFAULT';
560
561     foreach (values (%reader))
562     {
563       close($_);
564     }
565     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
566     open(STDOUT,">&writer");
567     open(STDERR,">&writer");
568
569     undef $dbh;
570     undef $sth;
571
572     delete $ENV{"GNUPGHOME"};
573     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
574     $ENV{"TASK_QSEQUENCE"} = $id;
575     $ENV{"TASK_SEQUENCE"} = $level;
576     $ENV{"JOB_SCRIPT"} = $Job->{script};
577     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
578       $param =~ tr/a-z/A-Z/;
579       $ENV{"JOB_PARAMETER_$param"} = $value;
580     }
581     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
582     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
583     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
584     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
585     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
586     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
587     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
588
589     $ENV{"GZIP"} = "-n";
590
591     my @srunargs = (
592       "srun",
593       "--nodelist=".$childnode->{name},
594       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
595       "--job-name=$job_id.$id.$$",
596         );
597     my @execargs = qw(sh);
598     my $build_script_to_send = "";
599     my $command =
600         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
601         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
602         ."&& cd $ENV{CRUNCH_TMP} ";
603     if ($build_script)
604     {
605       $build_script_to_send = $build_script;
606       $command .=
607           "&& perl -";
608     }
609     $command .=
610         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
611     my @execargs = ('bash', '-c', $command);
612     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
613     exit (111);
614   }
615   close("writer");
616   if (!defined $childpid)
617   {
618     close $reader{$id};
619     delete $reader{$id};
620     next;
621   }
622   shift @freeslot;
623   $proc{$childpid} = { jobstep => $id,
624                        time => time,
625                        slot => $childslot,
626                        jobstepname => "$job_id.$id.$childpid",
627                      };
628   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
629   $slot[$childslot]->{pid} = $childpid;
630
631   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
632   Log ($id, "child $childpid started on $childslotname");
633   $Jobstep->{starttime} = time;
634   $Jobstep->{node} = $childnode->{name};
635   $Jobstep->{slotindex} = $childslot;
636   delete $Jobstep->{stderr};
637   delete $Jobstep->{finishtime};
638
639   splice @jobstep_todo, $todo_ptr, 1;
640   --$todo_ptr;
641
642   $progress_is_dirty = 1;
643
644   while (!@freeslot
645          ||
646          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
647   {
648     last THISROUND if $main::please_freeze;
649     if ($main::please_info)
650     {
651       $main::please_info = 0;
652       freeze();
653       collate_output();
654       save_meta(1);
655       update_progress_stats();
656     }
657     my $gotsome
658         = readfrompipes ()
659         + reapchildren ();
660     if (!$gotsome)
661     {
662       check_refresh_wanted();
663       check_squeue();
664       update_progress_stats();
665       select (undef, undef, undef, 0.1);
666     }
667     elsif (time - $progress_stats_updated >= 30)
668     {
669       update_progress_stats();
670     }
671     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
672         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
673     {
674       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
675           .($thisround_failed+$thisround_succeeded)
676           .") -- giving up on this round";
677       Log (undef, $message);
678       last THISROUND;
679     }
680
681     # move slots from freeslot to holdslot (or back to freeslot) if necessary
682     for (my $i=$#freeslot; $i>=0; $i--) {
683       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
684         push @holdslot, (splice @freeslot, $i, 1);
685       }
686     }
687     for (my $i=$#holdslot; $i>=0; $i--) {
688       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
689         push @freeslot, (splice @holdslot, $i, 1);
690       }
691     }
692
693     # give up if no nodes are succeeding
694     if (!grep { $_->{node}->{losing_streak} == 0 &&
695                     $_->{node}->{hold_count} < 4 } @slot) {
696       my $message = "Every node has failed -- giving up on this round";
697       Log (undef, $message);
698       last THISROUND;
699     }
700   }
701 }
702
703
704 push @freeslot, splice @holdslot;
705 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
706
707
708 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
709 while (%proc)
710 {
711   if ($main::please_continue) {
712     $main::please_continue = 0;
713     goto THISROUND;
714   }
715   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
716   readfrompipes ();
717   if (!reapchildren())
718   {
719     check_refresh_wanted();
720     check_squeue();
721     update_progress_stats();
722     select (undef, undef, undef, 0.1);
723     killem (keys %proc) if $main::please_freeze;
724   }
725 }
726
727 update_progress_stats();
728 freeze_if_want_freeze();
729
730
731 if (!defined $main::success)
732 {
733   if (@jobstep_todo &&
734       $thisround_succeeded == 0 &&
735       ($thisround_failed == 0 || $thisround_failed > 4))
736   {
737     my $message = "stop because $thisround_failed tasks failed and none succeeded";
738     Log (undef, $message);
739     $main::success = 0;
740   }
741   if (!@jobstep_todo)
742   {
743     $main::success = 1;
744   }
745 }
746
747 goto ONELEVEL if !defined $main::success;
748
749
750 release_allocation();
751 freeze();
752 if ($job_has_uuid) {
753   $Job->update_attributes('output' => &collate_output(),
754                           'running' => 0,
755                           'success' => $Job->{'output'} && $main::success,
756                           'finished_at' => scalar gmtime)
757 }
758
759 if ($Job->{'output'})
760 {
761   eval {
762     my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
763     $arv->{'collections'}->{'create'}->execute('collection' => {
764       'uuid' => $Job->{'output'},
765       'manifest_text' => $manifest_text,
766     });
767     if ($Job->{'output_is_persistent'}) {
768       $arv->{'links'}->{'create'}->execute('link' => {
769         'tail_kind' => 'arvados#user',
770         'tail_uuid' => $User->{'uuid'},
771         'head_kind' => 'arvados#collection',
772         'head_uuid' => $Job->{'output'},
773         'link_class' => 'resources',
774         'name' => 'wants',
775       });
776     }
777   };
778   if ($@) {
779     Log (undef, "Failed to register output manifest: $@");
780   }
781 }
782
783 Log (undef, "finish");
784
785 save_meta();
786 exit 0;
787
788
789
790 sub update_progress_stats
791 {
792   $progress_stats_updated = time;
793   return if !$progress_is_dirty;
794   my ($todo, $done, $running) = (scalar @jobstep_todo,
795                                  scalar @jobstep_done,
796                                  scalar @slot - scalar @freeslot - scalar @holdslot);
797   $Job->{'tasks_summary'} ||= {};
798   $Job->{'tasks_summary'}->{'todo'} = $todo;
799   $Job->{'tasks_summary'}->{'done'} = $done;
800   $Job->{'tasks_summary'}->{'running'} = $running;
801   if ($job_has_uuid) {
802     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
803   }
804   Log (undef, "status: $done done, $running running, $todo todo");
805   $progress_is_dirty = 0;
806 }
807
808
809
810 sub reapchildren
811 {
812   my $pid = waitpid (-1, WNOHANG);
813   return 0 if $pid <= 0;
814
815   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
816                   . "."
817                   . $slot[$proc{$pid}->{slot}]->{cpu});
818   my $jobstepid = $proc{$pid}->{jobstep};
819   my $elapsed = time - $proc{$pid}->{time};
820   my $Jobstep = $jobstep[$jobstepid];
821
822   my $childstatus = $?;
823   my $exitvalue = $childstatus >> 8;
824   my $exitinfo = sprintf("exit %d signal %d%s",
825                          $exitvalue,
826                          $childstatus & 127,
827                          ($childstatus & 128 ? ' core dump' : ''));
828   $Jobstep->{'arvados_task'}->reload;
829   my $task_success = $Jobstep->{'arvados_task'}->{success};
830
831   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
832
833   if (!defined $task_success) {
834     # task did not indicate one way or the other --> fail
835     $Jobstep->{'arvados_task'}->{success} = 0;
836     $Jobstep->{'arvados_task'}->save;
837     $task_success = 0;
838   }
839
840   if (!$task_success)
841   {
842     my $temporary_fail;
843     $temporary_fail ||= $Jobstep->{node_fail};
844     $temporary_fail ||= ($exitvalue == 111);
845
846     ++$thisround_failed;
847     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
848
849     # Check for signs of a failed or misconfigured node
850     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
851         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
852       # Don't count this against jobstep failure thresholds if this
853       # node is already suspected faulty and srun exited quickly
854       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
855           $elapsed < 5) {
856         Log ($jobstepid, "blaming failure on suspect node " .
857              $slot[$proc{$pid}->{slot}]->{node}->{name});
858         $temporary_fail ||= 1;
859       }
860       ban_node_by_slot($proc{$pid}->{slot});
861     }
862
863     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
864                              ++$Jobstep->{'failures'},
865                              $temporary_fail ? 'temporary ' : 'permanent',
866                              $elapsed));
867
868     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
869       # Give up on this task, and the whole job
870       $main::success = 0;
871       $main::please_freeze = 1;
872     }
873     else {
874       # Put this task back on the todo queue
875       push @jobstep_todo, $jobstepid;
876     }
877     $Job->{'tasks_summary'}->{'failed'}++;
878   }
879   else
880   {
881     ++$thisround_succeeded;
882     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
883     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
884     push @jobstep_done, $jobstepid;
885     Log ($jobstepid, "success in $elapsed seconds");
886   }
887   $Jobstep->{exitcode} = $childstatus;
888   $Jobstep->{finishtime} = time;
889   process_stderr ($jobstepid, $task_success);
890   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
891
892   close $reader{$jobstepid};
893   delete $reader{$jobstepid};
894   delete $slot[$proc{$pid}->{slot}]->{pid};
895   push @freeslot, $proc{$pid}->{slot};
896   delete $proc{$pid};
897
898   # Load new tasks
899   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
900     'where' => {
901       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
902     },
903     'order' => 'qsequence'
904   );
905   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
906     my $jobstep = {
907       'level' => $arvados_task->{'sequence'},
908       'failures' => 0,
909       'arvados_task' => $arvados_task
910     };
911     push @jobstep, $jobstep;
912     push @jobstep_todo, $#jobstep;
913   }
914
915   $progress_is_dirty = 1;
916   1;
917 }
918
919 sub check_refresh_wanted
920 {
921   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
922   if (@stat && $stat[9] > $latest_refresh) {
923     $latest_refresh = scalar time;
924     if ($job_has_uuid) {
925       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
926       for my $attr ('cancelled_at',
927                     'cancelled_by_user_uuid',
928                     'cancelled_by_client_uuid') {
929         $Job->{$attr} = $Job2->{$attr};
930       }
931       if ($Job->{'cancelled_at'}) {
932         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
933              " by user " . $Job->{cancelled_by_user_uuid});
934         $main::success = 0;
935         $main::please_freeze = 1;
936       }
937     }
938   }
939 }
940
941 sub check_squeue
942 {
943   # return if the kill list was checked <4 seconds ago
944   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
945   {
946     return;
947   }
948   $squeue_kill_checked = time;
949
950   # use killem() on procs whose killtime is reached
951   for (keys %proc)
952   {
953     if (exists $proc{$_}->{killtime}
954         && $proc{$_}->{killtime} <= time)
955     {
956       killem ($_);
957     }
958   }
959
960   # return if the squeue was checked <60 seconds ago
961   if (defined $squeue_checked && $squeue_checked > time - 60)
962   {
963     return;
964   }
965   $squeue_checked = time;
966
967   if (!$have_slurm)
968   {
969     # here is an opportunity to check for mysterious problems with local procs
970     return;
971   }
972
973   # get a list of steps still running
974   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
975   chop @squeue;
976   if ($squeue[-1] ne "ok")
977   {
978     return;
979   }
980   pop @squeue;
981
982   # which of my jobsteps are running, according to squeue?
983   my %ok;
984   foreach (@squeue)
985   {
986     if (/^(\d+)\.(\d+) (\S+)/)
987     {
988       if ($1 eq $ENV{SLURM_JOBID})
989       {
990         $ok{$3} = 1;
991       }
992     }
993   }
994
995   # which of my active child procs (>60s old) were not mentioned by squeue?
996   foreach (keys %proc)
997   {
998     if ($proc{$_}->{time} < time - 60
999         && !exists $ok{$proc{$_}->{jobstepname}}
1000         && !exists $proc{$_}->{killtime})
1001     {
1002       # kill this proc if it hasn't exited in 30 seconds
1003       $proc{$_}->{killtime} = time + 30;
1004     }
1005   }
1006 }
1007
1008
1009 sub release_allocation
1010 {
1011   if ($have_slurm)
1012   {
1013     Log (undef, "release job allocation");
1014     system "scancel $ENV{SLURM_JOBID}";
1015   }
1016 }
1017
1018
1019 sub readfrompipes
1020 {
1021   my $gotsome = 0;
1022   foreach my $job (keys %reader)
1023   {
1024     my $buf;
1025     while (0 < sysread ($reader{$job}, $buf, 8192))
1026     {
1027       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1028       $jobstep[$job]->{stderr} .= $buf;
1029       preprocess_stderr ($job);
1030       if (length ($jobstep[$job]->{stderr}) > 16384)
1031       {
1032         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1033       }
1034       $gotsome = 1;
1035     }
1036   }
1037   return $gotsome;
1038 }
1039
1040
1041 sub preprocess_stderr
1042 {
1043   my $job = shift;
1044
1045   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1046     my $line = $1;
1047     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1048     Log ($job, "stderr $line");
1049     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1050       # whoa.
1051       $main::please_freeze = 1;
1052     }
1053     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1054       $jobstep[$job]->{node_fail} = 1;
1055       ban_node_by_slot($jobstep[$job]->{slotindex});
1056     }
1057   }
1058 }
1059
1060
1061 sub process_stderr
1062 {
1063   my $job = shift;
1064   my $task_success = shift;
1065   preprocess_stderr ($job);
1066
1067   map {
1068     Log ($job, "stderr $_");
1069   } split ("\n", $jobstep[$job]->{stderr});
1070 }
1071
1072 sub fetch_block
1073 {
1074   my $hash = shift;
1075   my ($keep, $child_out, $output_block);
1076
1077   my $cmd = "arv keep get \Q$hash\E";
1078   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1079   sysread($keep, $output_block, 64 * 1024 * 1024);
1080   close $keep;
1081   return $output_block;
1082 }
1083
1084 sub collate_output
1085 {
1086   Log (undef, "collate");
1087
1088   my ($child_out, $child_in);
1089   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1090   my $joboutput;
1091   for (@jobstep)
1092   {
1093     next if (!exists $_->{'arvados_task'}->{output} ||
1094              !$_->{'arvados_task'}->{'success'} ||
1095              $_->{'exitcode'} != 0);
1096     my $output = $_->{'arvados_task'}->{output};
1097     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1098     {
1099       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1100       print $child_in $output;
1101     }
1102     elsif (@jobstep == 1)
1103     {
1104       $joboutput = $output;
1105       last;
1106     }
1107     elsif (defined (my $outblock = fetch_block ($output)))
1108     {
1109       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1110       print $child_in $outblock;
1111     }
1112     else
1113     {
1114       Log (undef, "XXX fetch_block($output) failed XXX");
1115       $main::success = 0;
1116     }
1117   }
1118   $child_in->close;
1119
1120   if (!defined $joboutput) {
1121     my $s = IO::Select->new($child_out);
1122     if ($s->can_read(120)) {
1123       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1124       chomp($joboutput);
1125     } else {
1126       Log (undef, "timed out reading from 'arv keep put'");
1127     }
1128   }
1129   waitpid($pid, 0);
1130
1131   if ($joboutput)
1132   {
1133     Log (undef, "output $joboutput");
1134     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1135   }
1136   else
1137   {
1138     Log (undef, "output undef");
1139   }
1140   return $joboutput;
1141 }
1142
1143
1144 sub killem
1145 {
1146   foreach (@_)
1147   {
1148     my $sig = 2;                # SIGINT first
1149     if (exists $proc{$_}->{"sent_$sig"} &&
1150         time - $proc{$_}->{"sent_$sig"} > 4)
1151     {
1152       $sig = 15;                # SIGTERM if SIGINT doesn't work
1153     }
1154     if (exists $proc{$_}->{"sent_$sig"} &&
1155         time - $proc{$_}->{"sent_$sig"} > 4)
1156     {
1157       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1158     }
1159     if (!exists $proc{$_}->{"sent_$sig"})
1160     {
1161       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1162       kill $sig, $_;
1163       select (undef, undef, undef, 0.1);
1164       if ($sig == 2)
1165       {
1166         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1167       }
1168       $proc{$_}->{"sent_$sig"} = time;
1169       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1170     }
1171   }
1172 }
1173
1174
1175 sub fhbits
1176 {
1177   my($bits);
1178   for (@_) {
1179     vec($bits,fileno($_),1) = 1;
1180   }
1181   $bits;
1182 }
1183
1184
1185 sub Log                         # ($jobstep_id, $logmessage)
1186 {
1187   if ($_[1] =~ /\n/) {
1188     for my $line (split (/\n/, $_[1])) {
1189       Log ($_[0], $line);
1190     }
1191     return;
1192   }
1193   my $fh = select STDERR; $|=1; select $fh;
1194   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1195   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1196   $message .= "\n";
1197   my $datetime;
1198   if ($metastream || -t STDERR) {
1199     my @gmtime = gmtime;
1200     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1201                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1202   }
1203   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1204
1205   if ($metastream) {
1206     print $metastream $datetime . " " . $message;
1207   }
1208 }
1209
1210
1211 sub croak
1212 {
1213   my ($package, $file, $line) = caller;
1214   my $message = "@_ at $file line $line\n";
1215   Log (undef, $message);
1216   freeze() if @jobstep_todo;
1217   collate_output() if @jobstep_todo;
1218   cleanup();
1219   save_meta() if $metastream;
1220   die;
1221 }
1222
1223
1224 sub cleanup
1225 {
1226   return if !$job_has_uuid;
1227   $Job->update_attributes('running' => 0,
1228                           'success' => 0,
1229                           'finished_at' => scalar gmtime);
1230 }
1231
1232
1233 sub save_meta
1234 {
1235   my $justcheckpoint = shift; # false if this will be the last meta saved
1236   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1237
1238   $local_logfile->flush;
1239   my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1240       . quotemeta($local_logfile->filename);
1241   my $loglocator = `$cmd`;
1242   die "system $cmd failed: $?" if $?;
1243
1244   $local_logfile = undef;   # the temp file is automatically deleted
1245   Log (undef, "log manifest is $loglocator");
1246   $Job->{'log'} = $loglocator;
1247   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1248 }
1249
1250
1251 sub freeze_if_want_freeze
1252 {
1253   if ($main::please_freeze)
1254   {
1255     release_allocation();
1256     if (@_)
1257     {
1258       # kill some srun procs before freeze+stop
1259       map { $proc{$_} = {} } @_;
1260       while (%proc)
1261       {
1262         killem (keys %proc);
1263         select (undef, undef, undef, 0.1);
1264         my $died;
1265         while (($died = waitpid (-1, WNOHANG)) > 0)
1266         {
1267           delete $proc{$died};
1268         }
1269       }
1270     }
1271     freeze();
1272     collate_output();
1273     cleanup();
1274     save_meta();
1275     exit 0;
1276   }
1277 }
1278
1279
1280 sub freeze
1281 {
1282   Log (undef, "Freeze not implemented");
1283   return;
1284 }
1285
1286
1287 sub thaw
1288 {
1289   croak ("Thaw not implemented");
1290 }
1291
1292
1293 sub freezequote
1294 {
1295   my $s = shift;
1296   $s =~ s/\\/\\\\/g;
1297   $s =~ s/\n/\\n/g;
1298   return $s;
1299 }
1300
1301
1302 sub freezeunquote
1303 {
1304   my $s = shift;
1305   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1306   return $s;
1307 }
1308
1309
1310 sub srun
1311 {
1312   my $srunargs = shift;
1313   my $execargs = shift;
1314   my $opts = shift || {};
1315   my $stdin = shift;
1316   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1317   print STDERR (join (" ",
1318                       map { / / ? "'$_'" : $_ }
1319                       (@$args)),
1320                 "\n")
1321       if $ENV{CRUNCH_DEBUG};
1322
1323   if (defined $stdin) {
1324     my $child = open STDIN, "-|";
1325     defined $child or die "no fork: $!";
1326     if ($child == 0) {
1327       print $stdin or die $!;
1328       close STDOUT or die $!;
1329       exit 0;
1330     }
1331   }
1332
1333   return system (@$args) if $opts->{fork};
1334
1335   exec @$args;
1336   warn "ENV size is ".length(join(" ",%ENV));
1337   die "exec failed: $!: @$args";
1338 }
1339
1340
1341 sub ban_node_by_slot {
1342   # Don't start any new jobsteps on this node for 60 seconds
1343   my $slotid = shift;
1344   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1345   $slot[$slotid]->{node}->{hold_count}++;
1346   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1347 }
1348
1349 sub must_lock_now
1350 {
1351   my ($lockfile, $error_message) = @_;
1352   open L, ">", $lockfile or croak("$lockfile: $!");
1353   if (!flock L, LOCK_EX|LOCK_NB) {
1354     croak("Can't lock $lockfile: $error_message\n");
1355   }
1356 }
1357
1358 __DATA__
1359 #!/usr/bin/perl
1360
1361 # checkout-and-build
1362
1363 use Fcntl ':flock';
1364
1365 my $destdir = $ENV{"CRUNCH_SRC"};
1366 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1367 my $repo = $ENV{"CRUNCH_SRC_URL"};
1368
1369 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1370 flock L, LOCK_EX;
1371 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1372     exit 0;
1373 }
1374
1375 unlink "$destdir.commit";
1376 open STDOUT, ">", "$destdir.log";
1377 open STDERR, ">&STDOUT";
1378
1379 mkdir $destdir;
1380 my @git_archive_data = <DATA>;
1381 if (@git_archive_data) {
1382   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1383   print TARX @git_archive_data;
1384   if(!close(TARX)) {
1385     die "'tar -C $destdir -xf -' exited $?: $!";
1386   }
1387 }
1388
1389 my $pwd;
1390 chomp ($pwd = `pwd`);
1391 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1392 mkdir $install_dir;
1393
1394 for my $src_path ("$destdir/arvados/sdk/python") {
1395   if (-d $src_path) {
1396     shell_or_die ("virtualenv", $install_dir);
1397     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1398   }
1399 }
1400
1401 if (-e "$destdir/crunch_scripts/install") {
1402     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1403 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1404     # Old version
1405     shell_or_die ("./tests/autotests.sh", $install_dir);
1406 } elsif (-e "./install.sh") {
1407     shell_or_die ("./install.sh", $install_dir);
1408 }
1409
1410 if ($commit) {
1411     unlink "$destdir.commit.new";
1412     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1413     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1414 }
1415
1416 close L;
1417
1418 exit 0;
1419
1420 sub shell_or_die
1421 {
1422   if ($ENV{"DEBUG"}) {
1423     print STDERR "@_\n";
1424   }
1425   system (@_) == 0
1426       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1427 }
1428
1429 __DATA__