Merge branch '2352-use-state'
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $arv_cli;
99
100 if (defined $ENV{"ARV_CLI"}) {
101   $arv_cli = $ENV{"ARV_CLI"};
102 }
103 else {
104   $arv_cli = 'arv';
105 }
106
107 my $force_unlock;
108 my $git_dir;
109 my $jobspec;
110 my $job_api_token;
111 my $no_clear_tmp;
112 my $resume_stash;
113 GetOptions('force-unlock' => \$force_unlock,
114            'git-dir=s' => \$git_dir,
115            'job=s' => \$jobspec,
116            'job-api-token=s' => \$job_api_token,
117            'no-clear-tmp' => \$no_clear_tmp,
118            'resume-stash=s' => \$resume_stash,
119     );
120
121 if (defined $job_api_token) {
122   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
123 }
124
125 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
126 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
127 my $local_job = !$job_has_uuid;
128
129
130 $SIG{'USR1'} = sub
131 {
132   $main::ENV{CRUNCH_DEBUG} = 1;
133 };
134 $SIG{'USR2'} = sub
135 {
136   $main::ENV{CRUNCH_DEBUG} = 0;
137 };
138
139
140
141 my $arv = Arvados->new('apiVersion' => 'v1');
142 my $metastream;
143
144 my $User = $arv->{'users'}->{'current'}->execute;
145
146 my $Job = {};
147 my $job_id;
148 my $dbh;
149 my $sth;
150 if ($job_has_uuid)
151 {
152   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
153   if (!$force_unlock) {
154     if ($Job->{'is_locked_by_uuid'}) {
155       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
156     }
157     if ($Job->{'success'} ne undef) {
158       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
159     }
160     if ($Job->{'running'}) {
161       croak("Job 'running' flag is already set");
162     }
163     if ($Job->{'started_at'}) {
164       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
165     }
166   }
167 }
168 else
169 {
170   $Job = JSON::decode_json($jobspec);
171
172   if (!$resume_stash)
173   {
174     map { croak ("No $_ specified") unless $Job->{$_} }
175     qw(script script_version script_parameters);
176   }
177
178   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
179   $Job->{'started_at'} = gmtime;
180
181   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
182
183   $job_has_uuid = 1;
184 }
185 $job_id = $Job->{'uuid'};
186
187 my $keep_logfile = $job_id . '.log.txt';
188 my $local_logfile = File::Temp->new();
189
190 $Job->{'runtime_constraints'} ||= {};
191 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
192 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
193
194
195 Log (undef, "check slurm allocation");
196 my @slot;
197 my @node;
198 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
199 my @sinfo;
200 if (!$have_slurm)
201 {
202   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
203   push @sinfo, "$localcpus localhost";
204 }
205 if (exists $ENV{SLURM_NODELIST})
206 {
207   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
208 }
209 foreach (@sinfo)
210 {
211   my ($ncpus, $slurm_nodelist) = split;
212   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
213
214   my @nodelist;
215   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
216   {
217     my $nodelist = $1;
218     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
219     {
220       my $ranges = $1;
221       foreach (split (",", $ranges))
222       {
223         my ($a, $b);
224         if (/(\d+)-(\d+)/)
225         {
226           $a = $1;
227           $b = $2;
228         }
229         else
230         {
231           $a = $_;
232           $b = $_;
233         }
234         push @nodelist, map {
235           my $n = $nodelist;
236           $n =~ s/\[[-,\d]+\]/$_/;
237           $n;
238         } ($a..$b);
239       }
240     }
241     else
242     {
243       push @nodelist, $nodelist;
244     }
245   }
246   foreach my $nodename (@nodelist)
247   {
248     Log (undef, "node $nodename - $ncpus slots");
249     my $node = { name => $nodename,
250                  ncpus => $ncpus,
251                  losing_streak => 0,
252                  hold_until => 0 };
253     foreach my $cpu (1..$ncpus)
254     {
255       push @slot, { node => $node,
256                     cpu => $cpu };
257     }
258   }
259   push @node, @nodelist;
260 }
261
262
263
264 # Ensure that we get one jobstep running on each allocated node before
265 # we start overloading nodes with concurrent steps
266
267 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
268
269
270
271 my $jobmanager_id;
272 if ($job_has_uuid)
273 {
274   # Claim this job, and make sure nobody else does
275   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
276           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
277     croak("Error while updating / locking job");
278   }
279   $Job->update_attributes('started_at' => scalar gmtime,
280                           'running' => 1,
281                           'success' => undef,
282                           'tasks_summary' => { 'failed' => 0,
283                                                'todo' => 1,
284                                                'running' => 0,
285                                                'done' => 0 });
286 }
287
288
289 Log (undef, "start");
290 $SIG{'INT'} = sub { $main::please_freeze = 1; };
291 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
292 $SIG{'TERM'} = \&croak;
293 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
294 $SIG{'ALRM'} = sub { $main::please_info = 1; };
295 $SIG{'CONT'} = sub { $main::please_continue = 1; };
296 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
297
298 $main::please_freeze = 0;
299 $main::please_info = 0;
300 $main::please_continue = 0;
301 $main::please_refresh = 0;
302 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
303
304 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
305 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
306 $ENV{"JOB_UUID"} = $job_id;
307
308
309 my @jobstep;
310 my @jobstep_todo = ();
311 my @jobstep_done = ();
312 my @jobstep_tomerge = ();
313 my $jobstep_tomerge_level = 0;
314 my $squeue_checked;
315 my $squeue_kill_checked;
316 my $output_in_keep = 0;
317 my $latest_refresh = scalar time;
318
319
320
321 if (defined $Job->{thawedfromkey})
322 {
323   thaw ($Job->{thawedfromkey});
324 }
325 else
326 {
327   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
328     'job_uuid' => $Job->{'uuid'},
329     'sequence' => 0,
330     'qsequence' => 0,
331     'parameters' => {},
332                                                           });
333   push @jobstep, { 'level' => 0,
334                    'failures' => 0,
335                    'arvados_task' => $first_task,
336                  };
337   push @jobstep_todo, 0;
338 }
339
340
341 if (!$have_slurm)
342 {
343   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
344 }
345
346
347 my $build_script;
348
349
350 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
351
352 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
353 if ($skip_install)
354 {
355   if (!defined $no_clear_tmp) {
356     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
357     system($clear_tmp_cmd) == 0
358         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
359   }
360   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
361   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
362     if (-d $src_path) {
363       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
364           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
365       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
366           == 0
367           or croak ("setup.py in $src_path failed: exit ".($?>>8));
368     }
369   }
370 }
371 else
372 {
373   do {
374     local $/ = undef;
375     $build_script = <DATA>;
376   };
377   Log (undef, "Install revision ".$Job->{script_version});
378   my $nodelist = join(",", @node);
379
380   if (!defined $no_clear_tmp) {
381     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
382
383     my $cleanpid = fork();
384     if ($cleanpid == 0)
385     {
386       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
387             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
388       exit (1);
389     }
390     while (1)
391     {
392       last if $cleanpid == waitpid (-1, WNOHANG);
393       freeze_if_want_freeze ($cleanpid);
394       select (undef, undef, undef, 0.1);
395     }
396     Log (undef, "Clean-work-dir exited $?");
397   }
398
399   # Install requested code version
400
401   my @execargs;
402   my @srunargs = ("srun",
403                   "--nodelist=$nodelist",
404                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
405
406   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
407   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
408
409   my $commit;
410   my $git_archive;
411   my $treeish = $Job->{'script_version'};
412
413   # If we're running under crunch-dispatch, it will have pulled the
414   # appropriate source tree into its own repository, and given us that
415   # repo's path as $git_dir. If we're running a "local" job, and a
416   # script_version was specified, it's up to the user to provide the
417   # full path to a local repository in Job->{repository}.
418   #
419   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
420   # git-archive --remote where appropriate.
421   #
422   # TODO: Accept a locally-hosted Arvados repository by name or
423   # UUID. Use arvados.v1.repositories.list or .get to figure out the
424   # appropriate fetch-url.
425   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
426
427   $ENV{"CRUNCH_SRC_URL"} = $repo;
428
429   if (-d "$repo/.git") {
430     # We were given a working directory, but we are only interested in
431     # the index.
432     $repo = "$repo/.git";
433   }
434
435   # If this looks like a subversion r#, look for it in git-svn commit messages
436
437   if ($treeish =~ m{^\d{1,4}$}) {
438     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
439     chomp $gitlog;
440     if ($gitlog =~ /^[a-f0-9]{40}$/) {
441       $commit = $gitlog;
442       Log (undef, "Using commit $commit for script_version $treeish");
443     }
444   }
445
446   # If that didn't work, try asking git to look it up as a tree-ish.
447
448   if (!defined $commit) {
449     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
450     chomp $found;
451     if ($found =~ /^[0-9a-f]{40}$/s) {
452       $commit = $found;
453       if ($commit ne $treeish) {
454         # Make sure we record the real commit id in the database,
455         # frozentokey, logs, etc. -- instead of an abbreviation or a
456         # branch name which can become ambiguous or point to a
457         # different commit in the future.
458         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
459         Log (undef, "Using commit $commit for tree-ish $treeish");
460         if ($commit ne $treeish) {
461           $Job->{'script_version'} = $commit;
462           !$job_has_uuid or
463               $Job->update_attributes('script_version' => $commit) or
464               croak("Error while updating job");
465         }
466       }
467     }
468   }
469
470   if (defined $commit) {
471     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
472     @execargs = ("sh", "-c",
473                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
474     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
475   }
476   else {
477     croak ("could not figure out commit id for $treeish");
478   }
479
480   my $installpid = fork();
481   if ($installpid == 0)
482   {
483     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
484     exit (1);
485   }
486   while (1)
487   {
488     last if $installpid == waitpid (-1, WNOHANG);
489     freeze_if_want_freeze ($installpid);
490     select (undef, undef, undef, 0.1);
491   }
492   Log (undef, "Install exited $?");
493 }
494
495 if (!$have_slurm)
496 {
497   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
498   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
499 }
500
501 # If this job requires a Docker image, install that.
502 my $docker_bin = "/usr/bin/docker.io";
503 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
504 if ($docker_image) {
505   my $docker_pid = fork();
506   if ($docker_pid == 0)
507   {
508     srun (["srun", "--nodelist=" . join(' ', @node)],
509           [$docker_bin, 'pull', $docker_image]);
510     exit ($?);
511   }
512   while (1)
513   {
514     last if $docker_pid == waitpid (-1, WNOHANG);
515     freeze_if_want_freeze ($docker_pid);
516     select (undef, undef, undef, 0.1);
517   }
518   # If the Docker image was specified as a hash, pull will fail.
519   # Ignore that error.  We'll see what happens when we try to run later.
520   if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
521   {
522     croak("Installing Docker image $docker_image returned exit code $?");
523   }
524 }
525
526 foreach (qw (script script_version script_parameters runtime_constraints))
527 {
528   Log (undef,
529        "$_ " .
530        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
531 }
532 foreach (split (/\n/, $Job->{knobs}))
533 {
534   Log (undef, "knob " . $_);
535 }
536
537
538
539 $main::success = undef;
540
541
542
543 ONELEVEL:
544
545 my $thisround_succeeded = 0;
546 my $thisround_failed = 0;
547 my $thisround_failed_multiple = 0;
548
549 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
550                        or $a <=> $b } @jobstep_todo;
551 my $level = $jobstep[$jobstep_todo[0]]->{level};
552 Log (undef, "start level $level");
553
554
555
556 my %proc;
557 my @freeslot = (0..$#slot);
558 my @holdslot;
559 my %reader;
560 my $progress_is_dirty = 1;
561 my $progress_stats_updated = 0;
562
563 update_progress_stats();
564
565
566
567 THISROUND:
568 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
569 {
570   my $id = $jobstep_todo[$todo_ptr];
571   my $Jobstep = $jobstep[$id];
572   if ($Jobstep->{level} != $level)
573   {
574     next;
575   }
576
577   pipe $reader{$id}, "writer" or croak ($!);
578   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
579   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
580
581   my $childslot = $freeslot[0];
582   my $childnode = $slot[$childslot]->{node};
583   my $childslotname = join (".",
584                             $slot[$childslot]->{node}->{name},
585                             $slot[$childslot]->{cpu});
586   my $childpid = fork();
587   if ($childpid == 0)
588   {
589     $SIG{'INT'} = 'DEFAULT';
590     $SIG{'QUIT'} = 'DEFAULT';
591     $SIG{'TERM'} = 'DEFAULT';
592
593     foreach (values (%reader))
594     {
595       close($_);
596     }
597     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
598     open(STDOUT,">&writer");
599     open(STDERR,">&writer");
600
601     undef $dbh;
602     undef $sth;
603
604     delete $ENV{"GNUPGHOME"};
605     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
606     $ENV{"TASK_QSEQUENCE"} = $id;
607     $ENV{"TASK_SEQUENCE"} = $level;
608     $ENV{"JOB_SCRIPT"} = $Job->{script};
609     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
610       $param =~ tr/a-z/A-Z/;
611       $ENV{"JOB_PARAMETER_$param"} = $value;
612     }
613     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
614     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
615     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
616     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
617     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
618     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
619     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
620
621     $ENV{"GZIP"} = "-n";
622
623     my @srunargs = (
624       "srun",
625       "--nodelist=".$childnode->{name},
626       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
627       "--job-name=$job_id.$id.$$",
628         );
629     my $build_script_to_send = "";
630     my $command =
631         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
632         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
633         ."&& cd $ENV{CRUNCH_TMP} ";
634     if ($build_script)
635     {
636       $build_script_to_send = $build_script;
637       $command .=
638           "&& perl -";
639     }
640     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
641     if ($docker_image)
642     {
643       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr ";
644       # Dynamically configure the container to use the host system as its
645       # DNS server.  Get the host's global addresses from the ip command,
646       # and turn them into docker --dns options using gawk.
647       $command .=
648           q{$(ip -o address show scope global |
649               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
650       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
651       {
652         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
653       }
654       while (my ($env_key, $env_val) = each %ENV)
655       {
656         $command .= "-e \Q$env_key=$env_val\E ";
657       }
658       $command .= "\Q$docker_image\E ";
659     }
660     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
661     my @execargs = ('bash', '-c', $command);
662     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
663     exit (111);
664   }
665   close("writer");
666   if (!defined $childpid)
667   {
668     close $reader{$id};
669     delete $reader{$id};
670     next;
671   }
672   shift @freeslot;
673   $proc{$childpid} = { jobstep => $id,
674                        time => time,
675                        slot => $childslot,
676                        jobstepname => "$job_id.$id.$childpid",
677                      };
678   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
679   $slot[$childslot]->{pid} = $childpid;
680
681   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
682   Log ($id, "child $childpid started on $childslotname");
683   $Jobstep->{starttime} = time;
684   $Jobstep->{node} = $childnode->{name};
685   $Jobstep->{slotindex} = $childslot;
686   delete $Jobstep->{stderr};
687   delete $Jobstep->{finishtime};
688
689   splice @jobstep_todo, $todo_ptr, 1;
690   --$todo_ptr;
691
692   $progress_is_dirty = 1;
693
694   while (!@freeslot
695          ||
696          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
697   {
698     last THISROUND if $main::please_freeze;
699     if ($main::please_info)
700     {
701       $main::please_info = 0;
702       freeze();
703       collate_output();
704       save_meta(1);
705       update_progress_stats();
706     }
707     my $gotsome
708         = readfrompipes ()
709         + reapchildren ();
710     if (!$gotsome)
711     {
712       check_refresh_wanted();
713       check_squeue();
714       update_progress_stats();
715       select (undef, undef, undef, 0.1);
716     }
717     elsif (time - $progress_stats_updated >= 30)
718     {
719       update_progress_stats();
720     }
721     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
722         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
723     {
724       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
725           .($thisround_failed+$thisround_succeeded)
726           .") -- giving up on this round";
727       Log (undef, $message);
728       last THISROUND;
729     }
730
731     # move slots from freeslot to holdslot (or back to freeslot) if necessary
732     for (my $i=$#freeslot; $i>=0; $i--) {
733       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
734         push @holdslot, (splice @freeslot, $i, 1);
735       }
736     }
737     for (my $i=$#holdslot; $i>=0; $i--) {
738       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
739         push @freeslot, (splice @holdslot, $i, 1);
740       }
741     }
742
743     # give up if no nodes are succeeding
744     if (!grep { $_->{node}->{losing_streak} == 0 &&
745                     $_->{node}->{hold_count} < 4 } @slot) {
746       my $message = "Every node has failed -- giving up on this round";
747       Log (undef, $message);
748       last THISROUND;
749     }
750   }
751 }
752
753
754 push @freeslot, splice @holdslot;
755 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
756
757
758 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
759 while (%proc)
760 {
761   if ($main::please_continue) {
762     $main::please_continue = 0;
763     goto THISROUND;
764   }
765   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
766   readfrompipes ();
767   if (!reapchildren())
768   {
769     check_refresh_wanted();
770     check_squeue();
771     update_progress_stats();
772     select (undef, undef, undef, 0.1);
773     killem (keys %proc) if $main::please_freeze;
774   }
775 }
776
777 update_progress_stats();
778 freeze_if_want_freeze();
779
780
781 if (!defined $main::success)
782 {
783   if (@jobstep_todo &&
784       $thisround_succeeded == 0 &&
785       ($thisround_failed == 0 || $thisround_failed > 4))
786   {
787     my $message = "stop because $thisround_failed tasks failed and none succeeded";
788     Log (undef, $message);
789     $main::success = 0;
790   }
791   if (!@jobstep_todo)
792   {
793     $main::success = 1;
794   }
795 }
796
797 goto ONELEVEL if !defined $main::success;
798
799
800 release_allocation();
801 freeze();
802 if ($job_has_uuid) {
803   $Job->update_attributes('output' => &collate_output(),
804                           'running' => 0,
805                           'success' => $Job->{'output'} && $main::success,
806                           'finished_at' => scalar gmtime)
807 }
808
809 if ($Job->{'output'})
810 {
811   eval {
812     my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
813     $arv->{'collections'}->{'create'}->execute('collection' => {
814       'uuid' => $Job->{'output'},
815       'manifest_text' => $manifest_text,
816     });
817     if ($Job->{'output_is_persistent'}) {
818       $arv->{'links'}->{'create'}->execute('link' => {
819         'tail_kind' => 'arvados#user',
820         'tail_uuid' => $User->{'uuid'},
821         'head_kind' => 'arvados#collection',
822         'head_uuid' => $Job->{'output'},
823         'link_class' => 'resources',
824         'name' => 'wants',
825       });
826     }
827   };
828   if ($@) {
829     Log (undef, "Failed to register output manifest: $@");
830   }
831 }
832
833 Log (undef, "finish");
834
835 save_meta();
836 exit 0;
837
838
839
840 sub update_progress_stats
841 {
842   $progress_stats_updated = time;
843   return if !$progress_is_dirty;
844   my ($todo, $done, $running) = (scalar @jobstep_todo,
845                                  scalar @jobstep_done,
846                                  scalar @slot - scalar @freeslot - scalar @holdslot);
847   $Job->{'tasks_summary'} ||= {};
848   $Job->{'tasks_summary'}->{'todo'} = $todo;
849   $Job->{'tasks_summary'}->{'done'} = $done;
850   $Job->{'tasks_summary'}->{'running'} = $running;
851   if ($job_has_uuid) {
852     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
853   }
854   Log (undef, "status: $done done, $running running, $todo todo");
855   $progress_is_dirty = 0;
856 }
857
858
859
860 sub reapchildren
861 {
862   my $pid = waitpid (-1, WNOHANG);
863   return 0 if $pid <= 0;
864
865   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
866                   . "."
867                   . $slot[$proc{$pid}->{slot}]->{cpu});
868   my $jobstepid = $proc{$pid}->{jobstep};
869   my $elapsed = time - $proc{$pid}->{time};
870   my $Jobstep = $jobstep[$jobstepid];
871
872   my $childstatus = $?;
873   my $exitvalue = $childstatus >> 8;
874   my $exitinfo = sprintf("exit %d signal %d%s",
875                          $exitvalue,
876                          $childstatus & 127,
877                          ($childstatus & 128 ? ' core dump' : ''));
878   $Jobstep->{'arvados_task'}->reload;
879   my $task_success = $Jobstep->{'arvados_task'}->{success};
880
881   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
882
883   if (!defined $task_success) {
884     # task did not indicate one way or the other --> fail
885     $Jobstep->{'arvados_task'}->{success} = 0;
886     $Jobstep->{'arvados_task'}->save;
887     $task_success = 0;
888   }
889
890   if (!$task_success)
891   {
892     my $temporary_fail;
893     $temporary_fail ||= $Jobstep->{node_fail};
894     $temporary_fail ||= ($exitvalue == 111);
895
896     ++$thisround_failed;
897     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
898
899     # Check for signs of a failed or misconfigured node
900     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
901         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
902       # Don't count this against jobstep failure thresholds if this
903       # node is already suspected faulty and srun exited quickly
904       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
905           $elapsed < 5) {
906         Log ($jobstepid, "blaming failure on suspect node " .
907              $slot[$proc{$pid}->{slot}]->{node}->{name});
908         $temporary_fail ||= 1;
909       }
910       ban_node_by_slot($proc{$pid}->{slot});
911     }
912
913     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
914                              ++$Jobstep->{'failures'},
915                              $temporary_fail ? 'temporary ' : 'permanent',
916                              $elapsed));
917
918     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
919       # Give up on this task, and the whole job
920       $main::success = 0;
921       $main::please_freeze = 1;
922     }
923     else {
924       # Put this task back on the todo queue
925       push @jobstep_todo, $jobstepid;
926     }
927     $Job->{'tasks_summary'}->{'failed'}++;
928   }
929   else
930   {
931     ++$thisround_succeeded;
932     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
933     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
934     push @jobstep_done, $jobstepid;
935     Log ($jobstepid, "success in $elapsed seconds");
936   }
937   $Jobstep->{exitcode} = $childstatus;
938   $Jobstep->{finishtime} = time;
939   process_stderr ($jobstepid, $task_success);
940   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
941
942   close $reader{$jobstepid};
943   delete $reader{$jobstepid};
944   delete $slot[$proc{$pid}->{slot}]->{pid};
945   push @freeslot, $proc{$pid}->{slot};
946   delete $proc{$pid};
947
948   # Load new tasks
949   my $newtask_list = [];
950   my $newtask_results;
951   do {
952     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
953       'where' => {
954         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
955       },
956       'order' => 'qsequence',
957       'offset' => scalar(@$newtask_list),
958     );
959     push(@$newtask_list, @{$newtask_results->{items}});
960   } while (@{$newtask_results->{items}});
961   foreach my $arvados_task (@$newtask_list) {
962     my $jobstep = {
963       'level' => $arvados_task->{'sequence'},
964       'failures' => 0,
965       'arvados_task' => $arvados_task
966     };
967     push @jobstep, $jobstep;
968     push @jobstep_todo, $#jobstep;
969   }
970
971   $progress_is_dirty = 1;
972   1;
973 }
974
975 sub check_refresh_wanted
976 {
977   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
978   if (@stat && $stat[9] > $latest_refresh) {
979     $latest_refresh = scalar time;
980     if ($job_has_uuid) {
981       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
982       for my $attr ('cancelled_at',
983                     'cancelled_by_user_uuid',
984                     'cancelled_by_client_uuid') {
985         $Job->{$attr} = $Job2->{$attr};
986       }
987       if ($Job->{'cancelled_at'}) {
988         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
989              " by user " . $Job->{cancelled_by_user_uuid});
990         $main::success = 0;
991         $main::please_freeze = 1;
992       }
993     }
994   }
995 }
996
997 sub check_squeue
998 {
999   # return if the kill list was checked <4 seconds ago
1000   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1001   {
1002     return;
1003   }
1004   $squeue_kill_checked = time;
1005
1006   # use killem() on procs whose killtime is reached
1007   for (keys %proc)
1008   {
1009     if (exists $proc{$_}->{killtime}
1010         && $proc{$_}->{killtime} <= time)
1011     {
1012       killem ($_);
1013     }
1014   }
1015
1016   # return if the squeue was checked <60 seconds ago
1017   if (defined $squeue_checked && $squeue_checked > time - 60)
1018   {
1019     return;
1020   }
1021   $squeue_checked = time;
1022
1023   if (!$have_slurm)
1024   {
1025     # here is an opportunity to check for mysterious problems with local procs
1026     return;
1027   }
1028
1029   # get a list of steps still running
1030   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1031   chop @squeue;
1032   if ($squeue[-1] ne "ok")
1033   {
1034     return;
1035   }
1036   pop @squeue;
1037
1038   # which of my jobsteps are running, according to squeue?
1039   my %ok;
1040   foreach (@squeue)
1041   {
1042     if (/^(\d+)\.(\d+) (\S+)/)
1043     {
1044       if ($1 eq $ENV{SLURM_JOBID})
1045       {
1046         $ok{$3} = 1;
1047       }
1048     }
1049   }
1050
1051   # which of my active child procs (>60s old) were not mentioned by squeue?
1052   foreach (keys %proc)
1053   {
1054     if ($proc{$_}->{time} < time - 60
1055         && !exists $ok{$proc{$_}->{jobstepname}}
1056         && !exists $proc{$_}->{killtime})
1057     {
1058       # kill this proc if it hasn't exited in 30 seconds
1059       $proc{$_}->{killtime} = time + 30;
1060     }
1061   }
1062 }
1063
1064
1065 sub release_allocation
1066 {
1067   if ($have_slurm)
1068   {
1069     Log (undef, "release job allocation");
1070     system "scancel $ENV{SLURM_JOBID}";
1071   }
1072 }
1073
1074
1075 sub readfrompipes
1076 {
1077   my $gotsome = 0;
1078   foreach my $job (keys %reader)
1079   {
1080     my $buf;
1081     while (0 < sysread ($reader{$job}, $buf, 8192))
1082     {
1083       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1084       $jobstep[$job]->{stderr} .= $buf;
1085       preprocess_stderr ($job);
1086       if (length ($jobstep[$job]->{stderr}) > 16384)
1087       {
1088         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1089       }
1090       $gotsome = 1;
1091     }
1092   }
1093   return $gotsome;
1094 }
1095
1096
1097 sub preprocess_stderr
1098 {
1099   my $job = shift;
1100
1101   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1102     my $line = $1;
1103     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1104     Log ($job, "stderr $line");
1105     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1106       # whoa.
1107       $main::please_freeze = 1;
1108     }
1109     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1110       $jobstep[$job]->{node_fail} = 1;
1111       ban_node_by_slot($jobstep[$job]->{slotindex});
1112     }
1113   }
1114 }
1115
1116
1117 sub process_stderr
1118 {
1119   my $job = shift;
1120   my $task_success = shift;
1121   preprocess_stderr ($job);
1122
1123   map {
1124     Log ($job, "stderr $_");
1125   } split ("\n", $jobstep[$job]->{stderr});
1126 }
1127
1128 sub fetch_block
1129 {
1130   my $hash = shift;
1131   my ($keep, $child_out, $output_block);
1132
1133   my $cmd = "$arv_cli keep get \Q$hash\E";
1134   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1135   sysread($keep, $output_block, 64 * 1024 * 1024);
1136   close $keep;
1137   return $output_block;
1138 }
1139
1140 sub collate_output
1141 {
1142   Log (undef, "collate");
1143
1144   my ($child_out, $child_in);
1145   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1146   my $joboutput;
1147   for (@jobstep)
1148   {
1149     next if (!exists $_->{'arvados_task'}->{output} ||
1150              !$_->{'arvados_task'}->{'success'} ||
1151              $_->{'exitcode'} != 0);
1152     my $output = $_->{'arvados_task'}->{output};
1153     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1154     {
1155       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1156       print $child_in $output;
1157     }
1158     elsif (@jobstep == 1)
1159     {
1160       $joboutput = $output;
1161       last;
1162     }
1163     elsif (defined (my $outblock = fetch_block ($output)))
1164     {
1165       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1166       print $child_in $outblock;
1167     }
1168     else
1169     {
1170       Log (undef, "XXX fetch_block($output) failed XXX");
1171       $main::success = 0;
1172     }
1173   }
1174   $child_in->close;
1175
1176   if (!defined $joboutput) {
1177     my $s = IO::Select->new($child_out);
1178     if ($s->can_read(120)) {
1179       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1180       chomp($joboutput);
1181     } else {
1182       Log (undef, "timed out reading from 'arv keep put'");
1183     }
1184   }
1185   waitpid($pid, 0);
1186
1187   if ($joboutput)
1188   {
1189     Log (undef, "output $joboutput");
1190     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1191   }
1192   else
1193   {
1194     Log (undef, "output undef");
1195   }
1196   return $joboutput;
1197 }
1198
1199
1200 sub killem
1201 {
1202   foreach (@_)
1203   {
1204     my $sig = 2;                # SIGINT first
1205     if (exists $proc{$_}->{"sent_$sig"} &&
1206         time - $proc{$_}->{"sent_$sig"} > 4)
1207     {
1208       $sig = 15;                # SIGTERM if SIGINT doesn't work
1209     }
1210     if (exists $proc{$_}->{"sent_$sig"} &&
1211         time - $proc{$_}->{"sent_$sig"} > 4)
1212     {
1213       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1214     }
1215     if (!exists $proc{$_}->{"sent_$sig"})
1216     {
1217       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1218       kill $sig, $_;
1219       select (undef, undef, undef, 0.1);
1220       if ($sig == 2)
1221       {
1222         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1223       }
1224       $proc{$_}->{"sent_$sig"} = time;
1225       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1226     }
1227   }
1228 }
1229
1230
1231 sub fhbits
1232 {
1233   my($bits);
1234   for (@_) {
1235     vec($bits,fileno($_),1) = 1;
1236   }
1237   $bits;
1238 }
1239
1240
1241 sub Log                         # ($jobstep_id, $logmessage)
1242 {
1243   if ($_[1] =~ /\n/) {
1244     for my $line (split (/\n/, $_[1])) {
1245       Log ($_[0], $line);
1246     }
1247     return;
1248   }
1249   my $fh = select STDERR; $|=1; select $fh;
1250   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1251   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1252   $message .= "\n";
1253   my $datetime;
1254   if ($metastream || -t STDERR) {
1255     my @gmtime = gmtime;
1256     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1257                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1258   }
1259   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1260
1261   if ($metastream) {
1262     print $metastream $datetime . " " . $message;
1263   }
1264 }
1265
1266
1267 sub croak
1268 {
1269   my ($package, $file, $line) = caller;
1270   my $message = "@_ at $file line $line\n";
1271   Log (undef, $message);
1272   freeze() if @jobstep_todo;
1273   collate_output() if @jobstep_todo;
1274   cleanup();
1275   save_meta() if $metastream;
1276   die;
1277 }
1278
1279
1280 sub cleanup
1281 {
1282   return if !$job_has_uuid;
1283   $Job->update_attributes('running' => 0,
1284                           'success' => 0,
1285                           'finished_at' => scalar gmtime);
1286 }
1287
1288
1289 sub save_meta
1290 {
1291   my $justcheckpoint = shift; # false if this will be the last meta saved
1292   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1293
1294   $local_logfile->flush;
1295   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1296       . quotemeta($local_logfile->filename);
1297   my $loglocator = `$cmd`;
1298   die "system $cmd failed: $?" if $?;
1299
1300   $local_logfile = undef;   # the temp file is automatically deleted
1301   Log (undef, "log manifest is $loglocator");
1302   $Job->{'log'} = $loglocator;
1303   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1304 }
1305
1306
1307 sub freeze_if_want_freeze
1308 {
1309   if ($main::please_freeze)
1310   {
1311     release_allocation();
1312     if (@_)
1313     {
1314       # kill some srun procs before freeze+stop
1315       map { $proc{$_} = {} } @_;
1316       while (%proc)
1317       {
1318         killem (keys %proc);
1319         select (undef, undef, undef, 0.1);
1320         my $died;
1321         while (($died = waitpid (-1, WNOHANG)) > 0)
1322         {
1323           delete $proc{$died};
1324         }
1325       }
1326     }
1327     freeze();
1328     collate_output();
1329     cleanup();
1330     save_meta();
1331     exit 0;
1332   }
1333 }
1334
1335
1336 sub freeze
1337 {
1338   Log (undef, "Freeze not implemented");
1339   return;
1340 }
1341
1342
1343 sub thaw
1344 {
1345   croak ("Thaw not implemented");
1346 }
1347
1348
1349 sub freezequote
1350 {
1351   my $s = shift;
1352   $s =~ s/\\/\\\\/g;
1353   $s =~ s/\n/\\n/g;
1354   return $s;
1355 }
1356
1357
1358 sub freezeunquote
1359 {
1360   my $s = shift;
1361   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1362   return $s;
1363 }
1364
1365
1366 sub srun
1367 {
1368   my $srunargs = shift;
1369   my $execargs = shift;
1370   my $opts = shift || {};
1371   my $stdin = shift;
1372   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1373   print STDERR (join (" ",
1374                       map { / / ? "'$_'" : $_ }
1375                       (@$args)),
1376                 "\n")
1377       if $ENV{CRUNCH_DEBUG};
1378
1379   if (defined $stdin) {
1380     my $child = open STDIN, "-|";
1381     defined $child or die "no fork: $!";
1382     if ($child == 0) {
1383       print $stdin or die $!;
1384       close STDOUT or die $!;
1385       exit 0;
1386     }
1387   }
1388
1389   return system (@$args) if $opts->{fork};
1390
1391   exec @$args;
1392   warn "ENV size is ".length(join(" ",%ENV));
1393   die "exec failed: $!: @$args";
1394 }
1395
1396
1397 sub ban_node_by_slot {
1398   # Don't start any new jobsteps on this node for 60 seconds
1399   my $slotid = shift;
1400   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1401   $slot[$slotid]->{node}->{hold_count}++;
1402   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1403 }
1404
1405 sub must_lock_now
1406 {
1407   my ($lockfile, $error_message) = @_;
1408   open L, ">", $lockfile or croak("$lockfile: $!");
1409   if (!flock L, LOCK_EX|LOCK_NB) {
1410     croak("Can't lock $lockfile: $error_message\n");
1411   }
1412 }
1413
1414 __DATA__
1415 #!/usr/bin/perl
1416
1417 # checkout-and-build
1418
1419 use Fcntl ':flock';
1420
1421 my $destdir = $ENV{"CRUNCH_SRC"};
1422 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1423 my $repo = $ENV{"CRUNCH_SRC_URL"};
1424
1425 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1426 flock L, LOCK_EX;
1427 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1428     exit 0;
1429 }
1430
1431 unlink "$destdir.commit";
1432 open STDOUT, ">", "$destdir.log";
1433 open STDERR, ">&STDOUT";
1434
1435 mkdir $destdir;
1436 my @git_archive_data = <DATA>;
1437 if (@git_archive_data) {
1438   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1439   print TARX @git_archive_data;
1440   if(!close(TARX)) {
1441     die "'tar -C $destdir -xf -' exited $?: $!";
1442   }
1443 }
1444
1445 my $pwd;
1446 chomp ($pwd = `pwd`);
1447 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1448 mkdir $install_dir;
1449
1450 for my $src_path ("$destdir/arvados/sdk/python") {
1451   if (-d $src_path) {
1452     shell_or_die ("virtualenv", $install_dir);
1453     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1454   }
1455 }
1456
1457 if (-e "$destdir/crunch_scripts/install") {
1458     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1459 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1460     # Old version
1461     shell_or_die ("./tests/autotests.sh", $install_dir);
1462 } elsif (-e "./install.sh") {
1463     shell_or_die ("./install.sh", $install_dir);
1464 }
1465
1466 if ($commit) {
1467     unlink "$destdir.commit.new";
1468     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1469     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1470 }
1471
1472 close L;
1473
1474 exit 0;
1475
1476 sub shell_or_die
1477 {
1478   if ($ENV{"DEBUG"}) {
1479     print STDERR "@_\n";
1480   }
1481   system (@_) == 0
1482       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1483 }
1484
1485 __DATA__