2882: Added crunchstat service to collect cgroup stats, and added support to
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $arv_cli;
99
100 if (defined $ENV{"ARV_CLI"}) {
101   $arv_cli = $ENV{"ARV_CLI"};
102 }
103 else {
104   $arv_cli = 'arv';
105 }
106
107 my $force_unlock;
108 my $git_dir;
109 my $jobspec;
110 my $job_api_token;
111 my $no_clear_tmp;
112 my $resume_stash;
113 GetOptions('force-unlock' => \$force_unlock,
114            'git-dir=s' => \$git_dir,
115            'job=s' => \$jobspec,
116            'job-api-token=s' => \$job_api_token,
117            'no-clear-tmp' => \$no_clear_tmp,
118            'resume-stash=s' => \$resume_stash,
119     );
120
121 if (defined $job_api_token) {
122   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
123 }
124
125 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
126 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
127 my $local_job = !$job_has_uuid;
128
129
130 $SIG{'USR1'} = sub
131 {
132   $main::ENV{CRUNCH_DEBUG} = 1;
133 };
134 $SIG{'USR2'} = sub
135 {
136   $main::ENV{CRUNCH_DEBUG} = 0;
137 };
138
139
140
141 my $arv = Arvados->new('apiVersion' => 'v1');
142 my $local_logfile;
143
144 my $User = $arv->{'users'}->{'current'}->execute;
145
146 my $Job = {};
147 my $job_id;
148 my $dbh;
149 my $sth;
150 if ($job_has_uuid)
151 {
152   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
153   if (!$force_unlock) {
154     if ($Job->{'is_locked_by_uuid'}) {
155       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
156     }
157     if ($Job->{'success'} ne undef) {
158       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
159     }
160     if ($Job->{'running'}) {
161       croak("Job 'running' flag is already set");
162     }
163     if ($Job->{'started_at'}) {
164       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
165     }
166   }
167 }
168 else
169 {
170   $Job = JSON::decode_json($jobspec);
171
172   if (!$resume_stash)
173   {
174     map { croak ("No $_ specified") unless $Job->{$_} }
175     qw(script script_version script_parameters);
176   }
177
178   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
179   $Job->{'started_at'} = gmtime;
180
181   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
182
183   $job_has_uuid = 1;
184 }
185 $job_id = $Job->{'uuid'};
186
187 my $keep_logfile = $job_id . '.log.txt';
188 $local_logfile = File::Temp->new();
189
190 $Job->{'runtime_constraints'} ||= {};
191 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
192 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
193
194
195 Log (undef, "check slurm allocation");
196 my @slot;
197 my @node;
198 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
199 my @sinfo;
200 if (!$have_slurm)
201 {
202   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
203   push @sinfo, "$localcpus localhost";
204 }
205 if (exists $ENV{SLURM_NODELIST})
206 {
207   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
208 }
209 foreach (@sinfo)
210 {
211   my ($ncpus, $slurm_nodelist) = split;
212   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
213
214   my @nodelist;
215   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
216   {
217     my $nodelist = $1;
218     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
219     {
220       my $ranges = $1;
221       foreach (split (",", $ranges))
222       {
223         my ($a, $b);
224         if (/(\d+)-(\d+)/)
225         {
226           $a = $1;
227           $b = $2;
228         }
229         else
230         {
231           $a = $_;
232           $b = $_;
233         }
234         push @nodelist, map {
235           my $n = $nodelist;
236           $n =~ s/\[[-,\d]+\]/$_/;
237           $n;
238         } ($a..$b);
239       }
240     }
241     else
242     {
243       push @nodelist, $nodelist;
244     }
245   }
246   foreach my $nodename (@nodelist)
247   {
248     Log (undef, "node $nodename - $ncpus slots");
249     my $node = { name => $nodename,
250                  ncpus => $ncpus,
251                  losing_streak => 0,
252                  hold_until => 0 };
253     foreach my $cpu (1..$ncpus)
254     {
255       push @slot, { node => $node,
256                     cpu => $cpu };
257     }
258   }
259   push @node, @nodelist;
260 }
261
262
263
264 # Ensure that we get one jobstep running on each allocated node before
265 # we start overloading nodes with concurrent steps
266
267 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
268
269
270
271 my $jobmanager_id;
272 if ($job_has_uuid)
273 {
274   # Claim this job, and make sure nobody else does
275   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
276           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
277     croak("Error while updating / locking job");
278   }
279   $Job->update_attributes('started_at' => scalar gmtime,
280                           'running' => 1,
281                           'success' => undef,
282                           'tasks_summary' => { 'failed' => 0,
283                                                'todo' => 1,
284                                                'running' => 0,
285                                                'done' => 0 });
286 }
287
288
289 Log (undef, "start");
290 $SIG{'INT'} = sub { $main::please_freeze = 1; };
291 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
292 $SIG{'TERM'} = \&croak;
293 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
294 $SIG{'ALRM'} = sub { $main::please_info = 1; };
295 $SIG{'CONT'} = sub { $main::please_continue = 1; };
296 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
297
298 $main::please_freeze = 0;
299 $main::please_info = 0;
300 $main::please_continue = 0;
301 $main::please_refresh = 0;
302 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
303
304 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
305 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
306 $ENV{"JOB_UUID"} = $job_id;
307
308
309 my @jobstep;
310 my @jobstep_todo = ();
311 my @jobstep_done = ();
312 my @jobstep_tomerge = ();
313 my $jobstep_tomerge_level = 0;
314 my $squeue_checked;
315 my $squeue_kill_checked;
316 my $output_in_keep = 0;
317 my $latest_refresh = scalar time;
318
319
320
321 if (defined $Job->{thawedfromkey})
322 {
323   thaw ($Job->{thawedfromkey});
324 }
325 else
326 {
327   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
328     'job_uuid' => $Job->{'uuid'},
329     'sequence' => 0,
330     'qsequence' => 0,
331     'parameters' => {},
332                                                           });
333   push @jobstep, { 'level' => 0,
334                    'failures' => 0,
335                    'arvados_task' => $first_task,
336                  };
337   push @jobstep_todo, 0;
338 }
339
340
341 if (!$have_slurm)
342 {
343   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
344 }
345
346
347 my $build_script;
348
349
350 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
351
352 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
353 if ($skip_install)
354 {
355   if (!defined $no_clear_tmp) {
356     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
357     system($clear_tmp_cmd) == 0
358         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
359   }
360   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
361   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
362     if (-d $src_path) {
363       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
364           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
365       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
366           == 0
367           or croak ("setup.py in $src_path failed: exit ".($?>>8));
368     }
369   }
370 }
371 else
372 {
373   do {
374     local $/ = undef;
375     $build_script = <DATA>;
376   };
377   Log (undef, "Install revision ".$Job->{script_version});
378   my $nodelist = join(",", @node);
379
380   if (!defined $no_clear_tmp) {
381     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
382
383     my $cleanpid = fork();
384     if ($cleanpid == 0)
385     {
386       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
387             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
388       exit (1);
389     }
390     while (1)
391     {
392       last if $cleanpid == waitpid (-1, WNOHANG);
393       freeze_if_want_freeze ($cleanpid);
394       select (undef, undef, undef, 0.1);
395     }
396     Log (undef, "Clean-work-dir exited $?");
397   }
398
399   # Install requested code version
400
401   my @execargs;
402   my @srunargs = ("srun",
403                   "--nodelist=$nodelist",
404                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
405
406   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
407   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
408
409   my $commit;
410   my $git_archive;
411   my $treeish = $Job->{'script_version'};
412
413   # If we're running under crunch-dispatch, it will have pulled the
414   # appropriate source tree into its own repository, and given us that
415   # repo's path as $git_dir. If we're running a "local" job, and a
416   # script_version was specified, it's up to the user to provide the
417   # full path to a local repository in Job->{repository}.
418   #
419   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
420   # git-archive --remote where appropriate.
421   #
422   # TODO: Accept a locally-hosted Arvados repository by name or
423   # UUID. Use arvados.v1.repositories.list or .get to figure out the
424   # appropriate fetch-url.
425   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
426
427   $ENV{"CRUNCH_SRC_URL"} = $repo;
428
429   if (-d "$repo/.git") {
430     # We were given a working directory, but we are only interested in
431     # the index.
432     $repo = "$repo/.git";
433   }
434
435   # If this looks like a subversion r#, look for it in git-svn commit messages
436
437   if ($treeish =~ m{^\d{1,4}$}) {
438     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
439     chomp $gitlog;
440     if ($gitlog =~ /^[a-f0-9]{40}$/) {
441       $commit = $gitlog;
442       Log (undef, "Using commit $commit for script_version $treeish");
443     }
444   }
445
446   # If that didn't work, try asking git to look it up as a tree-ish.
447
448   if (!defined $commit) {
449     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
450     chomp $found;
451     if ($found =~ /^[0-9a-f]{40}$/s) {
452       $commit = $found;
453       if ($commit ne $treeish) {
454         # Make sure we record the real commit id in the database,
455         # frozentokey, logs, etc. -- instead of an abbreviation or a
456         # branch name which can become ambiguous or point to a
457         # different commit in the future.
458         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
459         Log (undef, "Using commit $commit for tree-ish $treeish");
460         if ($commit ne $treeish) {
461           $Job->{'script_version'} = $commit;
462           !$job_has_uuid or
463               $Job->update_attributes('script_version' => $commit) or
464               croak("Error while updating job");
465         }
466       }
467     }
468   }
469
470   if (defined $commit) {
471     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
472     @execargs = ("sh", "-c",
473                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
474     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
475   }
476   else {
477     croak ("could not figure out commit id for $treeish");
478   }
479
480   my $installpid = fork();
481   if ($installpid == 0)
482   {
483     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
484     exit (1);
485   }
486   while (1)
487   {
488     last if $installpid == waitpid (-1, WNOHANG);
489     freeze_if_want_freeze ($installpid);
490     select (undef, undef, undef, 0.1);
491   }
492   Log (undef, "Install exited $?");
493 }
494
495 if (!$have_slurm)
496 {
497   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
498   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
499 }
500
501 # If this job requires a Docker image, install that.
502 my $docker_bin = "/usr/bin/docker.io";
503 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
504 if ($docker_image) {
505   my $docker_pid = fork();
506   if ($docker_pid == 0)
507   {
508     srun (["srun", "--nodelist=" . join(' ', @node)],
509           [$docker_bin, 'pull', $docker_image]);
510     exit ($?);
511   }
512   while (1)
513   {
514     last if $docker_pid == waitpid (-1, WNOHANG);
515     freeze_if_want_freeze ($docker_pid);
516     select (undef, undef, undef, 0.1);
517   }
518   # If the Docker image was specified as a hash, pull will fail.
519   # Ignore that error.  We'll see what happens when we try to run later.
520   if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
521   {
522     croak("Installing Docker image $docker_image returned exit code $?");
523   }
524 }
525
526 foreach (qw (script script_version script_parameters runtime_constraints))
527 {
528   Log (undef,
529        "$_ " .
530        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
531 }
532 foreach (split (/\n/, $Job->{knobs}))
533 {
534   Log (undef, "knob " . $_);
535 }
536
537
538
539 $main::success = undef;
540
541
542
543 ONELEVEL:
544
545 my $thisround_succeeded = 0;
546 my $thisround_failed = 0;
547 my $thisround_failed_multiple = 0;
548
549 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
550                        or $a <=> $b } @jobstep_todo;
551 my $level = $jobstep[$jobstep_todo[0]]->{level};
552 Log (undef, "start level $level");
553
554
555
556 my %proc;
557 my @freeslot = (0..$#slot);
558 my @holdslot;
559 my %reader;
560 my $progress_is_dirty = 1;
561 my $progress_stats_updated = 0;
562
563 update_progress_stats();
564
565
566
567 THISROUND:
568 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
569 {
570   my $id = $jobstep_todo[$todo_ptr];
571   my $Jobstep = $jobstep[$id];
572   if ($Jobstep->{level} != $level)
573   {
574     next;
575   }
576
577   pipe $reader{$id}, "writer" or croak ($!);
578   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
579   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
580
581   my $childslot = $freeslot[0];
582   my $childnode = $slot[$childslot]->{node};
583   my $childslotname = join (".",
584                             $slot[$childslot]->{node}->{name},
585                             $slot[$childslot]->{cpu});
586   my $childpid = fork();
587   if ($childpid == 0)
588   {
589     $SIG{'INT'} = 'DEFAULT';
590     $SIG{'QUIT'} = 'DEFAULT';
591     $SIG{'TERM'} = 'DEFAULT';
592
593     foreach (values (%reader))
594     {
595       close($_);
596     }
597     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
598     open(STDOUT,">&writer");
599     open(STDERR,">&writer");
600
601     undef $dbh;
602     undef $sth;
603
604     delete $ENV{"GNUPGHOME"};
605     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
606     $ENV{"TASK_QSEQUENCE"} = $id;
607     $ENV{"TASK_SEQUENCE"} = $level;
608     $ENV{"JOB_SCRIPT"} = $Job->{script};
609     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
610       $param =~ tr/a-z/A-Z/;
611       $ENV{"JOB_PARAMETER_$param"} = $value;
612     }
613     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
614     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
615     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
616     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
617     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
618     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
619     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
620
621     $ENV{"GZIP"} = "-n";
622
623     my @srunargs = (
624       "srun",
625       "--nodelist=".$childnode->{name},
626       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
627       "--job-name=$job_id.$id.$$",
628         );
629     my $build_script_to_send = "";
630     my $command =
631         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
632         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
633         ."&& cd $ENV{CRUNCH_TMP} ";
634     if ($build_script)
635     {
636       $build_script_to_send = $build_script;
637       $command .=
638           "&& perl -";
639     }
640     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
641     if ($docker_image)
642     {
643       $command .= "crunchstat -cgroup-parent=/sys/fs/cgroup/lxc -cgroup-cid=$ENV{TASK_WORK}/docker.cid ";
644       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr -cidfile=$ENV{TASK_WORK}/docker.cid ";
645       # Dynamically configure the container to use the host system as its
646       # DNS server.  Get the host's global addresses from the ip command,
647       # and turn them into docker --dns options using gawk.
648       $command .=
649           q{$(ip -o address show scope global |
650               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
651       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
652       {
653         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
654       }
655       while (my ($env_key, $env_val) = each %ENV)
656       {
657         if ($env_key =~ /^(JOB|TASK)_/) {
658           $command .= "-e \Q$env_key=$env_val\E ";
659         }
660       }
661       $command .= "\Q$docker_image\E ";
662     } else {
663       $command .= "crunchstat -cgroup-path=/sys/fs/cgroup "
664     }
665     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
666     my @execargs = ('bash', '-c', $command);
667     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
668     exit (111);
669   }
670   close("writer");
671   if (!defined $childpid)
672   {
673     close $reader{$id};
674     delete $reader{$id};
675     next;
676   }
677   shift @freeslot;
678   $proc{$childpid} = { jobstep => $id,
679                        time => time,
680                        slot => $childslot,
681                        jobstepname => "$job_id.$id.$childpid",
682                      };
683   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
684   $slot[$childslot]->{pid} = $childpid;
685
686   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
687   Log ($id, "child $childpid started on $childslotname");
688   $Jobstep->{starttime} = time;
689   $Jobstep->{node} = $childnode->{name};
690   $Jobstep->{slotindex} = $childslot;
691   delete $Jobstep->{stderr};
692   delete $Jobstep->{finishtime};
693
694   splice @jobstep_todo, $todo_ptr, 1;
695   --$todo_ptr;
696
697   $progress_is_dirty = 1;
698
699   while (!@freeslot
700          ||
701          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
702   {
703     last THISROUND if $main::please_freeze;
704     if ($main::please_info)
705     {
706       $main::please_info = 0;
707       freeze();
708       collate_output();
709       save_meta(1);
710       update_progress_stats();
711     }
712     my $gotsome
713         = readfrompipes ()
714         + reapchildren ();
715     if (!$gotsome)
716     {
717       check_refresh_wanted();
718       check_squeue();
719       update_progress_stats();
720       select (undef, undef, undef, 0.1);
721     }
722     elsif (time - $progress_stats_updated >= 30)
723     {
724       update_progress_stats();
725     }
726     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
727         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
728     {
729       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
730           .($thisround_failed+$thisround_succeeded)
731           .") -- giving up on this round";
732       Log (undef, $message);
733       last THISROUND;
734     }
735
736     # move slots from freeslot to holdslot (or back to freeslot) if necessary
737     for (my $i=$#freeslot; $i>=0; $i--) {
738       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
739         push @holdslot, (splice @freeslot, $i, 1);
740       }
741     }
742     for (my $i=$#holdslot; $i>=0; $i--) {
743       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
744         push @freeslot, (splice @holdslot, $i, 1);
745       }
746     }
747
748     # give up if no nodes are succeeding
749     if (!grep { $_->{node}->{losing_streak} == 0 &&
750                     $_->{node}->{hold_count} < 4 } @slot) {
751       my $message = "Every node has failed -- giving up on this round";
752       Log (undef, $message);
753       last THISROUND;
754     }
755   }
756 }
757
758
759 push @freeslot, splice @holdslot;
760 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
761
762
763 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
764 while (%proc)
765 {
766   if ($main::please_continue) {
767     $main::please_continue = 0;
768     goto THISROUND;
769   }
770   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
771   readfrompipes ();
772   if (!reapchildren())
773   {
774     check_refresh_wanted();
775     check_squeue();
776     update_progress_stats();
777     select (undef, undef, undef, 0.1);
778     killem (keys %proc) if $main::please_freeze;
779   }
780 }
781
782 update_progress_stats();
783 freeze_if_want_freeze();
784
785
786 if (!defined $main::success)
787 {
788   if (@jobstep_todo &&
789       $thisround_succeeded == 0 &&
790       ($thisround_failed == 0 || $thisround_failed > 4))
791   {
792     my $message = "stop because $thisround_failed tasks failed and none succeeded";
793     Log (undef, $message);
794     $main::success = 0;
795   }
796   if (!@jobstep_todo)
797   {
798     $main::success = 1;
799   }
800 }
801
802 goto ONELEVEL if !defined $main::success;
803
804
805 release_allocation();
806 freeze();
807 if ($job_has_uuid) {
808   $Job->update_attributes('output' => &collate_output(),
809                           'running' => 0,
810                           'success' => $Job->{'output'} && $main::success,
811                           'finished_at' => scalar gmtime)
812 }
813
814 if ($Job->{'output'})
815 {
816   eval {
817     my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
818     $arv->{'collections'}->{'create'}->execute('collection' => {
819       'uuid' => $Job->{'output'},
820       'manifest_text' => $manifest_text,
821     });
822     if ($Job->{'output_is_persistent'}) {
823       $arv->{'links'}->{'create'}->execute('link' => {
824         'tail_kind' => 'arvados#user',
825         'tail_uuid' => $User->{'uuid'},
826         'head_kind' => 'arvados#collection',
827         'head_uuid' => $Job->{'output'},
828         'link_class' => 'resources',
829         'name' => 'wants',
830       });
831     }
832   };
833   if ($@) {
834     Log (undef, "Failed to register output manifest: $@");
835   }
836 }
837
838 Log (undef, "finish");
839
840 save_meta();
841 exit 0;
842
843
844
845 sub update_progress_stats
846 {
847   $progress_stats_updated = time;
848   return if !$progress_is_dirty;
849   my ($todo, $done, $running) = (scalar @jobstep_todo,
850                                  scalar @jobstep_done,
851                                  scalar @slot - scalar @freeslot - scalar @holdslot);
852   $Job->{'tasks_summary'} ||= {};
853   $Job->{'tasks_summary'}->{'todo'} = $todo;
854   $Job->{'tasks_summary'}->{'done'} = $done;
855   $Job->{'tasks_summary'}->{'running'} = $running;
856   if ($job_has_uuid) {
857     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
858   }
859   Log (undef, "status: $done done, $running running, $todo todo");
860   $progress_is_dirty = 0;
861 }
862
863
864
865 sub reapchildren
866 {
867   my $pid = waitpid (-1, WNOHANG);
868   return 0 if $pid <= 0;
869
870   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
871                   . "."
872                   . $slot[$proc{$pid}->{slot}]->{cpu});
873   my $jobstepid = $proc{$pid}->{jobstep};
874   my $elapsed = time - $proc{$pid}->{time};
875   my $Jobstep = $jobstep[$jobstepid];
876
877   my $childstatus = $?;
878   my $exitvalue = $childstatus >> 8;
879   my $exitinfo = sprintf("exit %d signal %d%s",
880                          $exitvalue,
881                          $childstatus & 127,
882                          ($childstatus & 128 ? ' core dump' : ''));
883   $Jobstep->{'arvados_task'}->reload;
884   my $task_success = $Jobstep->{'arvados_task'}->{success};
885
886   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
887
888   if (!defined $task_success) {
889     # task did not indicate one way or the other --> fail
890     $Jobstep->{'arvados_task'}->{success} = 0;
891     $Jobstep->{'arvados_task'}->save;
892     $task_success = 0;
893   }
894
895   if (!$task_success)
896   {
897     my $temporary_fail;
898     $temporary_fail ||= $Jobstep->{node_fail};
899     $temporary_fail ||= ($exitvalue == 111);
900
901     ++$thisround_failed;
902     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
903
904     # Check for signs of a failed or misconfigured node
905     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
906         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
907       # Don't count this against jobstep failure thresholds if this
908       # node is already suspected faulty and srun exited quickly
909       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
910           $elapsed < 5) {
911         Log ($jobstepid, "blaming failure on suspect node " .
912              $slot[$proc{$pid}->{slot}]->{node}->{name});
913         $temporary_fail ||= 1;
914       }
915       ban_node_by_slot($proc{$pid}->{slot});
916     }
917
918     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
919                              ++$Jobstep->{'failures'},
920                              $temporary_fail ? 'temporary ' : 'permanent',
921                              $elapsed));
922
923     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
924       # Give up on this task, and the whole job
925       $main::success = 0;
926       $main::please_freeze = 1;
927     }
928     else {
929       # Put this task back on the todo queue
930       push @jobstep_todo, $jobstepid;
931     }
932     $Job->{'tasks_summary'}->{'failed'}++;
933   }
934   else
935   {
936     ++$thisround_succeeded;
937     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
938     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
939     push @jobstep_done, $jobstepid;
940     Log ($jobstepid, "success in $elapsed seconds");
941   }
942   $Jobstep->{exitcode} = $childstatus;
943   $Jobstep->{finishtime} = time;
944   process_stderr ($jobstepid, $task_success);
945   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
946
947   close $reader{$jobstepid};
948   delete $reader{$jobstepid};
949   delete $slot[$proc{$pid}->{slot}]->{pid};
950   push @freeslot, $proc{$pid}->{slot};
951   delete $proc{$pid};
952
953   # Load new tasks
954   my $newtask_list = [];
955   my $newtask_results;
956   do {
957     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
958       'where' => {
959         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
960       },
961       'order' => 'qsequence',
962       'offset' => scalar(@$newtask_list),
963     );
964     push(@$newtask_list, @{$newtask_results->{items}});
965   } while (@{$newtask_results->{items}});
966   foreach my $arvados_task (@$newtask_list) {
967     my $jobstep = {
968       'level' => $arvados_task->{'sequence'},
969       'failures' => 0,
970       'arvados_task' => $arvados_task
971     };
972     push @jobstep, $jobstep;
973     push @jobstep_todo, $#jobstep;
974   }
975
976   $progress_is_dirty = 1;
977   1;
978 }
979
980 sub check_refresh_wanted
981 {
982   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
983   if (@stat && $stat[9] > $latest_refresh) {
984     $latest_refresh = scalar time;
985     if ($job_has_uuid) {
986       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
987       for my $attr ('cancelled_at',
988                     'cancelled_by_user_uuid',
989                     'cancelled_by_client_uuid') {
990         $Job->{$attr} = $Job2->{$attr};
991       }
992       if ($Job->{'cancelled_at'}) {
993         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
994              " by user " . $Job->{cancelled_by_user_uuid});
995         $main::success = 0;
996         $main::please_freeze = 1;
997       }
998     }
999   }
1000 }
1001
1002 sub check_squeue
1003 {
1004   # return if the kill list was checked <4 seconds ago
1005   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1006   {
1007     return;
1008   }
1009   $squeue_kill_checked = time;
1010
1011   # use killem() on procs whose killtime is reached
1012   for (keys %proc)
1013   {
1014     if (exists $proc{$_}->{killtime}
1015         && $proc{$_}->{killtime} <= time)
1016     {
1017       killem ($_);
1018     }
1019   }
1020
1021   # return if the squeue was checked <60 seconds ago
1022   if (defined $squeue_checked && $squeue_checked > time - 60)
1023   {
1024     return;
1025   }
1026   $squeue_checked = time;
1027
1028   if (!$have_slurm)
1029   {
1030     # here is an opportunity to check for mysterious problems with local procs
1031     return;
1032   }
1033
1034   # get a list of steps still running
1035   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1036   chop @squeue;
1037   if ($squeue[-1] ne "ok")
1038   {
1039     return;
1040   }
1041   pop @squeue;
1042
1043   # which of my jobsteps are running, according to squeue?
1044   my %ok;
1045   foreach (@squeue)
1046   {
1047     if (/^(\d+)\.(\d+) (\S+)/)
1048     {
1049       if ($1 eq $ENV{SLURM_JOBID})
1050       {
1051         $ok{$3} = 1;
1052       }
1053     }
1054   }
1055
1056   # which of my active child procs (>60s old) were not mentioned by squeue?
1057   foreach (keys %proc)
1058   {
1059     if ($proc{$_}->{time} < time - 60
1060         && !exists $ok{$proc{$_}->{jobstepname}}
1061         && !exists $proc{$_}->{killtime})
1062     {
1063       # kill this proc if it hasn't exited in 30 seconds
1064       $proc{$_}->{killtime} = time + 30;
1065     }
1066   }
1067 }
1068
1069
1070 sub release_allocation
1071 {
1072   if ($have_slurm)
1073   {
1074     Log (undef, "release job allocation");
1075     system "scancel $ENV{SLURM_JOBID}";
1076   }
1077 }
1078
1079
1080 sub readfrompipes
1081 {
1082   my $gotsome = 0;
1083   foreach my $job (keys %reader)
1084   {
1085     my $buf;
1086     while (0 < sysread ($reader{$job}, $buf, 8192))
1087     {
1088       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1089       $jobstep[$job]->{stderr} .= $buf;
1090       preprocess_stderr ($job);
1091       if (length ($jobstep[$job]->{stderr}) > 16384)
1092       {
1093         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1094       }
1095       $gotsome = 1;
1096     }
1097   }
1098   return $gotsome;
1099 }
1100
1101
1102 sub preprocess_stderr
1103 {
1104   my $job = shift;
1105
1106   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1107     my $line = $1;
1108     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1109     Log ($job, "stderr $line");
1110     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1111       # whoa.
1112       $main::please_freeze = 1;
1113     }
1114     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1115       $jobstep[$job]->{node_fail} = 1;
1116       ban_node_by_slot($jobstep[$job]->{slotindex});
1117     }
1118   }
1119 }
1120
1121
1122 sub process_stderr
1123 {
1124   my $job = shift;
1125   my $task_success = shift;
1126   preprocess_stderr ($job);
1127
1128   map {
1129     Log ($job, "stderr $_");
1130   } split ("\n", $jobstep[$job]->{stderr});
1131 }
1132
1133 sub fetch_block
1134 {
1135   my $hash = shift;
1136   my ($keep, $child_out, $output_block);
1137
1138   my $cmd = "$arv_cli keep get \Q$hash\E";
1139   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1140   sysread($keep, $output_block, 64 * 1024 * 1024);
1141   close $keep;
1142   return $output_block;
1143 }
1144
1145 sub collate_output
1146 {
1147   Log (undef, "collate");
1148
1149   my ($child_out, $child_in);
1150   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1151   my $joboutput;
1152   for (@jobstep)
1153   {
1154     next if (!exists $_->{'arvados_task'}->{output} ||
1155              !$_->{'arvados_task'}->{'success'} ||
1156              $_->{'exitcode'} != 0);
1157     my $output = $_->{'arvados_task'}->{output};
1158     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1159     {
1160       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1161       print $child_in $output;
1162     }
1163     elsif (@jobstep == 1)
1164     {
1165       $joboutput = $output;
1166       last;
1167     }
1168     elsif (defined (my $outblock = fetch_block ($output)))
1169     {
1170       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1171       print $child_in $outblock;
1172     }
1173     else
1174     {
1175       Log (undef, "XXX fetch_block($output) failed XXX");
1176       $main::success = 0;
1177     }
1178   }
1179   $child_in->close;
1180
1181   if (!defined $joboutput) {
1182     my $s = IO::Select->new($child_out);
1183     if ($s->can_read(120)) {
1184       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1185       chomp($joboutput);
1186     } else {
1187       Log (undef, "timed out reading from 'arv keep put'");
1188     }
1189   }
1190   waitpid($pid, 0);
1191
1192   if ($joboutput)
1193   {
1194     Log (undef, "output $joboutput");
1195     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1196   }
1197   else
1198   {
1199     Log (undef, "output undef");
1200   }
1201   return $joboutput;
1202 }
1203
1204
1205 sub killem
1206 {
1207   foreach (@_)
1208   {
1209     my $sig = 2;                # SIGINT first
1210     if (exists $proc{$_}->{"sent_$sig"} &&
1211         time - $proc{$_}->{"sent_$sig"} > 4)
1212     {
1213       $sig = 15;                # SIGTERM if SIGINT doesn't work
1214     }
1215     if (exists $proc{$_}->{"sent_$sig"} &&
1216         time - $proc{$_}->{"sent_$sig"} > 4)
1217     {
1218       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1219     }
1220     if (!exists $proc{$_}->{"sent_$sig"})
1221     {
1222       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1223       kill $sig, $_;
1224       select (undef, undef, undef, 0.1);
1225       if ($sig == 2)
1226       {
1227         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1228       }
1229       $proc{$_}->{"sent_$sig"} = time;
1230       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1231     }
1232   }
1233 }
1234
1235
1236 sub fhbits
1237 {
1238   my($bits);
1239   for (@_) {
1240     vec($bits,fileno($_),1) = 1;
1241   }
1242   $bits;
1243 }
1244
1245
1246 sub Log                         # ($jobstep_id, $logmessage)
1247 {
1248   if ($_[1] =~ /\n/) {
1249     for my $line (split (/\n/, $_[1])) {
1250       Log ($_[0], $line);
1251     }
1252     return;
1253   }
1254   my $fh = select STDERR; $|=1; select $fh;
1255   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1256   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1257   $message .= "\n";
1258   my $datetime;
1259   if ($local_logfile || -t STDERR) {
1260     my @gmtime = gmtime;
1261     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1262                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1263   }
1264   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1265
1266   if ($local_logfile) {
1267     print $local_logfile $datetime . " " . $message;
1268   }
1269 }
1270
1271
1272 sub croak
1273 {
1274   my ($package, $file, $line) = caller;
1275   my $message = "@_ at $file line $line\n";
1276   Log (undef, $message);
1277   freeze() if @jobstep_todo;
1278   collate_output() if @jobstep_todo;
1279   cleanup();
1280   save_meta() if $local_logfile;
1281   die;
1282 }
1283
1284
1285 sub cleanup
1286 {
1287   return if !$job_has_uuid;
1288   $Job->update_attributes('running' => 0,
1289                           'success' => 0,
1290                           'finished_at' => scalar gmtime);
1291 }
1292
1293
1294 sub save_meta
1295 {
1296   my $justcheckpoint = shift; # false if this will be the last meta saved
1297   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1298
1299   $local_logfile->flush;
1300   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1301       . quotemeta($local_logfile->filename);
1302   my $loglocator = `$cmd`;
1303   die "system $cmd failed: $?" if $?;
1304   chomp($loglocator);
1305
1306   $local_logfile = undef;   # the temp file is automatically deleted
1307   Log (undef, "log manifest is $loglocator");
1308   $Job->{'log'} = $loglocator;
1309   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1310 }
1311
1312
1313 sub freeze_if_want_freeze
1314 {
1315   if ($main::please_freeze)
1316   {
1317     release_allocation();
1318     if (@_)
1319     {
1320       # kill some srun procs before freeze+stop
1321       map { $proc{$_} = {} } @_;
1322       while (%proc)
1323       {
1324         killem (keys %proc);
1325         select (undef, undef, undef, 0.1);
1326         my $died;
1327         while (($died = waitpid (-1, WNOHANG)) > 0)
1328         {
1329           delete $proc{$died};
1330         }
1331       }
1332     }
1333     freeze();
1334     collate_output();
1335     cleanup();
1336     save_meta();
1337     exit 0;
1338   }
1339 }
1340
1341
1342 sub freeze
1343 {
1344   Log (undef, "Freeze not implemented");
1345   return;
1346 }
1347
1348
1349 sub thaw
1350 {
1351   croak ("Thaw not implemented");
1352 }
1353
1354
1355 sub freezequote
1356 {
1357   my $s = shift;
1358   $s =~ s/\\/\\\\/g;
1359   $s =~ s/\n/\\n/g;
1360   return $s;
1361 }
1362
1363
1364 sub freezeunquote
1365 {
1366   my $s = shift;
1367   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1368   return $s;
1369 }
1370
1371
1372 sub srun
1373 {
1374   my $srunargs = shift;
1375   my $execargs = shift;
1376   my $opts = shift || {};
1377   my $stdin = shift;
1378   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1379   print STDERR (join (" ",
1380                       map { / / ? "'$_'" : $_ }
1381                       (@$args)),
1382                 "\n")
1383       if $ENV{CRUNCH_DEBUG};
1384
1385   if (defined $stdin) {
1386     my $child = open STDIN, "-|";
1387     defined $child or die "no fork: $!";
1388     if ($child == 0) {
1389       print $stdin or die $!;
1390       close STDOUT or die $!;
1391       exit 0;
1392     }
1393   }
1394
1395   return system (@$args) if $opts->{fork};
1396
1397   exec @$args;
1398   warn "ENV size is ".length(join(" ",%ENV));
1399   die "exec failed: $!: @$args";
1400 }
1401
1402
1403 sub ban_node_by_slot {
1404   # Don't start any new jobsteps on this node for 60 seconds
1405   my $slotid = shift;
1406   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1407   $slot[$slotid]->{node}->{hold_count}++;
1408   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1409 }
1410
1411 sub must_lock_now
1412 {
1413   my ($lockfile, $error_message) = @_;
1414   open L, ">", $lockfile or croak("$lockfile: $!");
1415   if (!flock L, LOCK_EX|LOCK_NB) {
1416     croak("Can't lock $lockfile: $error_message\n");
1417   }
1418 }
1419
1420 __DATA__
1421 #!/usr/bin/perl
1422
1423 # checkout-and-build
1424
1425 use Fcntl ':flock';
1426
1427 my $destdir = $ENV{"CRUNCH_SRC"};
1428 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1429 my $repo = $ENV{"CRUNCH_SRC_URL"};
1430
1431 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1432 flock L, LOCK_EX;
1433 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1434     exit 0;
1435 }
1436
1437 unlink "$destdir.commit";
1438 open STDOUT, ">", "$destdir.log";
1439 open STDERR, ">&STDOUT";
1440
1441 mkdir $destdir;
1442 my @git_archive_data = <DATA>;
1443 if (@git_archive_data) {
1444   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1445   print TARX @git_archive_data;
1446   if(!close(TARX)) {
1447     die "'tar -C $destdir -xf -' exited $?: $!";
1448   }
1449 }
1450
1451 my $pwd;
1452 chomp ($pwd = `pwd`);
1453 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1454 mkdir $install_dir;
1455
1456 for my $src_path ("$destdir/arvados/sdk/python") {
1457   if (-d $src_path) {
1458     shell_or_die ("virtualenv", $install_dir);
1459     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1460   }
1461 }
1462
1463 if (-e "$destdir/crunch_scripts/install") {
1464     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1465 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1466     # Old version
1467     shell_or_die ("./tests/autotests.sh", $install_dir);
1468 } elsif (-e "./install.sh") {
1469     shell_or_die ("./install.sh", $install_dir);
1470 }
1471
1472 if ($commit) {
1473     unlink "$destdir.commit.new";
1474     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1475     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1476 }
1477
1478 close L;
1479
1480 exit 0;
1481
1482 sub shell_or_die
1483 {
1484   if ($ENV{"DEBUG"}) {
1485     print STDERR "@_\n";
1486   }
1487   system (@_) == 0
1488       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1489 }
1490
1491 __DATA__