crunch-job: Support runtime Docker image.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $arv_cli;
99
100 if (defined $ENV{"ARV_CLI"}) {
101   $arv_cli = $ENV{"ARV_CLI"};
102 }
103 else {
104   $arv_cli = 'arv';
105 }
106
107 my $force_unlock;
108 my $git_dir;
109 my $jobspec;
110 my $job_api_token;
111 my $no_clear_tmp;
112 my $resume_stash;
113 GetOptions('force-unlock' => \$force_unlock,
114            'git-dir=s' => \$git_dir,
115            'job=s' => \$jobspec,
116            'job-api-token=s' => \$job_api_token,
117            'no-clear-tmp' => \$no_clear_tmp,
118            'resume-stash=s' => \$resume_stash,
119     );
120
121 if (defined $job_api_token) {
122   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
123 }
124
125 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
126 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
127 my $local_job = !$job_has_uuid;
128
129
130 $SIG{'USR1'} = sub
131 {
132   $main::ENV{CRUNCH_DEBUG} = 1;
133 };
134 $SIG{'USR2'} = sub
135 {
136   $main::ENV{CRUNCH_DEBUG} = 0;
137 };
138
139
140
141 my $arv = Arvados->new('apiVersion' => 'v1');
142 my $metastream;
143
144 my $User = $arv->{'users'}->{'current'}->execute;
145
146 my $Job = {};
147 my $job_id;
148 my $dbh;
149 my $sth;
150 if ($job_has_uuid)
151 {
152   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
153   if (!$force_unlock) {
154     if ($Job->{'is_locked_by_uuid'}) {
155       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
156     }
157     if ($Job->{'success'} ne undef) {
158       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
159     }
160     if ($Job->{'running'}) {
161       croak("Job 'running' flag is already set");
162     }
163     if ($Job->{'started_at'}) {
164       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
165     }
166   }
167 }
168 else
169 {
170   $Job = JSON::decode_json($jobspec);
171
172   if (!$resume_stash)
173   {
174     map { croak ("No $_ specified") unless $Job->{$_} }
175     qw(script script_version script_parameters);
176   }
177
178   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
179   $Job->{'started_at'} = gmtime;
180
181   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
182
183   $job_has_uuid = 1;
184 }
185 $job_id = $Job->{'uuid'};
186
187 my $keep_logfile = $job_id . '.log.txt';
188 my $local_logfile = File::Temp->new();
189
190 $Job->{'runtime_constraints'} ||= {};
191 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
192 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
193
194
195 Log (undef, "check slurm allocation");
196 my @slot;
197 my @node;
198 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
199 my @sinfo;
200 if (!$have_slurm)
201 {
202   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
203   push @sinfo, "$localcpus localhost";
204 }
205 if (exists $ENV{SLURM_NODELIST})
206 {
207   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
208 }
209 foreach (@sinfo)
210 {
211   my ($ncpus, $slurm_nodelist) = split;
212   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
213
214   my @nodelist;
215   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
216   {
217     my $nodelist = $1;
218     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
219     {
220       my $ranges = $1;
221       foreach (split (",", $ranges))
222       {
223         my ($a, $b);
224         if (/(\d+)-(\d+)/)
225         {
226           $a = $1;
227           $b = $2;
228         }
229         else
230         {
231           $a = $_;
232           $b = $_;
233         }
234         push @nodelist, map {
235           my $n = $nodelist;
236           $n =~ s/\[[-,\d]+\]/$_/;
237           $n;
238         } ($a..$b);
239       }
240     }
241     else
242     {
243       push @nodelist, $nodelist;
244     }
245   }
246   foreach my $nodename (@nodelist)
247   {
248     Log (undef, "node $nodename - $ncpus slots");
249     my $node = { name => $nodename,
250                  ncpus => $ncpus,
251                  losing_streak => 0,
252                  hold_until => 0 };
253     foreach my $cpu (1..$ncpus)
254     {
255       push @slot, { node => $node,
256                     cpu => $cpu };
257     }
258   }
259   push @node, @nodelist;
260 }
261
262
263
264 # Ensure that we get one jobstep running on each allocated node before
265 # we start overloading nodes with concurrent steps
266
267 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
268
269
270
271 my $jobmanager_id;
272 if ($job_has_uuid)
273 {
274   # Claim this job, and make sure nobody else does
275   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
276           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
277     croak("Error while updating / locking job");
278   }
279   $Job->update_attributes('started_at' => scalar gmtime,
280                           'running' => 1,
281                           'success' => undef,
282                           'tasks_summary' => { 'failed' => 0,
283                                                'todo' => 1,
284                                                'running' => 0,
285                                                'done' => 0 });
286 }
287
288
289 Log (undef, "start");
290 $SIG{'INT'} = sub { $main::please_freeze = 1; };
291 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
292 $SIG{'TERM'} = \&croak;
293 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
294 $SIG{'ALRM'} = sub { $main::please_info = 1; };
295 $SIG{'CONT'} = sub { $main::please_continue = 1; };
296 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
297
298 $main::please_freeze = 0;
299 $main::please_info = 0;
300 $main::please_continue = 0;
301 $main::please_refresh = 0;
302 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
303
304 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
305 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
306 $ENV{"JOB_UUID"} = $job_id;
307
308
309 my @jobstep;
310 my @jobstep_todo = ();
311 my @jobstep_done = ();
312 my @jobstep_tomerge = ();
313 my $jobstep_tomerge_level = 0;
314 my $squeue_checked;
315 my $squeue_kill_checked;
316 my $output_in_keep = 0;
317 my $latest_refresh = scalar time;
318
319
320
321 if (defined $Job->{thawedfromkey})
322 {
323   thaw ($Job->{thawedfromkey});
324 }
325 else
326 {
327   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
328     'job_uuid' => $Job->{'uuid'},
329     'sequence' => 0,
330     'qsequence' => 0,
331     'parameters' => {},
332                                                           });
333   push @jobstep, { 'level' => 0,
334                    'failures' => 0,
335                    'arvados_task' => $first_task,
336                  };
337   push @jobstep_todo, 0;
338 }
339
340
341 if (!$have_slurm)
342 {
343   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
344 }
345
346
347 my $build_script;
348
349
350 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
351
352 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
353 if ($skip_install)
354 {
355   if (!defined $no_clear_tmp) {
356     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
357     system($clear_tmp_cmd) == 0
358         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
359   }
360   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
361   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
362     if (-d $src_path) {
363       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
364           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
365       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
366           == 0
367           or croak ("setup.py in $src_path failed: exit ".($?>>8));
368     }
369   }
370 }
371 else
372 {
373   do {
374     local $/ = undef;
375     $build_script = <DATA>;
376   };
377   Log (undef, "Install revision ".$Job->{script_version});
378   my $nodelist = join(",", @node);
379
380   if (!defined $no_clear_tmp) {
381     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
382
383     my $cleanpid = fork();
384     if ($cleanpid == 0)
385     {
386       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
387             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
388       exit (1);
389     }
390     while (1)
391     {
392       last if $cleanpid == waitpid (-1, WNOHANG);
393       freeze_if_want_freeze ($cleanpid);
394       select (undef, undef, undef, 0.1);
395     }
396     Log (undef, "Clean-work-dir exited $?");
397   }
398
399   # Install requested code version
400
401   my @execargs;
402   my @srunargs = ("srun",
403                   "--nodelist=$nodelist",
404                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
405
406   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
407   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
408
409   my $commit;
410   my $git_archive;
411   my $treeish = $Job->{'script_version'};
412
413   # If we're running under crunch-dispatch, it will have pulled the
414   # appropriate source tree into its own repository, and given us that
415   # repo's path as $git_dir. If we're running a "local" job, and a
416   # script_version was specified, it's up to the user to provide the
417   # full path to a local repository in Job->{repository}.
418   #
419   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
420   # git-archive --remote where appropriate.
421   #
422   # TODO: Accept a locally-hosted Arvados repository by name or
423   # UUID. Use arvados.v1.repositories.list or .get to figure out the
424   # appropriate fetch-url.
425   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
426
427   $ENV{"CRUNCH_SRC_URL"} = $repo;
428
429   if (-d "$repo/.git") {
430     # We were given a working directory, but we are only interested in
431     # the index.
432     $repo = "$repo/.git";
433   }
434
435   # If this looks like a subversion r#, look for it in git-svn commit messages
436
437   if ($treeish =~ m{^\d{1,4}$}) {
438     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
439     chomp $gitlog;
440     if ($gitlog =~ /^[a-f0-9]{40}$/) {
441       $commit = $gitlog;
442       Log (undef, "Using commit $commit for script_version $treeish");
443     }
444   }
445
446   # If that didn't work, try asking git to look it up as a tree-ish.
447
448   if (!defined $commit) {
449     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
450     chomp $found;
451     if ($found =~ /^[0-9a-f]{40}$/s) {
452       $commit = $found;
453       if ($commit ne $treeish) {
454         # Make sure we record the real commit id in the database,
455         # frozentokey, logs, etc. -- instead of an abbreviation or a
456         # branch name which can become ambiguous or point to a
457         # different commit in the future.
458         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
459         Log (undef, "Using commit $commit for tree-ish $treeish");
460         if ($commit ne $treeish) {
461           $Job->{'script_version'} = $commit;
462           !$job_has_uuid or
463               $Job->update_attributes('script_version' => $commit) or
464               croak("Error while updating job");
465         }
466       }
467     }
468   }
469
470   if (defined $commit) {
471     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
472     @execargs = ("sh", "-c",
473                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
474     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
475   }
476   else {
477     croak ("could not figure out commit id for $treeish");
478   }
479
480   my $installpid = fork();
481   if ($installpid == 0)
482   {
483     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
484     exit (1);
485   }
486   while (1)
487   {
488     last if $installpid == waitpid (-1, WNOHANG);
489     freeze_if_want_freeze ($installpid);
490     select (undef, undef, undef, 0.1);
491   }
492   Log (undef, "Install exited $?");
493 }
494
495 if (!$have_slurm)
496 {
497   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
498   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
499 }
500
501 # If this job requires a Docker image, install that.
502 my $docker_bin = "/usr/bin/docker.io";
503 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
504
505 foreach (qw (script script_version script_parameters runtime_constraints))
506 {
507   Log (undef,
508        "$_ " .
509        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
510 }
511 foreach (split (/\n/, $Job->{knobs}))
512 {
513   Log (undef, "knob " . $_);
514 }
515
516
517
518 $main::success = undef;
519
520
521
522 ONELEVEL:
523
524 my $thisround_succeeded = 0;
525 my $thisround_failed = 0;
526 my $thisround_failed_multiple = 0;
527
528 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
529                        or $a <=> $b } @jobstep_todo;
530 my $level = $jobstep[$jobstep_todo[0]]->{level};
531 Log (undef, "start level $level");
532
533
534
535 my %proc;
536 my @freeslot = (0..$#slot);
537 my @holdslot;
538 my %reader;
539 my $progress_is_dirty = 1;
540 my $progress_stats_updated = 0;
541
542 update_progress_stats();
543
544
545
546 THISROUND:
547 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
548 {
549   my $id = $jobstep_todo[$todo_ptr];
550   my $Jobstep = $jobstep[$id];
551   if ($Jobstep->{level} != $level)
552   {
553     next;
554   }
555
556   pipe $reader{$id}, "writer" or croak ($!);
557   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
558   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
559
560   my $childslot = $freeslot[0];
561   my $childnode = $slot[$childslot]->{node};
562   my $childslotname = join (".",
563                             $slot[$childslot]->{node}->{name},
564                             $slot[$childslot]->{cpu});
565   my $childpid = fork();
566   if ($childpid == 0)
567   {
568     $SIG{'INT'} = 'DEFAULT';
569     $SIG{'QUIT'} = 'DEFAULT';
570     $SIG{'TERM'} = 'DEFAULT';
571
572     foreach (values (%reader))
573     {
574       close($_);
575     }
576     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
577     open(STDOUT,">&writer");
578     open(STDERR,">&writer");
579
580     undef $dbh;
581     undef $sth;
582
583     delete $ENV{"GNUPGHOME"};
584     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
585     $ENV{"TASK_QSEQUENCE"} = $id;
586     $ENV{"TASK_SEQUENCE"} = $level;
587     $ENV{"JOB_SCRIPT"} = $Job->{script};
588     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
589       $param =~ tr/a-z/A-Z/;
590       $ENV{"JOB_PARAMETER_$param"} = $value;
591     }
592     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
593     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
594     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
595     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
596     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
597     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
598     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
599
600     $ENV{"GZIP"} = "-n";
601
602     my @srunargs = (
603       "srun",
604       "--nodelist=".$childnode->{name},
605       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
606       "--job-name=$job_id.$id.$$",
607         );
608     my $build_script_to_send = "";
609     my $command =
610         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
611         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
612         ."&& cd $ENV{CRUNCH_TMP} ";
613     if ($build_script)
614     {
615       $build_script_to_send = $build_script;
616       $command .=
617           "&& perl -";
618     }
619     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
620     if ($docker_image)
621     {
622       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr ";
623       # Dynamically configure the container to use the host system as its
624       # DNS server.  Get the host's global addresses from the ip command,
625       # and turn them into docker --dns options using gawk.
626       $command .=
627           q{$(ip -o address show scope global |
628               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
629       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
630       {
631         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
632       }
633       while (my ($env_key, $env_val) = each %ENV)
634       {
635         $command .= "-e \Q$env_key=$env_val\E ";
636       }
637       $command .= "$docker_image ";
638     }
639     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
640     my @execargs = ('bash', '-c', $command);
641     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
642     exit (111);
643   }
644   close("writer");
645   if (!defined $childpid)
646   {
647     close $reader{$id};
648     delete $reader{$id};
649     next;
650   }
651   shift @freeslot;
652   $proc{$childpid} = { jobstep => $id,
653                        time => time,
654                        slot => $childslot,
655                        jobstepname => "$job_id.$id.$childpid",
656                      };
657   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
658   $slot[$childslot]->{pid} = $childpid;
659
660   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
661   Log ($id, "child $childpid started on $childslotname");
662   $Jobstep->{starttime} = time;
663   $Jobstep->{node} = $childnode->{name};
664   $Jobstep->{slotindex} = $childslot;
665   delete $Jobstep->{stderr};
666   delete $Jobstep->{finishtime};
667
668   splice @jobstep_todo, $todo_ptr, 1;
669   --$todo_ptr;
670
671   $progress_is_dirty = 1;
672
673   while (!@freeslot
674          ||
675          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
676   {
677     last THISROUND if $main::please_freeze;
678     if ($main::please_info)
679     {
680       $main::please_info = 0;
681       freeze();
682       collate_output();
683       save_meta(1);
684       update_progress_stats();
685     }
686     my $gotsome
687         = readfrompipes ()
688         + reapchildren ();
689     if (!$gotsome)
690     {
691       check_refresh_wanted();
692       check_squeue();
693       update_progress_stats();
694       select (undef, undef, undef, 0.1);
695     }
696     elsif (time - $progress_stats_updated >= 30)
697     {
698       update_progress_stats();
699     }
700     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
701         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
702     {
703       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
704           .($thisround_failed+$thisround_succeeded)
705           .") -- giving up on this round";
706       Log (undef, $message);
707       last THISROUND;
708     }
709
710     # move slots from freeslot to holdslot (or back to freeslot) if necessary
711     for (my $i=$#freeslot; $i>=0; $i--) {
712       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
713         push @holdslot, (splice @freeslot, $i, 1);
714       }
715     }
716     for (my $i=$#holdslot; $i>=0; $i--) {
717       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
718         push @freeslot, (splice @holdslot, $i, 1);
719       }
720     }
721
722     # give up if no nodes are succeeding
723     if (!grep { $_->{node}->{losing_streak} == 0 &&
724                     $_->{node}->{hold_count} < 4 } @slot) {
725       my $message = "Every node has failed -- giving up on this round";
726       Log (undef, $message);
727       last THISROUND;
728     }
729   }
730 }
731
732
733 push @freeslot, splice @holdslot;
734 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
735
736
737 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
738 while (%proc)
739 {
740   if ($main::please_continue) {
741     $main::please_continue = 0;
742     goto THISROUND;
743   }
744   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
745   readfrompipes ();
746   if (!reapchildren())
747   {
748     check_refresh_wanted();
749     check_squeue();
750     update_progress_stats();
751     select (undef, undef, undef, 0.1);
752     killem (keys %proc) if $main::please_freeze;
753   }
754 }
755
756 update_progress_stats();
757 freeze_if_want_freeze();
758
759
760 if (!defined $main::success)
761 {
762   if (@jobstep_todo &&
763       $thisround_succeeded == 0 &&
764       ($thisround_failed == 0 || $thisround_failed > 4))
765   {
766     my $message = "stop because $thisround_failed tasks failed and none succeeded";
767     Log (undef, $message);
768     $main::success = 0;
769   }
770   if (!@jobstep_todo)
771   {
772     $main::success = 1;
773   }
774 }
775
776 goto ONELEVEL if !defined $main::success;
777
778
779 release_allocation();
780 freeze();
781 if ($job_has_uuid) {
782   $Job->update_attributes('output' => &collate_output(),
783                           'running' => 0,
784                           'success' => $Job->{'output'} && $main::success,
785                           'finished_at' => scalar gmtime)
786 }
787
788 if ($Job->{'output'})
789 {
790   eval {
791     my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
792     $arv->{'collections'}->{'create'}->execute('collection' => {
793       'uuid' => $Job->{'output'},
794       'manifest_text' => $manifest_text,
795     });
796     if ($Job->{'output_is_persistent'}) {
797       $arv->{'links'}->{'create'}->execute('link' => {
798         'tail_kind' => 'arvados#user',
799         'tail_uuid' => $User->{'uuid'},
800         'head_kind' => 'arvados#collection',
801         'head_uuid' => $Job->{'output'},
802         'link_class' => 'resources',
803         'name' => 'wants',
804       });
805     }
806   };
807   if ($@) {
808     Log (undef, "Failed to register output manifest: $@");
809   }
810 }
811
812 Log (undef, "finish");
813
814 save_meta();
815 exit 0;
816
817
818
819 sub update_progress_stats
820 {
821   $progress_stats_updated = time;
822   return if !$progress_is_dirty;
823   my ($todo, $done, $running) = (scalar @jobstep_todo,
824                                  scalar @jobstep_done,
825                                  scalar @slot - scalar @freeslot - scalar @holdslot);
826   $Job->{'tasks_summary'} ||= {};
827   $Job->{'tasks_summary'}->{'todo'} = $todo;
828   $Job->{'tasks_summary'}->{'done'} = $done;
829   $Job->{'tasks_summary'}->{'running'} = $running;
830   if ($job_has_uuid) {
831     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
832   }
833   Log (undef, "status: $done done, $running running, $todo todo");
834   $progress_is_dirty = 0;
835 }
836
837
838
839 sub reapchildren
840 {
841   my $pid = waitpid (-1, WNOHANG);
842   return 0 if $pid <= 0;
843
844   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
845                   . "."
846                   . $slot[$proc{$pid}->{slot}]->{cpu});
847   my $jobstepid = $proc{$pid}->{jobstep};
848   my $elapsed = time - $proc{$pid}->{time};
849   my $Jobstep = $jobstep[$jobstepid];
850
851   my $childstatus = $?;
852   my $exitvalue = $childstatus >> 8;
853   my $exitinfo = sprintf("exit %d signal %d%s",
854                          $exitvalue,
855                          $childstatus & 127,
856                          ($childstatus & 128 ? ' core dump' : ''));
857   $Jobstep->{'arvados_task'}->reload;
858   my $task_success = $Jobstep->{'arvados_task'}->{success};
859
860   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
861
862   if (!defined $task_success) {
863     # task did not indicate one way or the other --> fail
864     $Jobstep->{'arvados_task'}->{success} = 0;
865     $Jobstep->{'arvados_task'}->save;
866     $task_success = 0;
867   }
868
869   if (!$task_success)
870   {
871     my $temporary_fail;
872     $temporary_fail ||= $Jobstep->{node_fail};
873     $temporary_fail ||= ($exitvalue == 111);
874
875     ++$thisround_failed;
876     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
877
878     # Check for signs of a failed or misconfigured node
879     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
880         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
881       # Don't count this against jobstep failure thresholds if this
882       # node is already suspected faulty and srun exited quickly
883       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
884           $elapsed < 5) {
885         Log ($jobstepid, "blaming failure on suspect node " .
886              $slot[$proc{$pid}->{slot}]->{node}->{name});
887         $temporary_fail ||= 1;
888       }
889       ban_node_by_slot($proc{$pid}->{slot});
890     }
891
892     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
893                              ++$Jobstep->{'failures'},
894                              $temporary_fail ? 'temporary ' : 'permanent',
895                              $elapsed));
896
897     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
898       # Give up on this task, and the whole job
899       $main::success = 0;
900       $main::please_freeze = 1;
901     }
902     else {
903       # Put this task back on the todo queue
904       push @jobstep_todo, $jobstepid;
905     }
906     $Job->{'tasks_summary'}->{'failed'}++;
907   }
908   else
909   {
910     ++$thisround_succeeded;
911     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
912     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
913     push @jobstep_done, $jobstepid;
914     Log ($jobstepid, "success in $elapsed seconds");
915   }
916   $Jobstep->{exitcode} = $childstatus;
917   $Jobstep->{finishtime} = time;
918   process_stderr ($jobstepid, $task_success);
919   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
920
921   close $reader{$jobstepid};
922   delete $reader{$jobstepid};
923   delete $slot[$proc{$pid}->{slot}]->{pid};
924   push @freeslot, $proc{$pid}->{slot};
925   delete $proc{$pid};
926
927   # Load new tasks
928   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
929     'where' => {
930       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
931     },
932     'order' => 'qsequence'
933   );
934   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
935     my $jobstep = {
936       'level' => $arvados_task->{'sequence'},
937       'failures' => 0,
938       'arvados_task' => $arvados_task
939     };
940     push @jobstep, $jobstep;
941     push @jobstep_todo, $#jobstep;
942   }
943
944   $progress_is_dirty = 1;
945   1;
946 }
947
948 sub check_refresh_wanted
949 {
950   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
951   if (@stat && $stat[9] > $latest_refresh) {
952     $latest_refresh = scalar time;
953     if ($job_has_uuid) {
954       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
955       for my $attr ('cancelled_at',
956                     'cancelled_by_user_uuid',
957                     'cancelled_by_client_uuid') {
958         $Job->{$attr} = $Job2->{$attr};
959       }
960       if ($Job->{'cancelled_at'}) {
961         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
962              " by user " . $Job->{cancelled_by_user_uuid});
963         $main::success = 0;
964         $main::please_freeze = 1;
965       }
966     }
967   }
968 }
969
970 sub check_squeue
971 {
972   # return if the kill list was checked <4 seconds ago
973   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
974   {
975     return;
976   }
977   $squeue_kill_checked = time;
978
979   # use killem() on procs whose killtime is reached
980   for (keys %proc)
981   {
982     if (exists $proc{$_}->{killtime}
983         && $proc{$_}->{killtime} <= time)
984     {
985       killem ($_);
986     }
987   }
988
989   # return if the squeue was checked <60 seconds ago
990   if (defined $squeue_checked && $squeue_checked > time - 60)
991   {
992     return;
993   }
994   $squeue_checked = time;
995
996   if (!$have_slurm)
997   {
998     # here is an opportunity to check for mysterious problems with local procs
999     return;
1000   }
1001
1002   # get a list of steps still running
1003   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1004   chop @squeue;
1005   if ($squeue[-1] ne "ok")
1006   {
1007     return;
1008   }
1009   pop @squeue;
1010
1011   # which of my jobsteps are running, according to squeue?
1012   my %ok;
1013   foreach (@squeue)
1014   {
1015     if (/^(\d+)\.(\d+) (\S+)/)
1016     {
1017       if ($1 eq $ENV{SLURM_JOBID})
1018       {
1019         $ok{$3} = 1;
1020       }
1021     }
1022   }
1023
1024   # which of my active child procs (>60s old) were not mentioned by squeue?
1025   foreach (keys %proc)
1026   {
1027     if ($proc{$_}->{time} < time - 60
1028         && !exists $ok{$proc{$_}->{jobstepname}}
1029         && !exists $proc{$_}->{killtime})
1030     {
1031       # kill this proc if it hasn't exited in 30 seconds
1032       $proc{$_}->{killtime} = time + 30;
1033     }
1034   }
1035 }
1036
1037
1038 sub release_allocation
1039 {
1040   if ($have_slurm)
1041   {
1042     Log (undef, "release job allocation");
1043     system "scancel $ENV{SLURM_JOBID}";
1044   }
1045 }
1046
1047
1048 sub readfrompipes
1049 {
1050   my $gotsome = 0;
1051   foreach my $job (keys %reader)
1052   {
1053     my $buf;
1054     while (0 < sysread ($reader{$job}, $buf, 8192))
1055     {
1056       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1057       $jobstep[$job]->{stderr} .= $buf;
1058       preprocess_stderr ($job);
1059       if (length ($jobstep[$job]->{stderr}) > 16384)
1060       {
1061         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1062       }
1063       $gotsome = 1;
1064     }
1065   }
1066   return $gotsome;
1067 }
1068
1069
1070 sub preprocess_stderr
1071 {
1072   my $job = shift;
1073
1074   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1075     my $line = $1;
1076     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1077     Log ($job, "stderr $line");
1078     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1079       # whoa.
1080       $main::please_freeze = 1;
1081     }
1082     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1083       $jobstep[$job]->{node_fail} = 1;
1084       ban_node_by_slot($jobstep[$job]->{slotindex});
1085     }
1086   }
1087 }
1088
1089
1090 sub process_stderr
1091 {
1092   my $job = shift;
1093   my $task_success = shift;
1094   preprocess_stderr ($job);
1095
1096   map {
1097     Log ($job, "stderr $_");
1098   } split ("\n", $jobstep[$job]->{stderr});
1099 }
1100
1101 sub fetch_block
1102 {
1103   my $hash = shift;
1104   my ($keep, $child_out, $output_block);
1105
1106   my $cmd = "$arv_cli keep get \Q$hash\E";
1107   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1108   sysread($keep, $output_block, 64 * 1024 * 1024);
1109   close $keep;
1110   return $output_block;
1111 }
1112
1113 sub collate_output
1114 {
1115   Log (undef, "collate");
1116
1117   my ($child_out, $child_in);
1118   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1119   my $joboutput;
1120   for (@jobstep)
1121   {
1122     next if (!exists $_->{'arvados_task'}->{output} ||
1123              !$_->{'arvados_task'}->{'success'} ||
1124              $_->{'exitcode'} != 0);
1125     my $output = $_->{'arvados_task'}->{output};
1126     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1127     {
1128       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1129       print $child_in $output;
1130     }
1131     elsif (@jobstep == 1)
1132     {
1133       $joboutput = $output;
1134       last;
1135     }
1136     elsif (defined (my $outblock = fetch_block ($output)))
1137     {
1138       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1139       print $child_in $outblock;
1140     }
1141     else
1142     {
1143       Log (undef, "XXX fetch_block($output) failed XXX");
1144       $main::success = 0;
1145     }
1146   }
1147   $child_in->close;
1148
1149   if (!defined $joboutput) {
1150     my $s = IO::Select->new($child_out);
1151     if ($s->can_read(120)) {
1152       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1153       chomp($joboutput);
1154     } else {
1155       Log (undef, "timed out reading from 'arv keep put'");
1156     }
1157   }
1158   waitpid($pid, 0);
1159
1160   if ($joboutput)
1161   {
1162     Log (undef, "output $joboutput");
1163     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1164   }
1165   else
1166   {
1167     Log (undef, "output undef");
1168   }
1169   return $joboutput;
1170 }
1171
1172
1173 sub killem
1174 {
1175   foreach (@_)
1176   {
1177     my $sig = 2;                # SIGINT first
1178     if (exists $proc{$_}->{"sent_$sig"} &&
1179         time - $proc{$_}->{"sent_$sig"} > 4)
1180     {
1181       $sig = 15;                # SIGTERM if SIGINT doesn't work
1182     }
1183     if (exists $proc{$_}->{"sent_$sig"} &&
1184         time - $proc{$_}->{"sent_$sig"} > 4)
1185     {
1186       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1187     }
1188     if (!exists $proc{$_}->{"sent_$sig"})
1189     {
1190       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1191       kill $sig, $_;
1192       select (undef, undef, undef, 0.1);
1193       if ($sig == 2)
1194       {
1195         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1196       }
1197       $proc{$_}->{"sent_$sig"} = time;
1198       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1199     }
1200   }
1201 }
1202
1203
1204 sub fhbits
1205 {
1206   my($bits);
1207   for (@_) {
1208     vec($bits,fileno($_),1) = 1;
1209   }
1210   $bits;
1211 }
1212
1213
1214 sub Log                         # ($jobstep_id, $logmessage)
1215 {
1216   if ($_[1] =~ /\n/) {
1217     for my $line (split (/\n/, $_[1])) {
1218       Log ($_[0], $line);
1219     }
1220     return;
1221   }
1222   my $fh = select STDERR; $|=1; select $fh;
1223   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1224   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1225   $message .= "\n";
1226   my $datetime;
1227   if ($metastream || -t STDERR) {
1228     my @gmtime = gmtime;
1229     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1230                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1231   }
1232   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1233
1234   if ($metastream) {
1235     print $metastream $datetime . " " . $message;
1236   }
1237 }
1238
1239
1240 sub croak
1241 {
1242   my ($package, $file, $line) = caller;
1243   my $message = "@_ at $file line $line\n";
1244   Log (undef, $message);
1245   freeze() if @jobstep_todo;
1246   collate_output() if @jobstep_todo;
1247   cleanup();
1248   save_meta() if $metastream;
1249   die;
1250 }
1251
1252
1253 sub cleanup
1254 {
1255   return if !$job_has_uuid;
1256   $Job->update_attributes('running' => 0,
1257                           'success' => 0,
1258                           'finished_at' => scalar gmtime);
1259 }
1260
1261
1262 sub save_meta
1263 {
1264   my $justcheckpoint = shift; # false if this will be the last meta saved
1265   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1266
1267   $local_logfile->flush;
1268   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1269       . quotemeta($local_logfile->filename);
1270   my $loglocator = `$cmd`;
1271   die "system $cmd failed: $?" if $?;
1272
1273   $local_logfile = undef;   # the temp file is automatically deleted
1274   Log (undef, "log manifest is $loglocator");
1275   $Job->{'log'} = $loglocator;
1276   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1277 }
1278
1279
1280 sub freeze_if_want_freeze
1281 {
1282   if ($main::please_freeze)
1283   {
1284     release_allocation();
1285     if (@_)
1286     {
1287       # kill some srun procs before freeze+stop
1288       map { $proc{$_} = {} } @_;
1289       while (%proc)
1290       {
1291         killem (keys %proc);
1292         select (undef, undef, undef, 0.1);
1293         my $died;
1294         while (($died = waitpid (-1, WNOHANG)) > 0)
1295         {
1296           delete $proc{$died};
1297         }
1298       }
1299     }
1300     freeze();
1301     collate_output();
1302     cleanup();
1303     save_meta();
1304     exit 0;
1305   }
1306 }
1307
1308
1309 sub freeze
1310 {
1311   Log (undef, "Freeze not implemented");
1312   return;
1313 }
1314
1315
1316 sub thaw
1317 {
1318   croak ("Thaw not implemented");
1319 }
1320
1321
1322 sub freezequote
1323 {
1324   my $s = shift;
1325   $s =~ s/\\/\\\\/g;
1326   $s =~ s/\n/\\n/g;
1327   return $s;
1328 }
1329
1330
1331 sub freezeunquote
1332 {
1333   my $s = shift;
1334   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1335   return $s;
1336 }
1337
1338
1339 sub srun
1340 {
1341   my $srunargs = shift;
1342   my $execargs = shift;
1343   my $opts = shift || {};
1344   my $stdin = shift;
1345   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1346   print STDERR (join (" ",
1347                       map { / / ? "'$_'" : $_ }
1348                       (@$args)),
1349                 "\n")
1350       if $ENV{CRUNCH_DEBUG};
1351
1352   if (defined $stdin) {
1353     my $child = open STDIN, "-|";
1354     defined $child or die "no fork: $!";
1355     if ($child == 0) {
1356       print $stdin or die $!;
1357       close STDOUT or die $!;
1358       exit 0;
1359     }
1360   }
1361
1362   return system (@$args) if $opts->{fork};
1363
1364   exec @$args;
1365   warn "ENV size is ".length(join(" ",%ENV));
1366   die "exec failed: $!: @$args";
1367 }
1368
1369
1370 sub ban_node_by_slot {
1371   # Don't start any new jobsteps on this node for 60 seconds
1372   my $slotid = shift;
1373   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1374   $slot[$slotid]->{node}->{hold_count}++;
1375   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1376 }
1377
1378 sub must_lock_now
1379 {
1380   my ($lockfile, $error_message) = @_;
1381   open L, ">", $lockfile or croak("$lockfile: $!");
1382   if (!flock L, LOCK_EX|LOCK_NB) {
1383     croak("Can't lock $lockfile: $error_message\n");
1384   }
1385 }
1386
1387 __DATA__
1388 #!/usr/bin/perl
1389
1390 # checkout-and-build
1391
1392 use Fcntl ':flock';
1393
1394 my $destdir = $ENV{"CRUNCH_SRC"};
1395 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1396 my $repo = $ENV{"CRUNCH_SRC_URL"};
1397
1398 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1399 flock L, LOCK_EX;
1400 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1401     exit 0;
1402 }
1403
1404 unlink "$destdir.commit";
1405 open STDOUT, ">", "$destdir.log";
1406 open STDERR, ">&STDOUT";
1407
1408 mkdir $destdir;
1409 my @git_archive_data = <DATA>;
1410 if (@git_archive_data) {
1411   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1412   print TARX @git_archive_data;
1413   if(!close(TARX)) {
1414     die "'tar -C $destdir -xf -' exited $?: $!";
1415   }
1416 }
1417
1418 my $pwd;
1419 chomp ($pwd = `pwd`);
1420 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1421 mkdir $install_dir;
1422
1423 for my $src_path ("$destdir/arvados/sdk/python") {
1424   if (-d $src_path) {
1425     shell_or_die ("virtualenv", $install_dir);
1426     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1427   }
1428 }
1429
1430 if (-e "$destdir/crunch_scripts/install") {
1431     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1432 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1433     # Old version
1434     shell_or_die ("./tests/autotests.sh", $install_dir);
1435 } elsif (-e "./install.sh") {
1436     shell_or_die ("./install.sh", $install_dir);
1437 }
1438
1439 if ($commit) {
1440     unlink "$destdir.commit.new";
1441     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1442     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1443 }
1444
1445 close L;
1446
1447 exit 0;
1448
1449 sub shell_or_die
1450 {
1451   if ($ENV{"DEBUG"}) {
1452     print STDERR "@_\n";
1453   }
1454   system (@_) == 0
1455       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1456 }
1457
1458 __DATA__