Refactored "save temp file rescue behavior into function
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my ($docker_locator, $docker_hash);
505 if ($docker_locator = $Job->{docker_image_locator}) {
506   $docker_hash = find_docker_hash($docker_locator);
507   if (!$docker_hash)
508   {
509     croak("No Docker image hash found from locator $docker_locator");
510   }
511   my $docker_install_script = qq{
512 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
513     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
514 fi
515 };
516   my $docker_pid = fork();
517   if ($docker_pid == 0)
518   {
519     srun (["srun", "--nodelist=" . join(',', @node)],
520           ["/bin/sh", "-ec", $docker_install_script]);
521     exit ($?);
522   }
523   while (1)
524   {
525     last if $docker_pid == waitpid (-1, WNOHANG);
526     freeze_if_want_freeze ($docker_pid);
527     select (undef, undef, undef, 0.1);
528   }
529   if ($? != 0)
530   {
531     croak("Installing Docker image from $docker_locator returned exit code $?");
532   }
533 }
534
535 foreach (qw (script script_version script_parameters runtime_constraints))
536 {
537   Log (undef,
538        "$_ " .
539        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
540 }
541 foreach (split (/\n/, $Job->{knobs}))
542 {
543   Log (undef, "knob " . $_);
544 }
545
546
547
548 $main::success = undef;
549
550
551
552 ONELEVEL:
553
554 my $thisround_succeeded = 0;
555 my $thisround_failed = 0;
556 my $thisround_failed_multiple = 0;
557
558 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
559                        or $a <=> $b } @jobstep_todo;
560 my $level = $jobstep[$jobstep_todo[0]]->{level};
561 Log (undef, "start level $level");
562
563
564
565 my %proc;
566 my @freeslot = (0..$#slot);
567 my @holdslot;
568 my %reader;
569 my $progress_is_dirty = 1;
570 my $progress_stats_updated = 0;
571
572 update_progress_stats();
573
574
575
576 THISROUND:
577 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
578 {
579   my $id = $jobstep_todo[$todo_ptr];
580   my $Jobstep = $jobstep[$id];
581   if ($Jobstep->{level} != $level)
582   {
583     next;
584   }
585
586   pipe $reader{$id}, "writer" or croak ($!);
587   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
588   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
589
590   my $childslot = $freeslot[0];
591   my $childnode = $slot[$childslot]->{node};
592   my $childslotname = join (".",
593                             $slot[$childslot]->{node}->{name},
594                             $slot[$childslot]->{cpu});
595   my $childpid = fork();
596   if ($childpid == 0)
597   {
598     $SIG{'INT'} = 'DEFAULT';
599     $SIG{'QUIT'} = 'DEFAULT';
600     $SIG{'TERM'} = 'DEFAULT';
601
602     foreach (values (%reader))
603     {
604       close($_);
605     }
606     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
607     open(STDOUT,">&writer");
608     open(STDERR,">&writer");
609
610     undef $dbh;
611     undef $sth;
612
613     delete $ENV{"GNUPGHOME"};
614     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
615     $ENV{"TASK_QSEQUENCE"} = $id;
616     $ENV{"TASK_SEQUENCE"} = $level;
617     $ENV{"JOB_SCRIPT"} = $Job->{script};
618     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
619       $param =~ tr/a-z/A-Z/;
620       $ENV{"JOB_PARAMETER_$param"} = $value;
621     }
622     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
623     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
624     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
625     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
626     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
627     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
628     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
629
630     $ENV{"GZIP"} = "-n";
631
632     my @srunargs = (
633       "srun",
634       "--nodelist=".$childnode->{name},
635       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
636       "--job-name=$job_id.$id.$$",
637         );
638     my $build_script_to_send = "";
639     my $command =
640         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
641         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
642         ."&& chmod og+wrx $ENV{TASK_WORK}"
643         ."&& cd $ENV{CRUNCH_TMP} ";
644     umask(077);
645     if ($build_script)
646     {
647       $build_script_to_send = $build_script;
648       $command .=
649           "&& perl -";
650     }
651     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
652     if ($docker_hash)
653     {
654       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
655       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr --cidfile=$ENV{TASK_WORK}/docker.cid ";
656       # Dynamically configure the container to use the host system as its
657       # DNS server.  Get the host's global addresses from the ip command,
658       # and turn them into docker --dns options using gawk.
659       $command .=
660           q{$(ip -o address show scope global |
661               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
662       $command .= "-v \Q$ENV{TASK_WORK}:/tmp/crunch-job:rw\E ";
663       $command .= "-v \Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
664       $command .= "-v \Q$ENV{TASK_KEEPMOUNT}:/mnt:ro\E ";
665       while (my ($env_key, $env_val) = each %ENV)
666       {
667         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
668           if ($env_key eq "TASK_WORK") {
669             $command .= "-e \QTASK_WORK=/tmp/crunch-job\E ";
670           }
671           elsif ($env_key eq "TASK_KEEPMOUNT") {
672             $command .= "-e \QTASK_KEEPMOUNT=/mnt\E ";
673           }
674           elsif ($env_key eq "CRUNCH_SRC") {
675             $command .= "-e \QCRUNCH_SRC=/tmp/crunch-src\E ";
676           }
677           else {
678             $command .= "-e \Q$env_key=$env_val\E ";
679           }
680         }
681       }
682       $command .= "\Q$docker_hash\E ";
683     } else {
684       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "
685     }
686     $command .= "stdbuf -o0 -e0 ";
687     $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
688     my @execargs = ('bash', '-c', $command);
689     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
690     exit (111);
691   }
692   close("writer");
693   if (!defined $childpid)
694   {
695     close $reader{$id};
696     delete $reader{$id};
697     next;
698   }
699   shift @freeslot;
700   $proc{$childpid} = { jobstep => $id,
701                        time => time,
702                        slot => $childslot,
703                        jobstepname => "$job_id.$id.$childpid",
704                      };
705   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
706   $slot[$childslot]->{pid} = $childpid;
707
708   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
709   Log ($id, "child $childpid started on $childslotname");
710   $Jobstep->{starttime} = time;
711   $Jobstep->{node} = $childnode->{name};
712   $Jobstep->{slotindex} = $childslot;
713   delete $Jobstep->{stderr};
714   delete $Jobstep->{finishtime};
715
716   splice @jobstep_todo, $todo_ptr, 1;
717   --$todo_ptr;
718
719   $progress_is_dirty = 1;
720
721   while (!@freeslot
722          ||
723          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
724   {
725     last THISROUND if $main::please_freeze;
726     if ($main::please_info)
727     {
728       $main::please_info = 0;
729       freeze();
730       collate_output();
731       save_meta(1);
732       update_progress_stats();
733     }
734     my $gotsome
735         = readfrompipes ()
736         + reapchildren ();
737     if (!$gotsome)
738     {
739       check_refresh_wanted();
740       check_squeue();
741       update_progress_stats();
742       select (undef, undef, undef, 0.1);
743     }
744     elsif (time - $progress_stats_updated >= 30)
745     {
746       update_progress_stats();
747     }
748     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
749         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
750     {
751       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
752           .($thisround_failed+$thisround_succeeded)
753           .") -- giving up on this round";
754       Log (undef, $message);
755       last THISROUND;
756     }
757
758     # move slots from freeslot to holdslot (or back to freeslot) if necessary
759     for (my $i=$#freeslot; $i>=0; $i--) {
760       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
761         push @holdslot, (splice @freeslot, $i, 1);
762       }
763     }
764     for (my $i=$#holdslot; $i>=0; $i--) {
765       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
766         push @freeslot, (splice @holdslot, $i, 1);
767       }
768     }
769
770     # give up if no nodes are succeeding
771     if (!grep { $_->{node}->{losing_streak} == 0 &&
772                     $_->{node}->{hold_count} < 4 } @slot) {
773       my $message = "Every node has failed -- giving up on this round";
774       Log (undef, $message);
775       last THISROUND;
776     }
777   }
778 }
779
780
781 push @freeslot, splice @holdslot;
782 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
783
784
785 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
786 while (%proc)
787 {
788   if ($main::please_continue) {
789     $main::please_continue = 0;
790     goto THISROUND;
791   }
792   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
793   readfrompipes ();
794   if (!reapchildren())
795   {
796     check_refresh_wanted();
797     check_squeue();
798     update_progress_stats();
799     select (undef, undef, undef, 0.1);
800     killem (keys %proc) if $main::please_freeze;
801   }
802 }
803
804 update_progress_stats();
805 freeze_if_want_freeze();
806
807
808 if (!defined $main::success)
809 {
810   if (@jobstep_todo &&
811       $thisround_succeeded == 0 &&
812       ($thisround_failed == 0 || $thisround_failed > 4))
813   {
814     my $message = "stop because $thisround_failed tasks failed and none succeeded";
815     Log (undef, $message);
816     $main::success = 0;
817   }
818   if (!@jobstep_todo)
819   {
820     $main::success = 1;
821   }
822 }
823
824 goto ONELEVEL if !defined $main::success;
825
826
827 release_allocation();
828 freeze();
829 my $collated_output = &collate_output();
830
831 if ($job_has_uuid) {
832   $Job->update_attributes('running' => 0,
833                           'success' => $collated_output && $main::success,
834                           'finished_at' => scalar gmtime)
835 }
836
837 if ($collated_output)
838 {
839   eval {
840     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
841         or die "failed to get collated manifest: $!";
842     # Read the original manifest, and strip permission hints from it,
843     # so we can put the result in a Collection.
844     my @stripped_manifest_lines = ();
845     my $orig_manifest_text = '';
846     while (my $manifest_line = <$orig_manifest>) {
847       $orig_manifest_text .= $manifest_line;
848       my @words = split(/ /, $manifest_line, -1);
849       foreach my $ii (0..$#words) {
850         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
851           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
852         }
853       }
854       push(@stripped_manifest_lines, join(" ", @words));
855     }
856     my $stripped_manifest_text = join("", @stripped_manifest_lines);
857     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
858       'uuid' => md5_hex($stripped_manifest_text),
859       'manifest_text' => $orig_manifest_text,
860     });
861     $Job->update_attributes('output' => $output->{uuid});
862     if ($Job->{'output_is_persistent'}) {
863       $arv->{'links'}->{'create'}->execute('link' => {
864         'tail_kind' => 'arvados#user',
865         'tail_uuid' => $User->{'uuid'},
866         'head_kind' => 'arvados#collection',
867         'head_uuid' => $Job->{'output'},
868         'link_class' => 'resources',
869         'name' => 'wants',
870       });
871     }
872   };
873   if ($@) {
874     Log (undef, "Failed to register output manifest: $@");
875   }
876 }
877
878 Log (undef, "finish");
879
880 save_meta();
881 exit 0;
882
883
884
885 sub update_progress_stats
886 {
887   $progress_stats_updated = time;
888   return if !$progress_is_dirty;
889   my ($todo, $done, $running) = (scalar @jobstep_todo,
890                                  scalar @jobstep_done,
891                                  scalar @slot - scalar @freeslot - scalar @holdslot);
892   $Job->{'tasks_summary'} ||= {};
893   $Job->{'tasks_summary'}->{'todo'} = $todo;
894   $Job->{'tasks_summary'}->{'done'} = $done;
895   $Job->{'tasks_summary'}->{'running'} = $running;
896   if ($job_has_uuid) {
897     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
898   }
899   Log (undef, "status: $done done, $running running, $todo todo");
900   $progress_is_dirty = 0;
901 }
902
903
904
905 sub reapchildren
906 {
907   my $pid = waitpid (-1, WNOHANG);
908   return 0 if $pid <= 0;
909
910   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
911                   . "."
912                   . $slot[$proc{$pid}->{slot}]->{cpu});
913   my $jobstepid = $proc{$pid}->{jobstep};
914   my $elapsed = time - $proc{$pid}->{time};
915   my $Jobstep = $jobstep[$jobstepid];
916
917   my $childstatus = $?;
918   my $exitvalue = $childstatus >> 8;
919   my $exitinfo = sprintf("exit %d signal %d%s",
920                          $exitvalue,
921                          $childstatus & 127,
922                          ($childstatus & 128 ? ' core dump' : ''));
923   $Jobstep->{'arvados_task'}->reload;
924   my $task_success = $Jobstep->{'arvados_task'}->{success};
925
926   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
927
928   if (!defined $task_success) {
929     # task did not indicate one way or the other --> fail
930     $Jobstep->{'arvados_task'}->{success} = 0;
931     $Jobstep->{'arvados_task'}->save;
932     $task_success = 0;
933   }
934
935   if (!$task_success)
936   {
937     my $temporary_fail;
938     $temporary_fail ||= $Jobstep->{node_fail};
939     $temporary_fail ||= ($exitvalue == 111);
940
941     ++$thisround_failed;
942     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
943
944     # Check for signs of a failed or misconfigured node
945     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
946         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
947       # Don't count this against jobstep failure thresholds if this
948       # node is already suspected faulty and srun exited quickly
949       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
950           $elapsed < 5) {
951         Log ($jobstepid, "blaming failure on suspect node " .
952              $slot[$proc{$pid}->{slot}]->{node}->{name});
953         $temporary_fail ||= 1;
954       }
955       ban_node_by_slot($proc{$pid}->{slot});
956     }
957
958     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
959                              ++$Jobstep->{'failures'},
960                              $temporary_fail ? 'temporary ' : 'permanent',
961                              $elapsed));
962
963     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
964       # Give up on this task, and the whole job
965       $main::success = 0;
966       $main::please_freeze = 1;
967     }
968     else {
969       # Put this task back on the todo queue
970       push @jobstep_todo, $jobstepid;
971     }
972     $Job->{'tasks_summary'}->{'failed'}++;
973   }
974   else
975   {
976     ++$thisround_succeeded;
977     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
978     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
979     push @jobstep_done, $jobstepid;
980     Log ($jobstepid, "success in $elapsed seconds");
981   }
982   $Jobstep->{exitcode} = $childstatus;
983   $Jobstep->{finishtime} = time;
984   process_stderr ($jobstepid, $task_success);
985   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
986
987   close $reader{$jobstepid};
988   delete $reader{$jobstepid};
989   delete $slot[$proc{$pid}->{slot}]->{pid};
990   push @freeslot, $proc{$pid}->{slot};
991   delete $proc{$pid};
992
993   # Load new tasks
994   my $newtask_list = [];
995   my $newtask_results;
996   do {
997     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
998       'where' => {
999         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1000       },
1001       'order' => 'qsequence',
1002       'offset' => scalar(@$newtask_list),
1003     );
1004     push(@$newtask_list, @{$newtask_results->{items}});
1005   } while (@{$newtask_results->{items}});
1006   foreach my $arvados_task (@$newtask_list) {
1007     my $jobstep = {
1008       'level' => $arvados_task->{'sequence'},
1009       'failures' => 0,
1010       'arvados_task' => $arvados_task
1011     };
1012     push @jobstep, $jobstep;
1013     push @jobstep_todo, $#jobstep;
1014   }
1015
1016   $progress_is_dirty = 1;
1017   1;
1018 }
1019
1020 sub check_refresh_wanted
1021 {
1022   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1023   if (@stat && $stat[9] > $latest_refresh) {
1024     $latest_refresh = scalar time;
1025     if ($job_has_uuid) {
1026       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1027       for my $attr ('cancelled_at',
1028                     'cancelled_by_user_uuid',
1029                     'cancelled_by_client_uuid') {
1030         $Job->{$attr} = $Job2->{$attr};
1031       }
1032       if ($Job->{'cancelled_at'}) {
1033         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1034              " by user " . $Job->{cancelled_by_user_uuid});
1035         $main::success = 0;
1036         $main::please_freeze = 1;
1037       }
1038     }
1039   }
1040 }
1041
1042 sub check_squeue
1043 {
1044   # return if the kill list was checked <4 seconds ago
1045   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1046   {
1047     return;
1048   }
1049   $squeue_kill_checked = time;
1050
1051   # use killem() on procs whose killtime is reached
1052   for (keys %proc)
1053   {
1054     if (exists $proc{$_}->{killtime}
1055         && $proc{$_}->{killtime} <= time)
1056     {
1057       killem ($_);
1058     }
1059   }
1060
1061   # return if the squeue was checked <60 seconds ago
1062   if (defined $squeue_checked && $squeue_checked > time - 60)
1063   {
1064     return;
1065   }
1066   $squeue_checked = time;
1067
1068   if (!$have_slurm)
1069   {
1070     # here is an opportunity to check for mysterious problems with local procs
1071     return;
1072   }
1073
1074   # get a list of steps still running
1075   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1076   chop @squeue;
1077   if ($squeue[-1] ne "ok")
1078   {
1079     return;
1080   }
1081   pop @squeue;
1082
1083   # which of my jobsteps are running, according to squeue?
1084   my %ok;
1085   foreach (@squeue)
1086   {
1087     if (/^(\d+)\.(\d+) (\S+)/)
1088     {
1089       if ($1 eq $ENV{SLURM_JOBID})
1090       {
1091         $ok{$3} = 1;
1092       }
1093     }
1094   }
1095
1096   # which of my active child procs (>60s old) were not mentioned by squeue?
1097   foreach (keys %proc)
1098   {
1099     if ($proc{$_}->{time} < time - 60
1100         && !exists $ok{$proc{$_}->{jobstepname}}
1101         && !exists $proc{$_}->{killtime})
1102     {
1103       # kill this proc if it hasn't exited in 30 seconds
1104       $proc{$_}->{killtime} = time + 30;
1105     }
1106   }
1107 }
1108
1109
1110 sub release_allocation
1111 {
1112   if ($have_slurm)
1113   {
1114     Log (undef, "release job allocation");
1115     system "scancel $ENV{SLURM_JOBID}";
1116   }
1117 }
1118
1119
1120 sub readfrompipes
1121 {
1122   my $gotsome = 0;
1123   foreach my $job (keys %reader)
1124   {
1125     my $buf;
1126     while (0 < sysread ($reader{$job}, $buf, 8192))
1127     {
1128       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1129       $jobstep[$job]->{stderr} .= $buf;
1130       preprocess_stderr ($job);
1131       if (length ($jobstep[$job]->{stderr}) > 16384)
1132       {
1133         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1134       }
1135       $gotsome = 1;
1136     }
1137   }
1138   return $gotsome;
1139 }
1140
1141
1142 sub preprocess_stderr
1143 {
1144   my $job = shift;
1145
1146   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1147     my $line = $1;
1148     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1149     Log ($job, "stderr $line");
1150     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1151       # whoa.
1152       $main::please_freeze = 1;
1153     }
1154     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1155       $jobstep[$job]->{node_fail} = 1;
1156       ban_node_by_slot($jobstep[$job]->{slotindex});
1157     }
1158   }
1159 }
1160
1161
1162 sub process_stderr
1163 {
1164   my $job = shift;
1165   my $task_success = shift;
1166   preprocess_stderr ($job);
1167
1168   map {
1169     Log ($job, "stderr $_");
1170   } split ("\n", $jobstep[$job]->{stderr});
1171 }
1172
1173 sub fetch_block
1174 {
1175   my $hash = shift;
1176   my ($keep, $child_out, $output_block);
1177
1178   my $cmd = "$arv_cli keep get \Q$hash\E";
1179   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1180   sysread($keep, $output_block, 64 * 1024 * 1024);
1181   close $keep;
1182   return $output_block;
1183 }
1184
1185 sub collate_output
1186 {
1187   Log (undef, "collate");
1188
1189   my ($child_out, $child_in);
1190   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1191   my $joboutput;
1192   for (@jobstep)
1193   {
1194     next if (!exists $_->{'arvados_task'}->{output} ||
1195              !$_->{'arvados_task'}->{'success'} ||
1196              $_->{'exitcode'} != 0);
1197     my $output = $_->{'arvados_task'}->{output};
1198     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1199     {
1200       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1201       print $child_in $output;
1202     }
1203     elsif (@jobstep == 1)
1204     {
1205       $joboutput = $output;
1206       last;
1207     }
1208     elsif (defined (my $outblock = fetch_block ($output)))
1209     {
1210       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1211       print $child_in $outblock;
1212     }
1213     else
1214     {
1215       Log (undef, "XXX fetch_block($output) failed XXX");
1216       $main::success = 0;
1217     }
1218   }
1219   $child_in->close;
1220
1221   if (!defined $joboutput) {
1222     my $s = IO::Select->new($child_out);
1223     if ($s->can_read(120)) {
1224       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1225       chomp($joboutput);
1226     } else {
1227       Log (undef, "timed out reading from 'arv keep put'");
1228     }
1229   }
1230   waitpid($pid, 0);
1231
1232   if ($joboutput)
1233   {
1234     Log (undef, "output $joboutput");
1235     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1236   }
1237   else
1238   {
1239     Log (undef, "output undef");
1240   }
1241   return $joboutput;
1242 }
1243
1244
1245 sub killem
1246 {
1247   foreach (@_)
1248   {
1249     my $sig = 2;                # SIGINT first
1250     if (exists $proc{$_}->{"sent_$sig"} &&
1251         time - $proc{$_}->{"sent_$sig"} > 4)
1252     {
1253       $sig = 15;                # SIGTERM if SIGINT doesn't work
1254     }
1255     if (exists $proc{$_}->{"sent_$sig"} &&
1256         time - $proc{$_}->{"sent_$sig"} > 4)
1257     {
1258       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1259     }
1260     if (!exists $proc{$_}->{"sent_$sig"})
1261     {
1262       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1263       kill $sig, $_;
1264       select (undef, undef, undef, 0.1);
1265       if ($sig == 2)
1266       {
1267         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1268       }
1269       $proc{$_}->{"sent_$sig"} = time;
1270       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1271     }
1272   }
1273 }
1274
1275
1276 sub fhbits
1277 {
1278   my($bits);
1279   for (@_) {
1280     vec($bits,fileno($_),1) = 1;
1281   }
1282   $bits;
1283 }
1284
1285
1286 sub Log                         # ($jobstep_id, $logmessage)
1287 {
1288   if ($_[1] =~ /\n/) {
1289     for my $line (split (/\n/, $_[1])) {
1290       Log ($_[0], $line);
1291     }
1292     return;
1293   }
1294   my $fh = select STDERR; $|=1; select $fh;
1295   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1296   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1297   $message .= "\n";
1298   my $datetime;
1299   if ($local_logfile || -t STDERR) {
1300     my @gmtime = gmtime;
1301     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1302                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1303   }
1304   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1305
1306   if ($local_logfile) {
1307     print $local_logfile $datetime . " " . $message;
1308   }
1309 }
1310
1311
1312 sub croak
1313 {
1314   my ($package, $file, $line) = caller;
1315   my $message = "@_ at $file line $line\n";
1316   Log (undef, $message);
1317   freeze() if @jobstep_todo;
1318   collate_output() if @jobstep_todo;
1319   cleanup();
1320   save_meta() if $local_logfile;
1321   die;
1322 }
1323
1324
1325 sub cleanup
1326 {
1327   return if !$job_has_uuid;
1328   $Job->update_attributes('running' => 0,
1329                           'success' => 0,
1330                           'finished_at' => scalar gmtime);
1331 }
1332
1333
1334 sub save_meta
1335 {
1336   my $justcheckpoint = shift; # false if this will be the last meta saved
1337   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1338
1339   $local_logfile->flush;
1340   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1341       . quotemeta($local_logfile->filename);
1342   my $loglocator = `$cmd`;
1343   die "system $cmd failed: $?" if $?;
1344   chomp($loglocator);
1345
1346   $local_logfile = undef;   # the temp file is automatically deleted
1347   Log (undef, "log manifest is $loglocator");
1348   $Job->{'log'} = $loglocator;
1349   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1350 }
1351
1352
1353 sub freeze_if_want_freeze
1354 {
1355   if ($main::please_freeze)
1356   {
1357     release_allocation();
1358     if (@_)
1359     {
1360       # kill some srun procs before freeze+stop
1361       map { $proc{$_} = {} } @_;
1362       while (%proc)
1363       {
1364         killem (keys %proc);
1365         select (undef, undef, undef, 0.1);
1366         my $died;
1367         while (($died = waitpid (-1, WNOHANG)) > 0)
1368         {
1369           delete $proc{$died};
1370         }
1371       }
1372     }
1373     freeze();
1374     collate_output();
1375     cleanup();
1376     save_meta();
1377     exit 0;
1378   }
1379 }
1380
1381
1382 sub freeze
1383 {
1384   Log (undef, "Freeze not implemented");
1385   return;
1386 }
1387
1388
1389 sub thaw
1390 {
1391   croak ("Thaw not implemented");
1392 }
1393
1394
1395 sub freezequote
1396 {
1397   my $s = shift;
1398   $s =~ s/\\/\\\\/g;
1399   $s =~ s/\n/\\n/g;
1400   return $s;
1401 }
1402
1403
1404 sub freezeunquote
1405 {
1406   my $s = shift;
1407   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1408   return $s;
1409 }
1410
1411
1412 sub srun
1413 {
1414   my $srunargs = shift;
1415   my $execargs = shift;
1416   my $opts = shift || {};
1417   my $stdin = shift;
1418   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1419   print STDERR (join (" ",
1420                       map { / / ? "'$_'" : $_ }
1421                       (@$args)),
1422                 "\n")
1423       if $ENV{CRUNCH_DEBUG};
1424
1425   if (defined $stdin) {
1426     my $child = open STDIN, "-|";
1427     defined $child or die "no fork: $!";
1428     if ($child == 0) {
1429       print $stdin or die $!;
1430       close STDOUT or die $!;
1431       exit 0;
1432     }
1433   }
1434
1435   return system (@$args) if $opts->{fork};
1436
1437   exec @$args;
1438   warn "ENV size is ".length(join(" ",%ENV));
1439   die "exec failed: $!: @$args";
1440 }
1441
1442
1443 sub ban_node_by_slot {
1444   # Don't start any new jobsteps on this node for 60 seconds
1445   my $slotid = shift;
1446   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1447   $slot[$slotid]->{node}->{hold_count}++;
1448   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1449 }
1450
1451 sub must_lock_now
1452 {
1453   my ($lockfile, $error_message) = @_;
1454   open L, ">", $lockfile or croak("$lockfile: $!");
1455   if (!flock L, LOCK_EX|LOCK_NB) {
1456     croak("Can't lock $lockfile: $error_message\n");
1457   }
1458 }
1459
1460 sub find_docker_hash {
1461   # Given a Keep locator, search for a matching link to find the Docker hash
1462   # of the stored image.
1463   my $locator = shift;
1464   my $links_result = $arv->{links}->{list}->execute(
1465     filters => [["head_uuid", "=", $locator],
1466                 ["link_class", "=", "docker_image_hash"]],
1467     limit => 1);
1468   my $docker_hash;
1469   foreach my $link (@{$links_result->{items}}) {
1470     $docker_hash = lc($link->{name});
1471   }
1472   return $docker_hash;
1473 }
1474
1475 __DATA__
1476 #!/usr/bin/perl
1477
1478 # checkout-and-build
1479
1480 use Fcntl ':flock';
1481
1482 my $destdir = $ENV{"CRUNCH_SRC"};
1483 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1484 my $repo = $ENV{"CRUNCH_SRC_URL"};
1485
1486 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1487 flock L, LOCK_EX;
1488 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1489     exit 0;
1490 }
1491
1492 unlink "$destdir.commit";
1493 open STDOUT, ">", "$destdir.log";
1494 open STDERR, ">&STDOUT";
1495
1496 mkdir $destdir;
1497 my @git_archive_data = <DATA>;
1498 if (@git_archive_data) {
1499   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1500   print TARX @git_archive_data;
1501   if(!close(TARX)) {
1502     die "'tar -C $destdir -xf -' exited $?: $!";
1503   }
1504 }
1505
1506 my $pwd;
1507 chomp ($pwd = `pwd`);
1508 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1509 mkdir $install_dir;
1510
1511 for my $src_path ("$destdir/arvados/sdk/python") {
1512   if (-d $src_path) {
1513     shell_or_die ("virtualenv", $install_dir);
1514     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1515   }
1516 }
1517
1518 if (-e "$destdir/crunch_scripts/install") {
1519     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1520 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1521     # Old version
1522     shell_or_die ("./tests/autotests.sh", $install_dir);
1523 } elsif (-e "./install.sh") {
1524     shell_or_die ("./install.sh", $install_dir);
1525 }
1526
1527 if ($commit) {
1528     unlink "$destdir.commit.new";
1529     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1530     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1531 }
1532
1533 close L;
1534
1535 exit 0;
1536
1537 sub shell_or_die
1538 {
1539   if ($ENV{"DEBUG"}) {
1540     print STDERR "@_\n";
1541   }
1542   system (@_) == 0
1543       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1544 }
1545
1546 __DATA__