Merge branch 'master' of git.clinicalfuture.com:arvados
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use Warehouse;
81 use Warehouse::Stream;
82 use IPC::System::Simple qw(capturex);
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 $metastream = Warehouse::Stream->new(whc => new Warehouse);
179 $metastream->clear;
180 $metastream->name('.');
181 $metastream->write_start($job_id . '.log.txt');
182
183
184 $Job->{'runtime_constraints'} ||= {};
185 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
186 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
187
188
189 Log (undef, "check slurm allocation");
190 my @slot;
191 my @node;
192 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
193 my @sinfo;
194 if (!$have_slurm)
195 {
196   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
197   push @sinfo, "$localcpus localhost";
198 }
199 if (exists $ENV{SLURM_NODELIST})
200 {
201   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
202 }
203 foreach (@sinfo)
204 {
205   my ($ncpus, $slurm_nodelist) = split;
206   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
207
208   my @nodelist;
209   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
210   {
211     my $nodelist = $1;
212     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
213     {
214       my $ranges = $1;
215       foreach (split (",", $ranges))
216       {
217         my ($a, $b);
218         if (/(\d+)-(\d+)/)
219         {
220           $a = $1;
221           $b = $2;
222         }
223         else
224         {
225           $a = $_;
226           $b = $_;
227         }
228         push @nodelist, map {
229           my $n = $nodelist;
230           $n =~ s/\[[-,\d]+\]/$_/;
231           $n;
232         } ($a..$b);
233       }
234     }
235     else
236     {
237       push @nodelist, $nodelist;
238     }
239   }
240   foreach my $nodename (@nodelist)
241   {
242     Log (undef, "node $nodename - $ncpus slots");
243     my $node = { name => $nodename,
244                  ncpus => $ncpus,
245                  losing_streak => 0,
246                  hold_until => 0 };
247     foreach my $cpu (1..$ncpus)
248     {
249       push @slot, { node => $node,
250                     cpu => $cpu };
251     }
252   }
253   push @node, @nodelist;
254 }
255
256
257
258 # Ensure that we get one jobstep running on each allocated node before
259 # we start overloading nodes with concurrent steps
260
261 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
262
263
264
265 my $jobmanager_id;
266 if ($job_has_uuid)
267 {
268   # Claim this job, and make sure nobody else does
269   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
270           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
271     croak("Error while updating / locking job");
272   }
273   $Job->update_attributes('started_at' => scalar gmtime,
274                           'running' => 1,
275                           'success' => undef,
276                           'tasks_summary' => { 'failed' => 0,
277                                                'todo' => 1,
278                                                'running' => 0,
279                                                'done' => 0 });
280 }
281
282
283 Log (undef, "start");
284 $SIG{'INT'} = sub { $main::please_freeze = 1; };
285 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
286 $SIG{'TERM'} = \&croak;
287 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
288 $SIG{'ALRM'} = sub { $main::please_info = 1; };
289 $SIG{'CONT'} = sub { $main::please_continue = 1; };
290 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
291
292 $main::please_freeze = 0;
293 $main::please_info = 0;
294 $main::please_continue = 0;
295 $main::please_refresh = 0;
296 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
297
298 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
299 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
300 $ENV{"JOB_UUID"} = $job_id;
301
302
303 my @jobstep;
304 my @jobstep_todo = ();
305 my @jobstep_done = ();
306 my @jobstep_tomerge = ();
307 my $jobstep_tomerge_level = 0;
308 my $squeue_checked;
309 my $squeue_kill_checked;
310 my $output_in_keep = 0;
311 my $latest_refresh = scalar time;
312
313
314
315 if (defined $Job->{thawedfromkey})
316 {
317   thaw ($Job->{thawedfromkey});
318 }
319 else
320 {
321   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
322     'job_uuid' => $Job->{'uuid'},
323     'sequence' => 0,
324     'qsequence' => 0,
325     'parameters' => {},
326                                                           });
327   push @jobstep, { 'level' => 0,
328                    'failures' => 0,
329                    'arvados_task' => $first_task,
330                  };
331   push @jobstep_todo, 0;
332 }
333
334
335 if (!$have_slurm)
336 {
337   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
338 }
339
340
341 my $build_script;
342
343
344 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
345
346 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
347 if ($skip_install)
348 {
349   if (!defined $no_clear_tmp) {
350     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
351     system($clear_tmp_cmd) == 0
352         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
353   }
354   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
355   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
356     if (-d $src_path) {
357       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
358           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
359       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
360           == 0
361           or croak ("setup.py in $src_path failed: exit ".($?>>8));
362     }
363   }
364 }
365 else
366 {
367   do {
368     local $/ = undef;
369     $build_script = <DATA>;
370   };
371   Log (undef, "Install revision ".$Job->{script_version});
372   my $nodelist = join(",", @node);
373
374   if (!defined $no_clear_tmp) {
375     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
376
377     my $cleanpid = fork();
378     if ($cleanpid == 0)
379     {
380       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
381             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
382       exit (1);
383     }
384     while (1)
385     {
386       last if $cleanpid == waitpid (-1, WNOHANG);
387       freeze_if_want_freeze ($cleanpid);
388       select (undef, undef, undef, 0.1);
389     }
390     Log (undef, "Clean-work-dir exited $?");
391   }
392
393   # Install requested code version
394
395   my @execargs;
396   my @srunargs = ("srun",
397                   "--nodelist=$nodelist",
398                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
399
400   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
401   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
402
403   my $commit;
404   my $git_archive;
405   my $treeish = $Job->{'script_version'};
406   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
407   # Todo: let script_version specify repository instead of expecting
408   # parent process to figure it out.
409   $ENV{"CRUNCH_SRC_URL"} = $repo;
410
411   # Create/update our clone of the remote git repo
412
413   if (!-d $ENV{"CRUNCH_SRC"}) {
414     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
415         or croak ("git clone $repo failed: exit ".($?>>8));
416     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
417   }
418   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
419
420   # If this looks like a subversion r#, look for it in git-svn commit messages
421
422   if ($treeish =~ m{^\d{1,4}$}) {
423     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
424     chomp $gitlog;
425     if ($gitlog =~ /^[a-f0-9]{40}$/) {
426       $commit = $gitlog;
427       Log (undef, "Using commit $commit for script_version $treeish");
428     }
429   }
430
431   # If that didn't work, try asking git to look it up as a tree-ish.
432
433   if (!defined $commit) {
434
435     my $cooked_treeish = $treeish;
436     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
437       # Looks like a git branch name -- make sure git knows it's
438       # relative to the remote repo
439       $cooked_treeish = "origin/$treeish";
440     }
441
442     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
443     chomp $found;
444     if ($found =~ /^[0-9a-f]{40}$/s) {
445       $commit = $found;
446       if ($commit ne $treeish) {
447         # Make sure we record the real commit id in the database,
448         # frozentokey, logs, etc. -- instead of an abbreviation or a
449         # branch name which can become ambiguous or point to a
450         # different commit in the future.
451         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
452         Log (undef, "Using commit $commit for tree-ish $treeish");
453         if ($commit ne $treeish) {
454           $Job->{'script_version'} = $commit;
455           !$job_has_uuid or
456               $Job->update_attributes('script_version' => $commit) or
457               croak("Error while updating job");
458         }
459       }
460     }
461   }
462
463   if (defined $commit) {
464     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
465     @execargs = ("sh", "-c",
466                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
467     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
468   }
469   else {
470     croak ("could not figure out commit id for $treeish");
471   }
472
473   my $installpid = fork();
474   if ($installpid == 0)
475   {
476     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
477     exit (1);
478   }
479   while (1)
480   {
481     last if $installpid == waitpid (-1, WNOHANG);
482     freeze_if_want_freeze ($installpid);
483     select (undef, undef, undef, 0.1);
484   }
485   Log (undef, "Install exited $?");
486 }
487
488 if (!$have_slurm)
489 {
490   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
491   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
492 }
493
494
495
496 foreach (qw (script script_version script_parameters runtime_constraints))
497 {
498   Log (undef,
499        "$_ " .
500        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
501 }
502 foreach (split (/\n/, $Job->{knobs}))
503 {
504   Log (undef, "knob " . $_);
505 }
506
507
508
509 $main::success = undef;
510
511
512
513 ONELEVEL:
514
515 my $thisround_succeeded = 0;
516 my $thisround_failed = 0;
517 my $thisround_failed_multiple = 0;
518
519 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
520                        or $a <=> $b } @jobstep_todo;
521 my $level = $jobstep[$jobstep_todo[0]]->{level};
522 Log (undef, "start level $level");
523
524
525
526 my %proc;
527 my @freeslot = (0..$#slot);
528 my @holdslot;
529 my %reader;
530 my $progress_is_dirty = 1;
531 my $progress_stats_updated = 0;
532
533 update_progress_stats();
534
535
536
537 THISROUND:
538 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
539 {
540   my $id = $jobstep_todo[$todo_ptr];
541   my $Jobstep = $jobstep[$id];
542   if ($Jobstep->{level} != $level)
543   {
544     next;
545   }
546
547   pipe $reader{$id}, "writer" or croak ($!);
548   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
549   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
550
551   my $childslot = $freeslot[0];
552   my $childnode = $slot[$childslot]->{node};
553   my $childslotname = join (".",
554                             $slot[$childslot]->{node}->{name},
555                             $slot[$childslot]->{cpu});
556   my $childpid = fork();
557   if ($childpid == 0)
558   {
559     $SIG{'INT'} = 'DEFAULT';
560     $SIG{'QUIT'} = 'DEFAULT';
561     $SIG{'TERM'} = 'DEFAULT';
562
563     foreach (values (%reader))
564     {
565       close($_);
566     }
567     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
568     open(STDOUT,">&writer");
569     open(STDERR,">&writer");
570
571     undef $dbh;
572     undef $sth;
573
574     delete $ENV{"GNUPGHOME"};
575     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
576     $ENV{"TASK_QSEQUENCE"} = $id;
577     $ENV{"TASK_SEQUENCE"} = $level;
578     $ENV{"JOB_SCRIPT"} = $Job->{script};
579     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
580       $param =~ tr/a-z/A-Z/;
581       $ENV{"JOB_PARAMETER_$param"} = $value;
582     }
583     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
584     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
585     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
586     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
587     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
588     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
589     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
590
591     $ENV{"GZIP"} = "-n";
592
593     my @srunargs = (
594       "srun",
595       "--nodelist=".$childnode->{name},
596       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
597       "--job-name=$job_id.$id.$$",
598         );
599     my @execargs = qw(sh);
600     my $build_script_to_send = "";
601     my $command =
602         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
603         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
604         ."&& cd $ENV{CRUNCH_TMP} ";
605     if ($build_script)
606     {
607       $build_script_to_send = $build_script;
608       $command .=
609           "&& perl -";
610     }
611     $command .=
612         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
613     my @execargs = ('bash', '-c', $command);
614     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
615     exit (111);
616   }
617   close("writer");
618   if (!defined $childpid)
619   {
620     close $reader{$id};
621     delete $reader{$id};
622     next;
623   }
624   shift @freeslot;
625   $proc{$childpid} = { jobstep => $id,
626                        time => time,
627                        slot => $childslot,
628                        jobstepname => "$job_id.$id.$childpid",
629                      };
630   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
631   $slot[$childslot]->{pid} = $childpid;
632
633   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
634   Log ($id, "child $childpid started on $childslotname");
635   $Jobstep->{starttime} = time;
636   $Jobstep->{node} = $childnode->{name};
637   $Jobstep->{slotindex} = $childslot;
638   delete $Jobstep->{stderr};
639   delete $Jobstep->{finishtime};
640
641   splice @jobstep_todo, $todo_ptr, 1;
642   --$todo_ptr;
643
644   $progress_is_dirty = 1;
645
646   while (!@freeslot
647          ||
648          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
649   {
650     last THISROUND if $main::please_freeze;
651     if ($main::please_info)
652     {
653       $main::please_info = 0;
654       freeze();
655       collate_output();
656       save_meta(1);
657       update_progress_stats();
658     }
659     my $gotsome
660         = readfrompipes ()
661         + reapchildren ();
662     if (!$gotsome)
663     {
664       check_refresh_wanted();
665       check_squeue();
666       update_progress_stats();
667       select (undef, undef, undef, 0.1);
668     }
669     elsif (time - $progress_stats_updated >= 30)
670     {
671       update_progress_stats();
672     }
673     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
674         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
675     {
676       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
677           .($thisround_failed+$thisround_succeeded)
678           .") -- giving up on this round";
679       Log (undef, $message);
680       last THISROUND;
681     }
682
683     # move slots from freeslot to holdslot (or back to freeslot) if necessary
684     for (my $i=$#freeslot; $i>=0; $i--) {
685       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
686         push @holdslot, (splice @freeslot, $i, 1);
687       }
688     }
689     for (my $i=$#holdslot; $i>=0; $i--) {
690       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
691         push @freeslot, (splice @holdslot, $i, 1);
692       }
693     }
694
695     # give up if no nodes are succeeding
696     if (!grep { $_->{node}->{losing_streak} == 0 &&
697                     $_->{node}->{hold_count} < 4 } @slot) {
698       my $message = "Every node has failed -- giving up on this round";
699       Log (undef, $message);
700       last THISROUND;
701     }
702   }
703 }
704
705
706 push @freeslot, splice @holdslot;
707 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
708
709
710 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
711 while (%proc)
712 {
713   if ($main::please_continue) {
714     $main::please_continue = 0;
715     goto THISROUND;
716   }
717   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
718   readfrompipes ();
719   if (!reapchildren())
720   {
721     check_refresh_wanted();
722     check_squeue();
723     update_progress_stats();
724     select (undef, undef, undef, 0.1);
725     killem (keys %proc) if $main::please_freeze;
726   }
727 }
728
729 update_progress_stats();
730 freeze_if_want_freeze();
731
732
733 if (!defined $main::success)
734 {
735   if (@jobstep_todo &&
736       $thisround_succeeded == 0 &&
737       ($thisround_failed == 0 || $thisround_failed > 4))
738   {
739     my $message = "stop because $thisround_failed tasks failed and none succeeded";
740     Log (undef, $message);
741     $main::success = 0;
742   }
743   if (!@jobstep_todo)
744   {
745     $main::success = 1;
746   }
747 }
748
749 goto ONELEVEL if !defined $main::success;
750
751
752 release_allocation();
753 freeze();
754 if ($job_has_uuid) {
755   $Job->update_attributes('output' => &collate_output(),
756                           'running' => 0,
757                           'success' => $Job->{'output'} && $main::success,
758                           'finished_at' => scalar gmtime)
759 }
760
761 if ($Job->{'output'})
762 {
763   eval {
764     my $manifest_text = capturex("whget", $Job->{'output'});
765     $arv->{'collections'}->{'create'}->execute('collection' => {
766       'uuid' => $Job->{'output'},
767       'manifest_text' => $manifest_text,
768     });
769   };
770   if ($@) {
771     Log (undef, "Failed to register output manifest: $@");
772   }
773 }
774
775 Log (undef, "finish");
776
777 save_meta();
778 exit 0;
779
780
781
782 sub update_progress_stats
783 {
784   $progress_stats_updated = time;
785   return if !$progress_is_dirty;
786   my ($todo, $done, $running) = (scalar @jobstep_todo,
787                                  scalar @jobstep_done,
788                                  scalar @slot - scalar @freeslot - scalar @holdslot);
789   $Job->{'tasks_summary'} ||= {};
790   $Job->{'tasks_summary'}->{'todo'} = $todo;
791   $Job->{'tasks_summary'}->{'done'} = $done;
792   $Job->{'tasks_summary'}->{'running'} = $running;
793   if ($job_has_uuid) {
794     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
795   }
796   Log (undef, "status: $done done, $running running, $todo todo");
797   $progress_is_dirty = 0;
798 }
799
800
801
802 sub reapchildren
803 {
804   my $pid = waitpid (-1, WNOHANG);
805   return 0 if $pid <= 0;
806
807   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
808                   . "."
809                   . $slot[$proc{$pid}->{slot}]->{cpu});
810   my $jobstepid = $proc{$pid}->{jobstep};
811   my $elapsed = time - $proc{$pid}->{time};
812   my $Jobstep = $jobstep[$jobstepid];
813
814   my $childstatus = $?;
815   my $exitvalue = $childstatus >> 8;
816   my $exitinfo = sprintf("exit %d signal %d%s",
817                          $exitvalue,
818                          $childstatus & 127,
819                          ($childstatus & 128 ? ' core dump' : ''));
820   $Jobstep->{'arvados_task'}->reload;
821   my $task_success = $Jobstep->{'arvados_task'}->{success};
822
823   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
824
825   if (!defined $task_success) {
826     # task did not indicate one way or the other --> fail
827     $Jobstep->{'arvados_task'}->{success} = 0;
828     $Jobstep->{'arvados_task'}->save;
829     $task_success = 0;
830   }
831
832   if (!$task_success)
833   {
834     my $temporary_fail;
835     $temporary_fail ||= $Jobstep->{node_fail};
836     $temporary_fail ||= ($exitvalue == 111);
837
838     ++$thisround_failed;
839     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
840
841     # Check for signs of a failed or misconfigured node
842     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
843         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
844       # Don't count this against jobstep failure thresholds if this
845       # node is already suspected faulty and srun exited quickly
846       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
847           $elapsed < 5) {
848         Log ($jobstepid, "blaming failure on suspect node " .
849              $slot[$proc{$pid}->{slot}]->{node}->{name});
850         $temporary_fail ||= 1;
851       }
852       ban_node_by_slot($proc{$pid}->{slot});
853     }
854
855     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
856                              ++$Jobstep->{'failures'},
857                              $temporary_fail ? 'temporary ' : 'permanent',
858                              $elapsed));
859
860     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
861       # Give up on this task, and the whole job
862       $main::success = 0;
863       $main::please_freeze = 1;
864     }
865     else {
866       # Put this task back on the todo queue
867       push @jobstep_todo, $jobstepid;
868     }
869     $Job->{'tasks_summary'}->{'failed'}++;
870   }
871   else
872   {
873     ++$thisround_succeeded;
874     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
875     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
876     push @jobstep_done, $jobstepid;
877     Log ($jobstepid, "success in $elapsed seconds");
878   }
879   $Jobstep->{exitcode} = $childstatus;
880   $Jobstep->{finishtime} = time;
881   process_stderr ($jobstepid, $task_success);
882   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
883
884   close $reader{$jobstepid};
885   delete $reader{$jobstepid};
886   delete $slot[$proc{$pid}->{slot}]->{pid};
887   push @freeslot, $proc{$pid}->{slot};
888   delete $proc{$pid};
889
890   # Load new tasks
891   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
892     'where' => {
893       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
894     },
895     'order' => 'qsequence'
896   );
897   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
898     my $jobstep = {
899       'level' => $arvados_task->{'sequence'},
900       'failures' => 0,
901       'arvados_task' => $arvados_task
902     };
903     push @jobstep, $jobstep;
904     push @jobstep_todo, $#jobstep;
905   }
906
907   $progress_is_dirty = 1;
908   1;
909 }
910
911 sub check_refresh_wanted
912 {
913   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
914   if (@stat && $stat[9] > $latest_refresh) {
915     $latest_refresh = scalar time;
916     if ($job_has_uuid) {
917       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
918       for my $attr ('cancelled_at',
919                     'cancelled_by_user_uuid',
920                     'cancelled_by_client_uuid') {
921         $Job->{$attr} = $Job2->{$attr};
922       }
923       if ($Job->{'cancelled_at'}) {
924         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
925              " by user " . $Job->{cancelled_by_user_uuid});
926         $main::success = 0;
927         $main::please_freeze = 1;
928       }
929     }
930   }
931 }
932
933 sub check_squeue
934 {
935   # return if the kill list was checked <4 seconds ago
936   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
937   {
938     return;
939   }
940   $squeue_kill_checked = time;
941
942   # use killem() on procs whose killtime is reached
943   for (keys %proc)
944   {
945     if (exists $proc{$_}->{killtime}
946         && $proc{$_}->{killtime} <= time)
947     {
948       killem ($_);
949     }
950   }
951
952   # return if the squeue was checked <60 seconds ago
953   if (defined $squeue_checked && $squeue_checked > time - 60)
954   {
955     return;
956   }
957   $squeue_checked = time;
958
959   if (!$have_slurm)
960   {
961     # here is an opportunity to check for mysterious problems with local procs
962     return;
963   }
964
965   # get a list of steps still running
966   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
967   chop @squeue;
968   if ($squeue[-1] ne "ok")
969   {
970     return;
971   }
972   pop @squeue;
973
974   # which of my jobsteps are running, according to squeue?
975   my %ok;
976   foreach (@squeue)
977   {
978     if (/^(\d+)\.(\d+) (\S+)/)
979     {
980       if ($1 eq $ENV{SLURM_JOBID})
981       {
982         $ok{$3} = 1;
983       }
984     }
985   }
986
987   # which of my active child procs (>60s old) were not mentioned by squeue?
988   foreach (keys %proc)
989   {
990     if ($proc{$_}->{time} < time - 60
991         && !exists $ok{$proc{$_}->{jobstepname}}
992         && !exists $proc{$_}->{killtime})
993     {
994       # kill this proc if it hasn't exited in 30 seconds
995       $proc{$_}->{killtime} = time + 30;
996     }
997   }
998 }
999
1000
1001 sub release_allocation
1002 {
1003   if ($have_slurm)
1004   {
1005     Log (undef, "release job allocation");
1006     system "scancel $ENV{SLURM_JOBID}";
1007   }
1008 }
1009
1010
1011 sub readfrompipes
1012 {
1013   my $gotsome = 0;
1014   foreach my $job (keys %reader)
1015   {
1016     my $buf;
1017     while (0 < sysread ($reader{$job}, $buf, 8192))
1018     {
1019       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1020       $jobstep[$job]->{stderr} .= $buf;
1021       preprocess_stderr ($job);
1022       if (length ($jobstep[$job]->{stderr}) > 16384)
1023       {
1024         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1025       }
1026       $gotsome = 1;
1027     }
1028   }
1029   return $gotsome;
1030 }
1031
1032
1033 sub preprocess_stderr
1034 {
1035   my $job = shift;
1036
1037   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1038     my $line = $1;
1039     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1040     Log ($job, "stderr $line");
1041     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1042       # whoa.
1043       $main::please_freeze = 1;
1044     }
1045     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1046       $jobstep[$job]->{node_fail} = 1;
1047       ban_node_by_slot($jobstep[$job]->{slotindex});
1048     }
1049   }
1050 }
1051
1052
1053 sub process_stderr
1054 {
1055   my $job = shift;
1056   my $task_success = shift;
1057   preprocess_stderr ($job);
1058
1059   map {
1060     Log ($job, "stderr $_");
1061   } split ("\n", $jobstep[$job]->{stderr});
1062 }
1063
1064
1065 sub collate_output
1066 {
1067   my $whc = Warehouse->new;
1068   Log (undef, "collate");
1069   $whc->write_start (1);
1070   my $joboutput;
1071   for (@jobstep)
1072   {
1073     next if (!exists $_->{'arvados_task'}->{output} ||
1074              !$_->{'arvados_task'}->{'success'} ||
1075              $_->{'exitcode'} != 0);
1076     my $output = $_->{'arvados_task'}->{output};
1077     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1078     {
1079       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1080       $whc->write_data ($output);
1081     }
1082     elsif (@jobstep == 1)
1083     {
1084       $joboutput = $output;
1085       $whc->write_finish;
1086     }
1087     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1088     {
1089       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1090       $whc->write_data ($outblock);
1091     }
1092     else
1093     {
1094       my $errstr = $whc->errstr;
1095       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1096       $main::success = 0;
1097     }
1098   }
1099   $joboutput = $whc->write_finish if !defined $joboutput;
1100   if ($joboutput)
1101   {
1102     Log (undef, "output $joboutput");
1103     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1104   }
1105   else
1106   {
1107     Log (undef, "output undef");
1108   }
1109   return $joboutput;
1110 }
1111
1112
1113 sub killem
1114 {
1115   foreach (@_)
1116   {
1117     my $sig = 2;                # SIGINT first
1118     if (exists $proc{$_}->{"sent_$sig"} &&
1119         time - $proc{$_}->{"sent_$sig"} > 4)
1120     {
1121       $sig = 15;                # SIGTERM if SIGINT doesn't work
1122     }
1123     if (exists $proc{$_}->{"sent_$sig"} &&
1124         time - $proc{$_}->{"sent_$sig"} > 4)
1125     {
1126       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1127     }
1128     if (!exists $proc{$_}->{"sent_$sig"})
1129     {
1130       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1131       kill $sig, $_;
1132       select (undef, undef, undef, 0.1);
1133       if ($sig == 2)
1134       {
1135         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1136       }
1137       $proc{$_}->{"sent_$sig"} = time;
1138       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1139     }
1140   }
1141 }
1142
1143
1144 sub fhbits
1145 {
1146   my($bits);
1147   for (@_) {
1148     vec($bits,fileno($_),1) = 1;
1149   }
1150   $bits;
1151 }
1152
1153
1154 sub Log                         # ($jobstep_id, $logmessage)
1155 {
1156   if ($_[1] =~ /\n/) {
1157     for my $line (split (/\n/, $_[1])) {
1158       Log ($_[0], $line);
1159     }
1160     return;
1161   }
1162   my $fh = select STDERR; $|=1; select $fh;
1163   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1164   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1165   $message .= "\n";
1166   my $datetime;
1167   if ($metastream || -t STDERR) {
1168     my @gmtime = gmtime;
1169     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1170                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1171   }
1172   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1173
1174   return if !$metastream;
1175   $metastream->write_data ($datetime . " " . $message);
1176 }
1177
1178
1179 sub croak
1180 {
1181   my ($package, $file, $line) = caller;
1182   my $message = "@_ at $file line $line\n";
1183   Log (undef, $message);
1184   freeze() if @jobstep_todo;
1185   collate_output() if @jobstep_todo;
1186   cleanup();
1187   save_meta() if $metastream;
1188   die;
1189 }
1190
1191
1192 sub cleanup
1193 {
1194   return if !$job_has_uuid;
1195   $Job->update_attributes('running' => 0,
1196                           'success' => 0,
1197                           'finished_at' => scalar gmtime);
1198 }
1199
1200
1201 sub save_meta
1202 {
1203   my $justcheckpoint = shift; # false if this will be the last meta saved
1204   my $m = $metastream;
1205   $m = $m->copy if $justcheckpoint;
1206   $m->write_finish;
1207   my $whc = Warehouse->new;
1208   my $loglocator = $whc->store_block ($m->as_string);
1209   $arv->{'collections'}->{'create'}->execute('collection' => {
1210     'uuid' => $loglocator,
1211     'manifest_text' => $m->as_string,
1212   });
1213   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1214   Log (undef, "log manifest is $loglocator");
1215   $Job->{'log'} = $loglocator;
1216   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1217 }
1218
1219
1220 sub freeze_if_want_freeze
1221 {
1222   if ($main::please_freeze)
1223   {
1224     release_allocation();
1225     if (@_)
1226     {
1227       # kill some srun procs before freeze+stop
1228       map { $proc{$_} = {} } @_;
1229       while (%proc)
1230       {
1231         killem (keys %proc);
1232         select (undef, undef, undef, 0.1);
1233         my $died;
1234         while (($died = waitpid (-1, WNOHANG)) > 0)
1235         {
1236           delete $proc{$died};
1237         }
1238       }
1239     }
1240     freeze();
1241     collate_output();
1242     cleanup();
1243     save_meta();
1244     exit 0;
1245   }
1246 }
1247
1248
1249 sub freeze
1250 {
1251   Log (undef, "Freeze not implemented");
1252   return;
1253 }
1254
1255
1256 sub thaw
1257 {
1258   croak ("Thaw not implemented");
1259
1260   my $whc;
1261   my $key = shift;
1262   Log (undef, "thaw from $key");
1263
1264   @jobstep = ();
1265   @jobstep_done = ();
1266   @jobstep_todo = ();
1267   @jobstep_tomerge = ();
1268   $jobstep_tomerge_level = 0;
1269   my $frozenjob = {};
1270
1271   my $stream = new Warehouse::Stream ( whc => $whc,
1272                                        hash => [split (",", $key)] );
1273   $stream->rewind;
1274   while (my $dataref = $stream->read_until (undef, "\n\n"))
1275   {
1276     if ($$dataref =~ /^job /)
1277     {
1278       foreach (split ("\n", $$dataref))
1279       {
1280         my ($k, $v) = split ("=", $_, 2);
1281         $frozenjob->{$k} = freezeunquote ($v);
1282       }
1283       next;
1284     }
1285
1286     if ($$dataref =~ /^merge (\d+) (.*)/)
1287     {
1288       $jobstep_tomerge_level = $1;
1289       @jobstep_tomerge
1290           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1291       next;
1292     }
1293
1294     my $Jobstep = { };
1295     foreach (split ("\n", $$dataref))
1296     {
1297       my ($k, $v) = split ("=", $_, 2);
1298       $Jobstep->{$k} = freezeunquote ($v) if $k;
1299     }
1300     $Jobstep->{'failures'} = 0;
1301     push @jobstep, $Jobstep;
1302
1303     if ($Jobstep->{exitcode} eq "0")
1304     {
1305       push @jobstep_done, $#jobstep;
1306     }
1307     else
1308     {
1309       push @jobstep_todo, $#jobstep;
1310     }
1311   }
1312
1313   foreach (qw (script script_version script_parameters))
1314   {
1315     $Job->{$_} = $frozenjob->{$_};
1316   }
1317   $Job->save if $job_has_uuid;
1318 }
1319
1320
1321 sub freezequote
1322 {
1323   my $s = shift;
1324   $s =~ s/\\/\\\\/g;
1325   $s =~ s/\n/\\n/g;
1326   return $s;
1327 }
1328
1329
1330 sub freezeunquote
1331 {
1332   my $s = shift;
1333   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1334   return $s;
1335 }
1336
1337
1338 sub srun
1339 {
1340   my $srunargs = shift;
1341   my $execargs = shift;
1342   my $opts = shift || {};
1343   my $stdin = shift;
1344   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1345   print STDERR (join (" ",
1346                       map { / / ? "'$_'" : $_ }
1347                       (@$args)),
1348                 "\n")
1349       if $ENV{CRUNCH_DEBUG};
1350
1351   if (defined $stdin) {
1352     my $child = open STDIN, "-|";
1353     defined $child or die "no fork: $!";
1354     if ($child == 0) {
1355       print $stdin or die $!;
1356       close STDOUT or die $!;
1357       exit 0;
1358     }
1359   }
1360
1361   return system (@$args) if $opts->{fork};
1362
1363   exec @$args;
1364   warn "ENV size is ".length(join(" ",%ENV));
1365   die "exec failed: $!: @$args";
1366 }
1367
1368
1369 sub ban_node_by_slot {
1370   # Don't start any new jobsteps on this node for 60 seconds
1371   my $slotid = shift;
1372   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1373   $slot[$slotid]->{node}->{hold_count}++;
1374   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1375 }
1376
1377 sub must_lock_now
1378 {
1379   my ($lockfile, $error_message) = @_;
1380   open L, ">", $lockfile or croak("$lockfile: $!");
1381   if (!flock L, LOCK_EX|LOCK_NB) {
1382     croak("Can't lock $lockfile: $error_message\n");
1383   }
1384 }
1385
1386 __DATA__
1387 #!/usr/bin/perl
1388
1389 # checkout-and-build
1390
1391 use Fcntl ':flock';
1392
1393 my $destdir = $ENV{"CRUNCH_SRC"};
1394 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1395 my $repo = $ENV{"CRUNCH_SRC_URL"};
1396
1397 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1398 flock L, LOCK_EX;
1399 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1400     exit 0;
1401 }
1402
1403 unlink "$destdir.commit";
1404 open STDOUT, ">", "$destdir.log";
1405 open STDERR, ">&STDOUT";
1406
1407 mkdir $destdir;
1408 my @git_archive_data = <DATA>;
1409 if (@git_archive_data) {
1410   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1411   print TARX @git_archive_data;
1412   if(!close(TARX)) {
1413     die "'tar -C $destdir -xf -' exited $?: $!";
1414   }
1415 }
1416
1417 my $pwd;
1418 chomp ($pwd = `pwd`);
1419 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1420 mkdir $install_dir;
1421
1422 for my $src_path ("$destdir/arvados/sdk/python") {
1423   if (-d $src_path) {
1424     shell_or_die ("virtualenv", $install_dir);
1425     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1426   }
1427 }
1428
1429 if (-e "$destdir/crunch_scripts/install") {
1430     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1431 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1432     # Old version
1433     shell_or_die ("./tests/autotests.sh", $install_dir);
1434 } elsif (-e "./install.sh") {
1435     shell_or_die ("./install.sh", $install_dir);
1436 }
1437
1438 if ($commit) {
1439     unlink "$destdir.commit.new";
1440     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1441     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1442 }
1443
1444 close L;
1445
1446 exit 0;
1447
1448 sub shell_or_die
1449 {
1450   if ($ENV{"DEBUG"}) {
1451     print STDERR "@_\n";
1452   }
1453   system (@_) == 0
1454       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1455 }
1456
1457 __DATA__