Merge branch '2470-doc-refresh'.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use Warehouse;
81 use Warehouse::Stream;
82 use IPC::System::Simple qw(capturex);
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 $metastream = Warehouse::Stream->new(whc => new Warehouse);
179 $metastream->clear;
180 $metastream->name('.');
181 $metastream->write_start($job_id . '.log.txt');
182
183
184 $Job->{'runtime_constraints'} ||= {};
185 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
186 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
187
188
189 Log (undef, "check slurm allocation");
190 my @slot;
191 my @node;
192 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
193 my @sinfo;
194 if (!$have_slurm)
195 {
196   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
197   push @sinfo, "$localcpus localhost";
198 }
199 if (exists $ENV{SLURM_NODELIST})
200 {
201   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
202 }
203 foreach (@sinfo)
204 {
205   my ($ncpus, $slurm_nodelist) = split;
206   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
207
208   my @nodelist;
209   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
210   {
211     my $nodelist = $1;
212     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
213     {
214       my $ranges = $1;
215       foreach (split (",", $ranges))
216       {
217         my ($a, $b);
218         if (/(\d+)-(\d+)/)
219         {
220           $a = $1;
221           $b = $2;
222         }
223         else
224         {
225           $a = $_;
226           $b = $_;
227         }
228         push @nodelist, map {
229           my $n = $nodelist;
230           $n =~ s/\[[-,\d]+\]/$_/;
231           $n;
232         } ($a..$b);
233       }
234     }
235     else
236     {
237       push @nodelist, $nodelist;
238     }
239   }
240   foreach my $nodename (@nodelist)
241   {
242     Log (undef, "node $nodename - $ncpus slots");
243     my $node = { name => $nodename,
244                  ncpus => $ncpus,
245                  losing_streak => 0,
246                  hold_until => 0 };
247     foreach my $cpu (1..$ncpus)
248     {
249       push @slot, { node => $node,
250                     cpu => $cpu };
251     }
252   }
253   push @node, @nodelist;
254 }
255
256
257
258 # Ensure that we get one jobstep running on each allocated node before
259 # we start overloading nodes with concurrent steps
260
261 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
262
263
264
265 my $jobmanager_id;
266 if ($job_has_uuid)
267 {
268   # Claim this job, and make sure nobody else does
269   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
270           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
271     croak("Error while updating / locking job");
272   }
273   $Job->update_attributes('started_at' => scalar gmtime,
274                           'running' => 1,
275                           'success' => undef,
276                           'tasks_summary' => { 'failed' => 0,
277                                                'todo' => 1,
278                                                'running' => 0,
279                                                'done' => 0 });
280 }
281
282
283 Log (undef, "start");
284 $SIG{'INT'} = sub { $main::please_freeze = 1; };
285 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
286 $SIG{'TERM'} = \&croak;
287 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
288 $SIG{'ALRM'} = sub { $main::please_info = 1; };
289 $SIG{'CONT'} = sub { $main::please_continue = 1; };
290 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
291
292 $main::please_freeze = 0;
293 $main::please_info = 0;
294 $main::please_continue = 0;
295 $main::please_refresh = 0;
296 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
297
298 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
299 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
300 $ENV{"JOB_UUID"} = $job_id;
301
302
303 my @jobstep;
304 my @jobstep_todo = ();
305 my @jobstep_done = ();
306 my @jobstep_tomerge = ();
307 my $jobstep_tomerge_level = 0;
308 my $squeue_checked;
309 my $squeue_kill_checked;
310 my $output_in_keep = 0;
311 my $latest_refresh = scalar time;
312
313
314
315 if (defined $Job->{thawedfromkey})
316 {
317   thaw ($Job->{thawedfromkey});
318 }
319 else
320 {
321   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
322     'job_uuid' => $Job->{'uuid'},
323     'sequence' => 0,
324     'qsequence' => 0,
325     'parameters' => {},
326                                                           });
327   push @jobstep, { 'level' => 0,
328                    'failures' => 0,
329                    'arvados_task' => $first_task,
330                  };
331   push @jobstep_todo, 0;
332 }
333
334
335 if (!$have_slurm)
336 {
337   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
338 }
339
340
341 my $build_script;
342
343
344 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
345
346 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
347 if ($skip_install)
348 {
349   if (!defined $no_clear_tmp) {
350     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
351     system($clear_tmp_cmd) == 0
352         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
353   }
354   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
355   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
356     if (-d $src_path) {
357       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
358           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
359       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
360           == 0
361           or croak ("setup.py in $src_path failed: exit ".($?>>8));
362     }
363   }
364 }
365 else
366 {
367   do {
368     local $/ = undef;
369     $build_script = <DATA>;
370   };
371   Log (undef, "Install revision ".$Job->{script_version});
372   my $nodelist = join(",", @node);
373
374   if (!defined $no_clear_tmp) {
375     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
376
377     my $cleanpid = fork();
378     if ($cleanpid == 0)
379     {
380       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
381             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
382       exit (1);
383     }
384     while (1)
385     {
386       last if $cleanpid == waitpid (-1, WNOHANG);
387       freeze_if_want_freeze ($cleanpid);
388       select (undef, undef, undef, 0.1);
389     }
390     Log (undef, "Clean-work-dir exited $?");
391   }
392
393   # Install requested code version
394
395   my @execargs;
396   my @srunargs = ("srun",
397                   "--nodelist=$nodelist",
398                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
399
400   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
401   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
402
403   my $commit;
404   my $git_archive;
405   my $treeish = $Job->{'script_version'};
406   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
407   # Todo: let script_version specify repository instead of expecting
408   # parent process to figure it out.
409   $ENV{"CRUNCH_SRC_URL"} = $repo;
410
411   # Create/update our clone of the remote git repo
412
413   if (!-d $ENV{"CRUNCH_SRC"}) {
414     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
415         or croak ("git clone $repo failed: exit ".($?>>8));
416     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
417   }
418   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q --tags origin`;
419
420   # If this looks like a subversion r#, look for it in git-svn commit messages
421
422   if ($treeish =~ m{^\d{1,4}$}) {
423     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
424     chomp $gitlog;
425     if ($gitlog =~ /^[a-f0-9]{40}$/) {
426       $commit = $gitlog;
427       Log (undef, "Using commit $commit for script_version $treeish");
428     }
429   }
430
431   # If that didn't work, try asking git to look it up as a tree-ish.
432
433   if (!defined $commit) {
434
435     my $cooked_treeish = $treeish;
436     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
437       # Looks like a git branch name -- make sure git knows it's
438       # relative to the remote repo
439       $cooked_treeish = "origin/$treeish";
440     }
441
442     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
443     chomp $found;
444     if ($found =~ /^[0-9a-f]{40}$/s) {
445       $commit = $found;
446       if ($commit ne $treeish) {
447         # Make sure we record the real commit id in the database,
448         # frozentokey, logs, etc. -- instead of an abbreviation or a
449         # branch name which can become ambiguous or point to a
450         # different commit in the future.
451         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
452         Log (undef, "Using commit $commit for tree-ish $treeish");
453         if ($commit ne $treeish) {
454           $Job->{'script_version'} = $commit;
455           !$job_has_uuid or
456               $Job->update_attributes('script_version' => $commit) or
457               croak("Error while updating job");
458         }
459       }
460     }
461   }
462
463   if (defined $commit) {
464     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
465     @execargs = ("sh", "-c",
466                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
467     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
468   }
469   else {
470     croak ("could not figure out commit id for $treeish");
471   }
472
473   my $installpid = fork();
474   if ($installpid == 0)
475   {
476     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
477     exit (1);
478   }
479   while (1)
480   {
481     last if $installpid == waitpid (-1, WNOHANG);
482     freeze_if_want_freeze ($installpid);
483     select (undef, undef, undef, 0.1);
484   }
485   Log (undef, "Install exited $?");
486 }
487
488 if (!$have_slurm)
489 {
490   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
491   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
492 }
493
494
495
496 foreach (qw (script script_version script_parameters runtime_constraints))
497 {
498   Log (undef,
499        "$_ " .
500        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
501 }
502 foreach (split (/\n/, $Job->{knobs}))
503 {
504   Log (undef, "knob " . $_);
505 }
506
507
508
509 $main::success = undef;
510
511
512
513 ONELEVEL:
514
515 my $thisround_succeeded = 0;
516 my $thisround_failed = 0;
517 my $thisround_failed_multiple = 0;
518
519 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
520                        or $a <=> $b } @jobstep_todo;
521 my $level = $jobstep[$jobstep_todo[0]]->{level};
522 Log (undef, "start level $level");
523
524
525
526 my %proc;
527 my @freeslot = (0..$#slot);
528 my @holdslot;
529 my %reader;
530 my $progress_is_dirty = 1;
531 my $progress_stats_updated = 0;
532
533 update_progress_stats();
534
535
536
537 THISROUND:
538 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
539 {
540   my $id = $jobstep_todo[$todo_ptr];
541   my $Jobstep = $jobstep[$id];
542   if ($Jobstep->{level} != $level)
543   {
544     next;
545   }
546
547   pipe $reader{$id}, "writer" or croak ($!);
548   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
549   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
550
551   my $childslot = $freeslot[0];
552   my $childnode = $slot[$childslot]->{node};
553   my $childslotname = join (".",
554                             $slot[$childslot]->{node}->{name},
555                             $slot[$childslot]->{cpu});
556   my $childpid = fork();
557   if ($childpid == 0)
558   {
559     $SIG{'INT'} = 'DEFAULT';
560     $SIG{'QUIT'} = 'DEFAULT';
561     $SIG{'TERM'} = 'DEFAULT';
562
563     foreach (values (%reader))
564     {
565       close($_);
566     }
567     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
568     open(STDOUT,">&writer");
569     open(STDERR,">&writer");
570
571     undef $dbh;
572     undef $sth;
573
574     delete $ENV{"GNUPGHOME"};
575     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
576     $ENV{"TASK_QSEQUENCE"} = $id;
577     $ENV{"TASK_SEQUENCE"} = $level;
578     $ENV{"JOB_SCRIPT"} = $Job->{script};
579     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
580       $param =~ tr/a-z/A-Z/;
581       $ENV{"JOB_PARAMETER_$param"} = $value;
582     }
583     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
584     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
585     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
586     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
587     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
588     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
589     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
590
591     $ENV{"GZIP"} = "-n";
592
593     my @srunargs = (
594       "srun",
595       "--nodelist=".$childnode->{name},
596       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
597       "--job-name=$job_id.$id.$$",
598         );
599     my @execargs = qw(sh);
600     my $build_script_to_send = "";
601     my $command =
602         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
603         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
604         ."&& cd $ENV{CRUNCH_TMP} ";
605     if ($build_script)
606     {
607       $build_script_to_send = $build_script;
608       $command .=
609           "&& perl -";
610     }
611     $command .=
612         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
613     my @execargs = ('bash', '-c', $command);
614     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
615     exit (111);
616   }
617   close("writer");
618   if (!defined $childpid)
619   {
620     close $reader{$id};
621     delete $reader{$id};
622     next;
623   }
624   shift @freeslot;
625   $proc{$childpid} = { jobstep => $id,
626                        time => time,
627                        slot => $childslot,
628                        jobstepname => "$job_id.$id.$childpid",
629                      };
630   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
631   $slot[$childslot]->{pid} = $childpid;
632
633   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
634   Log ($id, "child $childpid started on $childslotname");
635   $Jobstep->{starttime} = time;
636   $Jobstep->{node} = $childnode->{name};
637   $Jobstep->{slotindex} = $childslot;
638   delete $Jobstep->{stderr};
639   delete $Jobstep->{finishtime};
640
641   splice @jobstep_todo, $todo_ptr, 1;
642   --$todo_ptr;
643
644   $progress_is_dirty = 1;
645
646   while (!@freeslot
647          ||
648          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
649   {
650     last THISROUND if $main::please_freeze;
651     if ($main::please_info)
652     {
653       $main::please_info = 0;
654       freeze();
655       collate_output();
656       save_meta(1);
657       update_progress_stats();
658     }
659     my $gotsome
660         = readfrompipes ()
661         + reapchildren ();
662     if (!$gotsome)
663     {
664       check_refresh_wanted();
665       check_squeue();
666       update_progress_stats();
667       select (undef, undef, undef, 0.1);
668     }
669     elsif (time - $progress_stats_updated >= 30)
670     {
671       update_progress_stats();
672     }
673     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
674         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
675     {
676       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
677           .($thisround_failed+$thisround_succeeded)
678           .") -- giving up on this round";
679       Log (undef, $message);
680       last THISROUND;
681     }
682
683     # move slots from freeslot to holdslot (or back to freeslot) if necessary
684     for (my $i=$#freeslot; $i>=0; $i--) {
685       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
686         push @holdslot, (splice @freeslot, $i, 1);
687       }
688     }
689     for (my $i=$#holdslot; $i>=0; $i--) {
690       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
691         push @freeslot, (splice @holdslot, $i, 1);
692       }
693     }
694
695     # give up if no nodes are succeeding
696     if (!grep { $_->{node}->{losing_streak} == 0 &&
697                     $_->{node}->{hold_count} < 4 } @slot) {
698       my $message = "Every node has failed -- giving up on this round";
699       Log (undef, $message);
700       last THISROUND;
701     }
702   }
703 }
704
705
706 push @freeslot, splice @holdslot;
707 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
708
709
710 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
711 while (%proc)
712 {
713   if ($main::please_continue) {
714     $main::please_continue = 0;
715     goto THISROUND;
716   }
717   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
718   readfrompipes ();
719   if (!reapchildren())
720   {
721     check_refresh_wanted();
722     check_squeue();
723     update_progress_stats();
724     select (undef, undef, undef, 0.1);
725     killem (keys %proc) if $main::please_freeze;
726   }
727 }
728
729 update_progress_stats();
730 freeze_if_want_freeze();
731
732
733 if (!defined $main::success)
734 {
735   if (@jobstep_todo &&
736       $thisround_succeeded == 0 &&
737       ($thisround_failed == 0 || $thisround_failed > 4))
738   {
739     my $message = "stop because $thisround_failed tasks failed and none succeeded";
740     Log (undef, $message);
741     $main::success = 0;
742   }
743   if (!@jobstep_todo)
744   {
745     $main::success = 1;
746   }
747 }
748
749 goto ONELEVEL if !defined $main::success;
750
751
752 release_allocation();
753 freeze();
754 if ($job_has_uuid) {
755   $Job->update_attributes('output' => &collate_output(),
756                           'running' => 0,
757                           'success' => $Job->{'output'} && $main::success,
758                           'finished_at' => scalar gmtime)
759 }
760
761 if ($Job->{'output'})
762 {
763   eval {
764     my $manifest_text = capturex("whget", $Job->{'output'});
765     $arv->{'collections'}->{'create'}->execute('collection' => {
766       'uuid' => $Job->{'output'},
767       'manifest_text' => $manifest_text,
768     });
769     if ($Job->{'output_is_persistent'}) {
770       $arv->{'links'}->{'create'}->execute('link' => {
771         'tail_kind' => 'arvados#user',
772         'tail_uuid' => $User->{'uuid'},
773         'head_kind' => 'arvados#collection',
774         'head_uuid' => $Job->{'output'},
775         'link_class' => 'resources',
776         'name' => 'wants',
777       });
778     }
779   };
780   if ($@) {
781     Log (undef, "Failed to register output manifest: $@");
782   }
783 }
784
785 Log (undef, "finish");
786
787 save_meta();
788 exit 0;
789
790
791
792 sub update_progress_stats
793 {
794   $progress_stats_updated = time;
795   return if !$progress_is_dirty;
796   my ($todo, $done, $running) = (scalar @jobstep_todo,
797                                  scalar @jobstep_done,
798                                  scalar @slot - scalar @freeslot - scalar @holdslot);
799   $Job->{'tasks_summary'} ||= {};
800   $Job->{'tasks_summary'}->{'todo'} = $todo;
801   $Job->{'tasks_summary'}->{'done'} = $done;
802   $Job->{'tasks_summary'}->{'running'} = $running;
803   if ($job_has_uuid) {
804     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
805   }
806   Log (undef, "status: $done done, $running running, $todo todo");
807   $progress_is_dirty = 0;
808 }
809
810
811
812 sub reapchildren
813 {
814   my $pid = waitpid (-1, WNOHANG);
815   return 0 if $pid <= 0;
816
817   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
818                   . "."
819                   . $slot[$proc{$pid}->{slot}]->{cpu});
820   my $jobstepid = $proc{$pid}->{jobstep};
821   my $elapsed = time - $proc{$pid}->{time};
822   my $Jobstep = $jobstep[$jobstepid];
823
824   my $childstatus = $?;
825   my $exitvalue = $childstatus >> 8;
826   my $exitinfo = sprintf("exit %d signal %d%s",
827                          $exitvalue,
828                          $childstatus & 127,
829                          ($childstatus & 128 ? ' core dump' : ''));
830   $Jobstep->{'arvados_task'}->reload;
831   my $task_success = $Jobstep->{'arvados_task'}->{success};
832
833   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
834
835   if (!defined $task_success) {
836     # task did not indicate one way or the other --> fail
837     $Jobstep->{'arvados_task'}->{success} = 0;
838     $Jobstep->{'arvados_task'}->save;
839     $task_success = 0;
840   }
841
842   if (!$task_success)
843   {
844     my $temporary_fail;
845     $temporary_fail ||= $Jobstep->{node_fail};
846     $temporary_fail ||= ($exitvalue == 111);
847
848     ++$thisround_failed;
849     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
850
851     # Check for signs of a failed or misconfigured node
852     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
853         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
854       # Don't count this against jobstep failure thresholds if this
855       # node is already suspected faulty and srun exited quickly
856       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
857           $elapsed < 5) {
858         Log ($jobstepid, "blaming failure on suspect node " .
859              $slot[$proc{$pid}->{slot}]->{node}->{name});
860         $temporary_fail ||= 1;
861       }
862       ban_node_by_slot($proc{$pid}->{slot});
863     }
864
865     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
866                              ++$Jobstep->{'failures'},
867                              $temporary_fail ? 'temporary ' : 'permanent',
868                              $elapsed));
869
870     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
871       # Give up on this task, and the whole job
872       $main::success = 0;
873       $main::please_freeze = 1;
874     }
875     else {
876       # Put this task back on the todo queue
877       push @jobstep_todo, $jobstepid;
878     }
879     $Job->{'tasks_summary'}->{'failed'}++;
880   }
881   else
882   {
883     ++$thisround_succeeded;
884     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
885     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
886     push @jobstep_done, $jobstepid;
887     Log ($jobstepid, "success in $elapsed seconds");
888   }
889   $Jobstep->{exitcode} = $childstatus;
890   $Jobstep->{finishtime} = time;
891   process_stderr ($jobstepid, $task_success);
892   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
893
894   close $reader{$jobstepid};
895   delete $reader{$jobstepid};
896   delete $slot[$proc{$pid}->{slot}]->{pid};
897   push @freeslot, $proc{$pid}->{slot};
898   delete $proc{$pid};
899
900   # Load new tasks
901   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
902     'where' => {
903       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
904     },
905     'order' => 'qsequence'
906   );
907   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
908     my $jobstep = {
909       'level' => $arvados_task->{'sequence'},
910       'failures' => 0,
911       'arvados_task' => $arvados_task
912     };
913     push @jobstep, $jobstep;
914     push @jobstep_todo, $#jobstep;
915   }
916
917   $progress_is_dirty = 1;
918   1;
919 }
920
921 sub check_refresh_wanted
922 {
923   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
924   if (@stat && $stat[9] > $latest_refresh) {
925     $latest_refresh = scalar time;
926     if ($job_has_uuid) {
927       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
928       for my $attr ('cancelled_at',
929                     'cancelled_by_user_uuid',
930                     'cancelled_by_client_uuid') {
931         $Job->{$attr} = $Job2->{$attr};
932       }
933       if ($Job->{'cancelled_at'}) {
934         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
935              " by user " . $Job->{cancelled_by_user_uuid});
936         $main::success = 0;
937         $main::please_freeze = 1;
938       }
939     }
940   }
941 }
942
943 sub check_squeue
944 {
945   # return if the kill list was checked <4 seconds ago
946   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
947   {
948     return;
949   }
950   $squeue_kill_checked = time;
951
952   # use killem() on procs whose killtime is reached
953   for (keys %proc)
954   {
955     if (exists $proc{$_}->{killtime}
956         && $proc{$_}->{killtime} <= time)
957     {
958       killem ($_);
959     }
960   }
961
962   # return if the squeue was checked <60 seconds ago
963   if (defined $squeue_checked && $squeue_checked > time - 60)
964   {
965     return;
966   }
967   $squeue_checked = time;
968
969   if (!$have_slurm)
970   {
971     # here is an opportunity to check for mysterious problems with local procs
972     return;
973   }
974
975   # get a list of steps still running
976   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
977   chop @squeue;
978   if ($squeue[-1] ne "ok")
979   {
980     return;
981   }
982   pop @squeue;
983
984   # which of my jobsteps are running, according to squeue?
985   my %ok;
986   foreach (@squeue)
987   {
988     if (/^(\d+)\.(\d+) (\S+)/)
989     {
990       if ($1 eq $ENV{SLURM_JOBID})
991       {
992         $ok{$3} = 1;
993       }
994     }
995   }
996
997   # which of my active child procs (>60s old) were not mentioned by squeue?
998   foreach (keys %proc)
999   {
1000     if ($proc{$_}->{time} < time - 60
1001         && !exists $ok{$proc{$_}->{jobstepname}}
1002         && !exists $proc{$_}->{killtime})
1003     {
1004       # kill this proc if it hasn't exited in 30 seconds
1005       $proc{$_}->{killtime} = time + 30;
1006     }
1007   }
1008 }
1009
1010
1011 sub release_allocation
1012 {
1013   if ($have_slurm)
1014   {
1015     Log (undef, "release job allocation");
1016     system "scancel $ENV{SLURM_JOBID}";
1017   }
1018 }
1019
1020
1021 sub readfrompipes
1022 {
1023   my $gotsome = 0;
1024   foreach my $job (keys %reader)
1025   {
1026     my $buf;
1027     while (0 < sysread ($reader{$job}, $buf, 8192))
1028     {
1029       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1030       $jobstep[$job]->{stderr} .= $buf;
1031       preprocess_stderr ($job);
1032       if (length ($jobstep[$job]->{stderr}) > 16384)
1033       {
1034         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1035       }
1036       $gotsome = 1;
1037     }
1038   }
1039   return $gotsome;
1040 }
1041
1042
1043 sub preprocess_stderr
1044 {
1045   my $job = shift;
1046
1047   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1048     my $line = $1;
1049     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1050     Log ($job, "stderr $line");
1051     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1052       # whoa.
1053       $main::please_freeze = 1;
1054     }
1055     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1056       $jobstep[$job]->{node_fail} = 1;
1057       ban_node_by_slot($jobstep[$job]->{slotindex});
1058     }
1059   }
1060 }
1061
1062
1063 sub process_stderr
1064 {
1065   my $job = shift;
1066   my $task_success = shift;
1067   preprocess_stderr ($job);
1068
1069   map {
1070     Log ($job, "stderr $_");
1071   } split ("\n", $jobstep[$job]->{stderr});
1072 }
1073
1074
1075 sub collate_output
1076 {
1077   my $whc = Warehouse->new;
1078   Log (undef, "collate");
1079   $whc->write_start (1);
1080   my $joboutput;
1081   for (@jobstep)
1082   {
1083     next if (!exists $_->{'arvados_task'}->{output} ||
1084              !$_->{'arvados_task'}->{'success'} ||
1085              $_->{'exitcode'} != 0);
1086     my $output = $_->{'arvados_task'}->{output};
1087     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1088     {
1089       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1090       $whc->write_data ($output);
1091     }
1092     elsif (@jobstep == 1)
1093     {
1094       $joboutput = $output;
1095       $whc->write_finish;
1096     }
1097     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1098     {
1099       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1100       $whc->write_data ($outblock);
1101     }
1102     else
1103     {
1104       my $errstr = $whc->errstr;
1105       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1106       $main::success = 0;
1107     }
1108   }
1109   $joboutput = $whc->write_finish if !defined $joboutput;
1110   if ($joboutput)
1111   {
1112     Log (undef, "output $joboutput");
1113     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1114   }
1115   else
1116   {
1117     Log (undef, "output undef");
1118   }
1119   return $joboutput;
1120 }
1121
1122
1123 sub killem
1124 {
1125   foreach (@_)
1126   {
1127     my $sig = 2;                # SIGINT first
1128     if (exists $proc{$_}->{"sent_$sig"} &&
1129         time - $proc{$_}->{"sent_$sig"} > 4)
1130     {
1131       $sig = 15;                # SIGTERM if SIGINT doesn't work
1132     }
1133     if (exists $proc{$_}->{"sent_$sig"} &&
1134         time - $proc{$_}->{"sent_$sig"} > 4)
1135     {
1136       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1137     }
1138     if (!exists $proc{$_}->{"sent_$sig"})
1139     {
1140       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1141       kill $sig, $_;
1142       select (undef, undef, undef, 0.1);
1143       if ($sig == 2)
1144       {
1145         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1146       }
1147       $proc{$_}->{"sent_$sig"} = time;
1148       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1149     }
1150   }
1151 }
1152
1153
1154 sub fhbits
1155 {
1156   my($bits);
1157   for (@_) {
1158     vec($bits,fileno($_),1) = 1;
1159   }
1160   $bits;
1161 }
1162
1163
1164 sub Log                         # ($jobstep_id, $logmessage)
1165 {
1166   if ($_[1] =~ /\n/) {
1167     for my $line (split (/\n/, $_[1])) {
1168       Log ($_[0], $line);
1169     }
1170     return;
1171   }
1172   my $fh = select STDERR; $|=1; select $fh;
1173   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1174   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1175   $message .= "\n";
1176   my $datetime;
1177   if ($metastream || -t STDERR) {
1178     my @gmtime = gmtime;
1179     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1180                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1181   }
1182   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1183
1184   return if !$metastream;
1185   $metastream->write_data ($datetime . " " . $message);
1186 }
1187
1188
1189 sub croak
1190 {
1191   my ($package, $file, $line) = caller;
1192   my $message = "@_ at $file line $line\n";
1193   Log (undef, $message);
1194   freeze() if @jobstep_todo;
1195   collate_output() if @jobstep_todo;
1196   cleanup();
1197   save_meta() if $metastream;
1198   die;
1199 }
1200
1201
1202 sub cleanup
1203 {
1204   return if !$job_has_uuid;
1205   $Job->update_attributes('running' => 0,
1206                           'success' => 0,
1207                           'finished_at' => scalar gmtime);
1208 }
1209
1210
1211 sub save_meta
1212 {
1213   my $justcheckpoint = shift; # false if this will be the last meta saved
1214   my $m = $metastream;
1215   $m = $m->copy if $justcheckpoint;
1216   $m->write_finish;
1217   my $whc = Warehouse->new;
1218   my $loglocator = $whc->store_block ($m->as_string);
1219   $arv->{'collections'}->{'create'}->execute('collection' => {
1220     'uuid' => $loglocator,
1221     'manifest_text' => $m->as_string,
1222   });
1223   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1224   Log (undef, "log manifest is $loglocator");
1225   $Job->{'log'} = $loglocator;
1226   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1227 }
1228
1229
1230 sub freeze_if_want_freeze
1231 {
1232   if ($main::please_freeze)
1233   {
1234     release_allocation();
1235     if (@_)
1236     {
1237       # kill some srun procs before freeze+stop
1238       map { $proc{$_} = {} } @_;
1239       while (%proc)
1240       {
1241         killem (keys %proc);
1242         select (undef, undef, undef, 0.1);
1243         my $died;
1244         while (($died = waitpid (-1, WNOHANG)) > 0)
1245         {
1246           delete $proc{$died};
1247         }
1248       }
1249     }
1250     freeze();
1251     collate_output();
1252     cleanup();
1253     save_meta();
1254     exit 0;
1255   }
1256 }
1257
1258
1259 sub freeze
1260 {
1261   Log (undef, "Freeze not implemented");
1262   return;
1263 }
1264
1265
1266 sub thaw
1267 {
1268   croak ("Thaw not implemented");
1269
1270   my $whc;
1271   my $key = shift;
1272   Log (undef, "thaw from $key");
1273
1274   @jobstep = ();
1275   @jobstep_done = ();
1276   @jobstep_todo = ();
1277   @jobstep_tomerge = ();
1278   $jobstep_tomerge_level = 0;
1279   my $frozenjob = {};
1280
1281   my $stream = new Warehouse::Stream ( whc => $whc,
1282                                        hash => [split (",", $key)] );
1283   $stream->rewind;
1284   while (my $dataref = $stream->read_until (undef, "\n\n"))
1285   {
1286     if ($$dataref =~ /^job /)
1287     {
1288       foreach (split ("\n", $$dataref))
1289       {
1290         my ($k, $v) = split ("=", $_, 2);
1291         $frozenjob->{$k} = freezeunquote ($v);
1292       }
1293       next;
1294     }
1295
1296     if ($$dataref =~ /^merge (\d+) (.*)/)
1297     {
1298       $jobstep_tomerge_level = $1;
1299       @jobstep_tomerge
1300           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1301       next;
1302     }
1303
1304     my $Jobstep = { };
1305     foreach (split ("\n", $$dataref))
1306     {
1307       my ($k, $v) = split ("=", $_, 2);
1308       $Jobstep->{$k} = freezeunquote ($v) if $k;
1309     }
1310     $Jobstep->{'failures'} = 0;
1311     push @jobstep, $Jobstep;
1312
1313     if ($Jobstep->{exitcode} eq "0")
1314     {
1315       push @jobstep_done, $#jobstep;
1316     }
1317     else
1318     {
1319       push @jobstep_todo, $#jobstep;
1320     }
1321   }
1322
1323   foreach (qw (script script_version script_parameters))
1324   {
1325     $Job->{$_} = $frozenjob->{$_};
1326   }
1327   $Job->save if $job_has_uuid;
1328 }
1329
1330
1331 sub freezequote
1332 {
1333   my $s = shift;
1334   $s =~ s/\\/\\\\/g;
1335   $s =~ s/\n/\\n/g;
1336   return $s;
1337 }
1338
1339
1340 sub freezeunquote
1341 {
1342   my $s = shift;
1343   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1344   return $s;
1345 }
1346
1347
1348 sub srun
1349 {
1350   my $srunargs = shift;
1351   my $execargs = shift;
1352   my $opts = shift || {};
1353   my $stdin = shift;
1354   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1355   print STDERR (join (" ",
1356                       map { / / ? "'$_'" : $_ }
1357                       (@$args)),
1358                 "\n")
1359       if $ENV{CRUNCH_DEBUG};
1360
1361   if (defined $stdin) {
1362     my $child = open STDIN, "-|";
1363     defined $child or die "no fork: $!";
1364     if ($child == 0) {
1365       print $stdin or die $!;
1366       close STDOUT or die $!;
1367       exit 0;
1368     }
1369   }
1370
1371   return system (@$args) if $opts->{fork};
1372
1373   exec @$args;
1374   warn "ENV size is ".length(join(" ",%ENV));
1375   die "exec failed: $!: @$args";
1376 }
1377
1378
1379 sub ban_node_by_slot {
1380   # Don't start any new jobsteps on this node for 60 seconds
1381   my $slotid = shift;
1382   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1383   $slot[$slotid]->{node}->{hold_count}++;
1384   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1385 }
1386
1387 sub must_lock_now
1388 {
1389   my ($lockfile, $error_message) = @_;
1390   open L, ">", $lockfile or croak("$lockfile: $!");
1391   if (!flock L, LOCK_EX|LOCK_NB) {
1392     croak("Can't lock $lockfile: $error_message\n");
1393   }
1394 }
1395
1396 __DATA__
1397 #!/usr/bin/perl
1398
1399 # checkout-and-build
1400
1401 use Fcntl ':flock';
1402
1403 my $destdir = $ENV{"CRUNCH_SRC"};
1404 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1405 my $repo = $ENV{"CRUNCH_SRC_URL"};
1406
1407 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1408 flock L, LOCK_EX;
1409 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1410     exit 0;
1411 }
1412
1413 unlink "$destdir.commit";
1414 open STDOUT, ">", "$destdir.log";
1415 open STDERR, ">&STDOUT";
1416
1417 mkdir $destdir;
1418 my @git_archive_data = <DATA>;
1419 if (@git_archive_data) {
1420   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1421   print TARX @git_archive_data;
1422   if(!close(TARX)) {
1423     die "'tar -C $destdir -xf -' exited $?: $!";
1424   }
1425 }
1426
1427 my $pwd;
1428 chomp ($pwd = `pwd`);
1429 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1430 mkdir $install_dir;
1431
1432 for my $src_path ("$destdir/arvados/sdk/python") {
1433   if (-d $src_path) {
1434     shell_or_die ("virtualenv", $install_dir);
1435     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1436   }
1437 }
1438
1439 if (-e "$destdir/crunch_scripts/install") {
1440     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1441 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1442     # Old version
1443     shell_or_die ("./tests/autotests.sh", $install_dir);
1444 } elsif (-e "./install.sh") {
1445     shell_or_die ("./install.sh", $install_dir);
1446 }
1447
1448 if ($commit) {
1449     unlink "$destdir.commit.new";
1450     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1451     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1452 }
1453
1454 close L;
1455
1456 exit 0;
1457
1458 sub shell_or_die
1459 {
1460   if ($ENV{"DEBUG"}) {
1461     print STDERR "@_\n";
1462   }
1463   system (@_) == 0
1464       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1465 }
1466
1467 __DATA__