Merge branch 'master' into 2199-crunch-virtualenv
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use Warehouse;
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
89 mkdir ($ENV{"JOB_WORK"});
90
91 my $force_unlock;
92 my $git_dir;
93 my $jobspec;
94 my $job_api_token;
95 my $resume_stash;
96 GetOptions('force-unlock' => \$force_unlock,
97            'git-dir=s' => \$git_dir,
98            'job=s' => \$jobspec,
99            'job-api-token=s' => \$job_api_token,
100            'resume-stash=s' => \$resume_stash,
101     );
102
103 if (defined $job_api_token) {
104   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
105 }
106
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
110
111
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new('apiVersion' => 'v1');
124 my $metastream;
125
126 my $User = $arv->{'users'}->{'current'}->execute;
127
128 my $Job = {};
129 my $job_id;
130 my $dbh;
131 my $sth;
132 if ($job_has_uuid)
133 {
134   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135   if (!$force_unlock) {
136     if ($Job->{'is_locked_by_uuid'}) {
137       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
138     }
139     if ($Job->{'success'} ne undef) {
140       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
141     }
142     if ($Job->{'running'}) {
143       croak("Job 'running' flag is already set");
144     }
145     if ($Job->{'started_at'}) {
146       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
147     }
148   }
149 }
150 else
151 {
152   $Job = JSON::decode_json($jobspec);
153
154   if (!$resume_stash)
155   {
156     map { croak ("No $_ specified") unless $Job->{$_} }
157     qw(script script_version script_parameters);
158   }
159
160   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161   $Job->{'started_at'} = gmtime;
162
163   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
164
165   $job_has_uuid = 1;
166 }
167 $job_id = $Job->{'uuid'};
168
169 $metastream = Warehouse::Stream->new(whc => new Warehouse);
170 $metastream->clear;
171 $metastream->name('.');
172 $metastream->write_start($job_id . '.log.txt');
173
174
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
178
179
180 Log (undef, "check slurm allocation");
181 my @slot;
182 my @node;
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
184 my @sinfo;
185 if (!$have_slurm)
186 {
187   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188   push @sinfo, "$localcpus localhost";
189 }
190 if (exists $ENV{SLURM_NODELIST})
191 {
192   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 }
194 foreach (@sinfo)
195 {
196   my ($ncpus, $slurm_nodelist) = split;
197   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
198
199   my @nodelist;
200   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
201   {
202     my $nodelist = $1;
203     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
204     {
205       my $ranges = $1;
206       foreach (split (",", $ranges))
207       {
208         my ($a, $b);
209         if (/(\d+)-(\d+)/)
210         {
211           $a = $1;
212           $b = $2;
213         }
214         else
215         {
216           $a = $_;
217           $b = $_;
218         }
219         push @nodelist, map {
220           my $n = $nodelist;
221           $n =~ s/\[[-,\d]+\]/$_/;
222           $n;
223         } ($a..$b);
224       }
225     }
226     else
227     {
228       push @nodelist, $nodelist;
229     }
230   }
231   foreach my $nodename (@nodelist)
232   {
233     Log (undef, "node $nodename - $ncpus slots");
234     my $node = { name => $nodename,
235                  ncpus => $ncpus,
236                  losing_streak => 0,
237                  hold_until => 0 };
238     foreach my $cpu (1..$ncpus)
239     {
240       push @slot, { node => $node,
241                     cpu => $cpu };
242     }
243   }
244   push @node, @nodelist;
245 }
246
247
248
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
251
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
253
254
255
256 my $jobmanager_id;
257 if ($job_has_uuid)
258 {
259   # Claim this job, and make sure nobody else does
260   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
261           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
262     croak("Error while updating / locking job");
263   }
264   $Job->update_attributes('started_at' => scalar gmtime,
265                           'running' => 1,
266                           'success' => undef,
267                           'tasks_summary' => { 'failed' => 0,
268                                                'todo' => 1,
269                                                'running' => 0,
270                                                'done' => 0 });
271 }
272
273
274 Log (undef, "start");
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
282
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
287 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
288
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
292
293
294 my @jobstep;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
299 my $squeue_checked;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
303
304
305
306 if (defined $Job->{thawedfromkey})
307 {
308   thaw ($Job->{thawedfromkey});
309 }
310 else
311 {
312   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313     'job_uuid' => $Job->{'uuid'},
314     'sequence' => 0,
315     'qsequence' => 0,
316     'parameters' => {},
317                                                           });
318   push @jobstep, { 'level' => 0,
319                    'failures' => 0,
320                    'arvados_task' => $first_task,
321                  };
322   push @jobstep_todo, 0;
323 }
324
325
326 my $build_script;
327
328
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330
331 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
332 if ($skip_install)
333 {
334   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
335   system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
336       or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
337   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python",
338                     "$ENV{CRUNCH_SRC}/sdk/python") {
339     if (-d $src_path) {
340       system ("cd $src_path && \$CRUNCH_TMP/opt/bin/python setup.py install")
341           == 0
342           or croak ("setup.py in $src_path failed: exit ".($?>>8));
343     }
344   }
345 }
346 else
347 {
348   do {
349     local $/ = undef;
350     $build_script = <DATA>;
351   };
352   Log (undef, "Install revision ".$Job->{script_version});
353   my $nodelist = join(",", @node);
354
355   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
361           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
362     exit (1);
363   }
364   while (1)
365   {
366     last if $cleanpid == waitpid (-1, WNOHANG);
367     freeze_if_want_freeze ($cleanpid);
368     select (undef, undef, undef, 0.1);
369   }
370   Log (undef, "Clean-work-dir exited $?");
371
372   # Install requested code version
373
374   my @execargs;
375   my @srunargs = ("srun",
376                   "--nodelist=$nodelist",
377                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
378
379   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
380   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
381
382   my $commit;
383   my $git_archive;
384   my $treeish = $Job->{'script_version'};
385   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
386   # Todo: let script_version specify repository instead of expecting
387   # parent process to figure it out.
388   $ENV{"CRUNCH_SRC_URL"} = $repo;
389
390   # Create/update our clone of the remote git repo
391
392   if (!-d $ENV{"CRUNCH_SRC"}) {
393     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
394         or croak ("git clone $repo failed: exit ".($?>>8));
395     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
396   }
397   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
398
399   # If this looks like a subversion r#, look for it in git-svn commit messages
400
401   if ($treeish =~ m{^\d{1,4}$}) {
402     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
403     chomp $gitlog;
404     if ($gitlog =~ /^[a-f0-9]{40}$/) {
405       $commit = $gitlog;
406       Log (undef, "Using commit $commit for script_version $treeish");
407     }
408   }
409
410   # If that didn't work, try asking git to look it up as a tree-ish.
411
412   if (!defined $commit) {
413
414     my $cooked_treeish = $treeish;
415     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
416       # Looks like a git branch name -- make sure git knows it's
417       # relative to the remote repo
418       $cooked_treeish = "origin/$treeish";
419     }
420
421     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
422     chomp $found;
423     if ($found =~ /^[0-9a-f]{40}$/s) {
424       $commit = $found;
425       if ($commit ne $treeish) {
426         # Make sure we record the real commit id in the database,
427         # frozentokey, logs, etc. -- instead of an abbreviation or a
428         # branch name which can become ambiguous or point to a
429         # different commit in the future.
430         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
431         Log (undef, "Using commit $commit for tree-ish $treeish");
432         if ($commit ne $treeish) {
433           $Job->{'script_version'} = $commit;
434           !$job_has_uuid or
435               $Job->update_attributes('script_version' => $commit) or
436               croak("Error while updating job");
437         }
438       }
439     }
440   }
441
442   if (defined $commit) {
443     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
444     @execargs = ("sh", "-c",
445                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
446     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
447   }
448   else {
449     croak ("could not figure out commit id for $treeish");
450   }
451
452   my $installpid = fork();
453   if ($installpid == 0)
454   {
455     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
456     exit (1);
457   }
458   while (1)
459   {
460     last if $installpid == waitpid (-1, WNOHANG);
461     freeze_if_want_freeze ($installpid);
462     select (undef, undef, undef, 0.1);
463   }
464   Log (undef, "Install exited $?");
465 }
466
467
468
469 foreach (qw (script script_version script_parameters runtime_constraints))
470 {
471   Log (undef,
472        "$_ " .
473        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
474 }
475 foreach (split (/\n/, $Job->{knobs}))
476 {
477   Log (undef, "knob " . $_);
478 }
479
480
481
482 $main::success = undef;
483
484
485
486 ONELEVEL:
487
488 my $thisround_succeeded = 0;
489 my $thisround_failed = 0;
490 my $thisround_failed_multiple = 0;
491
492 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
493                        or $a <=> $b } @jobstep_todo;
494 my $level = $jobstep[$jobstep_todo[0]]->{level};
495 Log (undef, "start level $level");
496
497
498
499 my %proc;
500 my @freeslot = (0..$#slot);
501 my @holdslot;
502 my %reader;
503 my $progress_is_dirty = 1;
504 my $progress_stats_updated = 0;
505
506 update_progress_stats();
507
508
509
510 THISROUND:
511 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
512 {
513   my $id = $jobstep_todo[$todo_ptr];
514   my $Jobstep = $jobstep[$id];
515   if ($Jobstep->{level} != $level)
516   {
517     next;
518   }
519
520   pipe $reader{$id}, "writer" or croak ($!);
521   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
522   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
523
524   my $childslot = $freeslot[0];
525   my $childnode = $slot[$childslot]->{node};
526   my $childslotname = join (".",
527                             $slot[$childslot]->{node}->{name},
528                             $slot[$childslot]->{cpu});
529   my $childpid = fork();
530   if ($childpid == 0)
531   {
532     $SIG{'INT'} = 'DEFAULT';
533     $SIG{'QUIT'} = 'DEFAULT';
534     $SIG{'TERM'} = 'DEFAULT';
535
536     foreach (values (%reader))
537     {
538       close($_);
539     }
540     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
541     open(STDOUT,">&writer");
542     open(STDERR,">&writer");
543
544     undef $dbh;
545     undef $sth;
546
547     delete $ENV{"GNUPGHOME"};
548     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
549     $ENV{"TASK_QSEQUENCE"} = $id;
550     $ENV{"TASK_SEQUENCE"} = $level;
551     $ENV{"JOB_SCRIPT"} = $Job->{script};
552     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
553       $param =~ tr/a-z/A-Z/;
554       $ENV{"JOB_PARAMETER_$param"} = $value;
555     }
556     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
557     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
558     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
559     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
560     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
561     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
562
563     $ENV{"GZIP"} = "-n";
564
565     my @srunargs = (
566       "srun",
567       "--nodelist=".$childnode->{name},
568       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
569       "--job-name=$job_id.$id.$$",
570         );
571     my @execargs = qw(sh);
572     my $build_script_to_send = "";
573     my $command =
574         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
575         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
576         ."&& cd $ENV{CRUNCH_TMP} ";
577     if ($build_script)
578     {
579       $build_script_to_send = $build_script;
580       $command .=
581           "&& perl -";
582     }
583     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
584     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
585     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack
586     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
587     $command .=
588         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
589     my @execargs = ('bash', '-c', $command);
590     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
591     exit (111);
592   }
593   close("writer");
594   if (!defined $childpid)
595   {
596     close $reader{$id};
597     delete $reader{$id};
598     next;
599   }
600   shift @freeslot;
601   $proc{$childpid} = { jobstep => $id,
602                        time => time,
603                        slot => $childslot,
604                        jobstepname => "$job_id.$id.$childpid",
605                      };
606   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
607   $slot[$childslot]->{pid} = $childpid;
608
609   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
610   Log ($id, "child $childpid started on $childslotname");
611   $Jobstep->{starttime} = time;
612   $Jobstep->{node} = $childnode->{name};
613   $Jobstep->{slotindex} = $childslot;
614   delete $Jobstep->{stderr};
615   delete $Jobstep->{finishtime};
616
617   splice @jobstep_todo, $todo_ptr, 1;
618   --$todo_ptr;
619
620   $progress_is_dirty = 1;
621
622   while (!@freeslot
623          ||
624          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
625   {
626     last THISROUND if $main::please_freeze;
627     if ($main::please_info)
628     {
629       $main::please_info = 0;
630       freeze();
631       collate_output();
632       save_meta(1);
633       update_progress_stats();
634     }
635     my $gotsome
636         = readfrompipes ()
637         + reapchildren ();
638     if (!$gotsome)
639     {
640       check_refresh_wanted();
641       check_squeue();
642       update_progress_stats();
643       select (undef, undef, undef, 0.1);
644     }
645     elsif (time - $progress_stats_updated >= 30)
646     {
647       update_progress_stats();
648     }
649     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
650         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
651     {
652       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
653           .($thisround_failed+$thisround_succeeded)
654           .") -- giving up on this round";
655       Log (undef, $message);
656       last THISROUND;
657     }
658
659     # move slots from freeslot to holdslot (or back to freeslot) if necessary
660     for (my $i=$#freeslot; $i>=0; $i--) {
661       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
662         push @holdslot, (splice @freeslot, $i, 1);
663       }
664     }
665     for (my $i=$#holdslot; $i>=0; $i--) {
666       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
667         push @freeslot, (splice @holdslot, $i, 1);
668       }
669     }
670
671     # give up if no nodes are succeeding
672     if (!grep { $_->{node}->{losing_streak} == 0 &&
673                     $_->{node}->{hold_count} < 4 } @slot) {
674       my $message = "Every node has failed -- giving up on this round";
675       Log (undef, $message);
676       last THISROUND;
677     }
678   }
679 }
680
681
682 push @freeslot, splice @holdslot;
683 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
684
685
686 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
687 while (%proc)
688 {
689   if ($main::please_continue) {
690     $main::please_continue = 0;
691     goto THISROUND;
692   }
693   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
694   readfrompipes ();
695   if (!reapchildren())
696   {
697     check_refresh_wanted();
698     check_squeue();
699     update_progress_stats();
700     select (undef, undef, undef, 0.1);
701     killem (keys %proc) if $main::please_freeze;
702   }
703 }
704
705 update_progress_stats();
706 freeze_if_want_freeze();
707
708
709 if (!defined $main::success)
710 {
711   if (@jobstep_todo &&
712       $thisround_succeeded == 0 &&
713       ($thisround_failed == 0 || $thisround_failed > 4))
714   {
715     my $message = "stop because $thisround_failed tasks failed and none succeeded";
716     Log (undef, $message);
717     $main::success = 0;
718   }
719   if (!@jobstep_todo)
720   {
721     $main::success = 1;
722   }
723 }
724
725 goto ONELEVEL if !defined $main::success;
726
727
728 release_allocation();
729 freeze();
730 if ($job_has_uuid) {
731   $Job->update_attributes('output' => &collate_output(),
732                           'running' => 0,
733                           'success' => $Job->{'output'} && $main::success,
734                           'finished_at' => scalar gmtime)
735 }
736
737 if ($Job->{'output'})
738 {
739   eval {
740     my $manifest_text = capturex("whget", $Job->{'output'});
741     $arv->{'collections'}->{'create'}->execute('collection' => {
742       'uuid' => $Job->{'output'},
743       'manifest_text' => $manifest_text,
744     });
745   };
746   if ($@) {
747     Log (undef, "Failed to register output manifest: $@");
748   }
749 }
750
751 Log (undef, "finish");
752
753 save_meta();
754 exit 0;
755
756
757
758 sub update_progress_stats
759 {
760   $progress_stats_updated = time;
761   return if !$progress_is_dirty;
762   my ($todo, $done, $running) = (scalar @jobstep_todo,
763                                  scalar @jobstep_done,
764                                  scalar @slot - scalar @freeslot - scalar @holdslot);
765   $Job->{'tasks_summary'} ||= {};
766   $Job->{'tasks_summary'}->{'todo'} = $todo;
767   $Job->{'tasks_summary'}->{'done'} = $done;
768   $Job->{'tasks_summary'}->{'running'} = $running;
769   if ($job_has_uuid) {
770     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
771   }
772   Log (undef, "status: $done done, $running running, $todo todo");
773   $progress_is_dirty = 0;
774 }
775
776
777
778 sub reapchildren
779 {
780   my $pid = waitpid (-1, WNOHANG);
781   return 0 if $pid <= 0;
782
783   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
784                   . "."
785                   . $slot[$proc{$pid}->{slot}]->{cpu});
786   my $jobstepid = $proc{$pid}->{jobstep};
787   my $elapsed = time - $proc{$pid}->{time};
788   my $Jobstep = $jobstep[$jobstepid];
789
790   my $childstatus = $?;
791   my $exitvalue = $childstatus >> 8;
792   my $exitinfo = sprintf("exit %d signal %d%s",
793                          $exitvalue,
794                          $childstatus & 127,
795                          ($childstatus & 128 ? ' core dump' : ''));
796   $Jobstep->{'arvados_task'}->reload;
797   my $task_success = $Jobstep->{'arvados_task'}->{success};
798
799   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
800
801   if (!defined $task_success) {
802     # task did not indicate one way or the other --> fail
803     $Jobstep->{'arvados_task'}->{success} = 0;
804     $Jobstep->{'arvados_task'}->save;
805     $task_success = 0;
806   }
807
808   if (!$task_success)
809   {
810     my $temporary_fail;
811     $temporary_fail ||= $Jobstep->{node_fail};
812     $temporary_fail ||= ($exitvalue == 111);
813
814     ++$thisround_failed;
815     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
816
817     # Check for signs of a failed or misconfigured node
818     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
819         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
820       # Don't count this against jobstep failure thresholds if this
821       # node is already suspected faulty and srun exited quickly
822       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
823           $elapsed < 5) {
824         Log ($jobstepid, "blaming failure on suspect node " .
825              $slot[$proc{$pid}->{slot}]->{node}->{name});
826         $temporary_fail ||= 1;
827       }
828       ban_node_by_slot($proc{$pid}->{slot});
829     }
830
831     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
832                              ++$Jobstep->{'failures'},
833                              $temporary_fail ? 'temporary ' : 'permanent',
834                              $elapsed));
835
836     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
837       # Give up on this task, and the whole job
838       $main::success = 0;
839       $main::please_freeze = 1;
840     }
841     else {
842       # Put this task back on the todo queue
843       push @jobstep_todo, $jobstepid;
844     }
845     $Job->{'tasks_summary'}->{'failed'}++;
846   }
847   else
848   {
849     ++$thisround_succeeded;
850     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
851     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
852     push @jobstep_done, $jobstepid;
853     Log ($jobstepid, "success in $elapsed seconds");
854   }
855   $Jobstep->{exitcode} = $childstatus;
856   $Jobstep->{finishtime} = time;
857   process_stderr ($jobstepid, $task_success);
858   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
859
860   close $reader{$jobstepid};
861   delete $reader{$jobstepid};
862   delete $slot[$proc{$pid}->{slot}]->{pid};
863   push @freeslot, $proc{$pid}->{slot};
864   delete $proc{$pid};
865
866   # Load new tasks
867   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
868     'where' => {
869       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
870     },
871     'order' => 'qsequence'
872   );
873   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
874     my $jobstep = {
875       'level' => $arvados_task->{'sequence'},
876       'failures' => 0,
877       'arvados_task' => $arvados_task
878     };
879     push @jobstep, $jobstep;
880     push @jobstep_todo, $#jobstep;
881   }
882
883   $progress_is_dirty = 1;
884   1;
885 }
886
887 sub check_refresh_wanted
888 {
889   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
890   if (@stat && $stat[9] > $latest_refresh) {
891     $latest_refresh = scalar time;
892     if ($job_has_uuid) {
893       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
894       for my $attr ('cancelled_at',
895                     'cancelled_by_user_uuid',
896                     'cancelled_by_client_uuid') {
897         $Job->{$attr} = $Job2->{$attr};
898       }
899       if ($Job->{'cancelled_at'}) {
900         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
901              " by user " . $Job->{cancelled_by_user_uuid});
902         $main::success = 0;
903         $main::please_freeze = 1;
904       }
905     }
906   }
907 }
908
909 sub check_squeue
910 {
911   # return if the kill list was checked <4 seconds ago
912   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
913   {
914     return;
915   }
916   $squeue_kill_checked = time;
917
918   # use killem() on procs whose killtime is reached
919   for (keys %proc)
920   {
921     if (exists $proc{$_}->{killtime}
922         && $proc{$_}->{killtime} <= time)
923     {
924       killem ($_);
925     }
926   }
927
928   # return if the squeue was checked <60 seconds ago
929   if (defined $squeue_checked && $squeue_checked > time - 60)
930   {
931     return;
932   }
933   $squeue_checked = time;
934
935   if (!$have_slurm)
936   {
937     # here is an opportunity to check for mysterious problems with local procs
938     return;
939   }
940
941   # get a list of steps still running
942   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
943   chop @squeue;
944   if ($squeue[-1] ne "ok")
945   {
946     return;
947   }
948   pop @squeue;
949
950   # which of my jobsteps are running, according to squeue?
951   my %ok;
952   foreach (@squeue)
953   {
954     if (/^(\d+)\.(\d+) (\S+)/)
955     {
956       if ($1 eq $ENV{SLURM_JOBID})
957       {
958         $ok{$3} = 1;
959       }
960     }
961   }
962
963   # which of my active child procs (>60s old) were not mentioned by squeue?
964   foreach (keys %proc)
965   {
966     if ($proc{$_}->{time} < time - 60
967         && !exists $ok{$proc{$_}->{jobstepname}}
968         && !exists $proc{$_}->{killtime})
969     {
970       # kill this proc if it hasn't exited in 30 seconds
971       $proc{$_}->{killtime} = time + 30;
972     }
973   }
974 }
975
976
977 sub release_allocation
978 {
979   if ($have_slurm)
980   {
981     Log (undef, "release job allocation");
982     system "scancel $ENV{SLURM_JOBID}";
983   }
984 }
985
986
987 sub readfrompipes
988 {
989   my $gotsome = 0;
990   foreach my $job (keys %reader)
991   {
992     my $buf;
993     while (0 < sysread ($reader{$job}, $buf, 8192))
994     {
995       print STDERR $buf if $ENV{CRUNCH_DEBUG};
996       $jobstep[$job]->{stderr} .= $buf;
997       preprocess_stderr ($job);
998       if (length ($jobstep[$job]->{stderr}) > 16384)
999       {
1000         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1001       }
1002       $gotsome = 1;
1003     }
1004   }
1005   return $gotsome;
1006 }
1007
1008
1009 sub preprocess_stderr
1010 {
1011   my $job = shift;
1012
1013   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1014     my $line = $1;
1015     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1016     Log ($job, "stderr $line");
1017     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1018       # whoa.
1019       $main::please_freeze = 1;
1020     }
1021     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1022       $jobstep[$job]->{node_fail} = 1;
1023       ban_node_by_slot($jobstep[$job]->{slotindex});
1024     }
1025   }
1026 }
1027
1028
1029 sub process_stderr
1030 {
1031   my $job = shift;
1032   my $task_success = shift;
1033   preprocess_stderr ($job);
1034
1035   map {
1036     Log ($job, "stderr $_");
1037   } split ("\n", $jobstep[$job]->{stderr});
1038 }
1039
1040
1041 sub collate_output
1042 {
1043   my $whc = Warehouse->new;
1044   Log (undef, "collate");
1045   $whc->write_start (1);
1046   my $joboutput;
1047   for (@jobstep)
1048   {
1049     next if (!exists $_->{'arvados_task'}->{output} ||
1050              !$_->{'arvados_task'}->{'success'} ||
1051              $_->{'exitcode'} != 0);
1052     my $output = $_->{'arvados_task'}->{output};
1053     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1054     {
1055       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1056       $whc->write_data ($output);
1057     }
1058     elsif (@jobstep == 1)
1059     {
1060       $joboutput = $output;
1061       $whc->write_finish;
1062     }
1063     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1064     {
1065       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1066       $whc->write_data ($outblock);
1067     }
1068     else
1069     {
1070       my $errstr = $whc->errstr;
1071       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1072       $main::success = 0;
1073     }
1074   }
1075   $joboutput = $whc->write_finish if !defined $joboutput;
1076   if ($joboutput)
1077   {
1078     Log (undef, "output $joboutput");
1079     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1080   }
1081   else
1082   {
1083     Log (undef, "output undef");
1084   }
1085   return $joboutput;
1086 }
1087
1088
1089 sub killem
1090 {
1091   foreach (@_)
1092   {
1093     my $sig = 2;                # SIGINT first
1094     if (exists $proc{$_}->{"sent_$sig"} &&
1095         time - $proc{$_}->{"sent_$sig"} > 4)
1096     {
1097       $sig = 15;                # SIGTERM if SIGINT doesn't work
1098     }
1099     if (exists $proc{$_}->{"sent_$sig"} &&
1100         time - $proc{$_}->{"sent_$sig"} > 4)
1101     {
1102       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1103     }
1104     if (!exists $proc{$_}->{"sent_$sig"})
1105     {
1106       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1107       kill $sig, $_;
1108       select (undef, undef, undef, 0.1);
1109       if ($sig == 2)
1110       {
1111         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1112       }
1113       $proc{$_}->{"sent_$sig"} = time;
1114       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1115     }
1116   }
1117 }
1118
1119
1120 sub fhbits
1121 {
1122   my($bits);
1123   for (@_) {
1124     vec($bits,fileno($_),1) = 1;
1125   }
1126   $bits;
1127 }
1128
1129
1130 sub Log                         # ($jobstep_id, $logmessage)
1131 {
1132   if ($_[1] =~ /\n/) {
1133     for my $line (split (/\n/, $_[1])) {
1134       Log ($_[0], $line);
1135     }
1136     return;
1137   }
1138   my $fh = select STDERR; $|=1; select $fh;
1139   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1140   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1141   $message .= "\n";
1142   my $datetime;
1143   if ($metastream || -t STDERR) {
1144     my @gmtime = gmtime;
1145     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1146                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1147   }
1148   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1149
1150   return if !$metastream;
1151   $metastream->write_data ($datetime . " " . $message);
1152 }
1153
1154
1155 sub croak
1156 {
1157   my ($package, $file, $line) = caller;
1158   my $message = "@_ at $file line $line\n";
1159   Log (undef, $message);
1160   freeze() if @jobstep_todo;
1161   collate_output() if @jobstep_todo;
1162   cleanup();
1163   save_meta() if $metastream;
1164   die;
1165 }
1166
1167
1168 sub cleanup
1169 {
1170   return if !$job_has_uuid;
1171   $Job->update_attributes('running' => 0,
1172                           'success' => 0,
1173                           'finished_at' => scalar gmtime);
1174 }
1175
1176
1177 sub save_meta
1178 {
1179   my $justcheckpoint = shift; # false if this will be the last meta saved
1180   my $m = $metastream;
1181   $m = $m->copy if $justcheckpoint;
1182   $m->write_finish;
1183   my $whc = Warehouse->new;
1184   my $loglocator = $whc->store_block ($m->as_string);
1185   $arv->{'collections'}->{'create'}->execute('collection' => {
1186     'uuid' => $loglocator,
1187     'manifest_text' => $m->as_string,
1188   });
1189   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1190   Log (undef, "log manifest is $loglocator");
1191   $Job->{'log'} = $loglocator;
1192   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1193 }
1194
1195
1196 sub freeze_if_want_freeze
1197 {
1198   if ($main::please_freeze)
1199   {
1200     release_allocation();
1201     if (@_)
1202     {
1203       # kill some srun procs before freeze+stop
1204       map { $proc{$_} = {} } @_;
1205       while (%proc)
1206       {
1207         killem (keys %proc);
1208         select (undef, undef, undef, 0.1);
1209         my $died;
1210         while (($died = waitpid (-1, WNOHANG)) > 0)
1211         {
1212           delete $proc{$died};
1213         }
1214       }
1215     }
1216     freeze();
1217     collate_output();
1218     cleanup();
1219     save_meta();
1220     exit 0;
1221   }
1222 }
1223
1224
1225 sub freeze
1226 {
1227   Log (undef, "Freeze not implemented");
1228   return;
1229 }
1230
1231
1232 sub thaw
1233 {
1234   croak ("Thaw not implemented");
1235
1236   my $whc;
1237   my $key = shift;
1238   Log (undef, "thaw from $key");
1239
1240   @jobstep = ();
1241   @jobstep_done = ();
1242   @jobstep_todo = ();
1243   @jobstep_tomerge = ();
1244   $jobstep_tomerge_level = 0;
1245   my $frozenjob = {};
1246
1247   my $stream = new Warehouse::Stream ( whc => $whc,
1248                                        hash => [split (",", $key)] );
1249   $stream->rewind;
1250   while (my $dataref = $stream->read_until (undef, "\n\n"))
1251   {
1252     if ($$dataref =~ /^job /)
1253     {
1254       foreach (split ("\n", $$dataref))
1255       {
1256         my ($k, $v) = split ("=", $_, 2);
1257         $frozenjob->{$k} = freezeunquote ($v);
1258       }
1259       next;
1260     }
1261
1262     if ($$dataref =~ /^merge (\d+) (.*)/)
1263     {
1264       $jobstep_tomerge_level = $1;
1265       @jobstep_tomerge
1266           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1267       next;
1268     }
1269
1270     my $Jobstep = { };
1271     foreach (split ("\n", $$dataref))
1272     {
1273       my ($k, $v) = split ("=", $_, 2);
1274       $Jobstep->{$k} = freezeunquote ($v) if $k;
1275     }
1276     $Jobstep->{'failures'} = 0;
1277     push @jobstep, $Jobstep;
1278
1279     if ($Jobstep->{exitcode} eq "0")
1280     {
1281       push @jobstep_done, $#jobstep;
1282     }
1283     else
1284     {
1285       push @jobstep_todo, $#jobstep;
1286     }
1287   }
1288
1289   foreach (qw (script script_version script_parameters))
1290   {
1291     $Job->{$_} = $frozenjob->{$_};
1292   }
1293   $Job->save if $job_has_uuid;
1294 }
1295
1296
1297 sub freezequote
1298 {
1299   my $s = shift;
1300   $s =~ s/\\/\\\\/g;
1301   $s =~ s/\n/\\n/g;
1302   return $s;
1303 }
1304
1305
1306 sub freezeunquote
1307 {
1308   my $s = shift;
1309   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1310   return $s;
1311 }
1312
1313
1314 sub srun
1315 {
1316   my $srunargs = shift;
1317   my $execargs = shift;
1318   my $opts = shift || {};
1319   my $stdin = shift;
1320   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1321   print STDERR (join (" ",
1322                       map { / / ? "'$_'" : $_ }
1323                       (@$args)),
1324                 "\n")
1325       if $ENV{CRUNCH_DEBUG};
1326
1327   if (defined $stdin) {
1328     my $child = open STDIN, "-|";
1329     defined $child or die "no fork: $!";
1330     if ($child == 0) {
1331       print $stdin or die $!;
1332       close STDOUT or die $!;
1333       exit 0;
1334     }
1335   }
1336
1337   return system (@$args) if $opts->{fork};
1338
1339   exec @$args;
1340   warn "ENV size is ".length(join(" ",%ENV));
1341   die "exec failed: $!: @$args";
1342 }
1343
1344
1345 sub ban_node_by_slot {
1346   # Don't start any new jobsteps on this node for 60 seconds
1347   my $slotid = shift;
1348   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1349   $slot[$slotid]->{node}->{hold_count}++;
1350   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1351 }
1352
1353 __DATA__
1354 #!/usr/bin/perl
1355
1356 # checkout-and-build
1357
1358 use Fcntl ':flock';
1359
1360 my $destdir = $ENV{"CRUNCH_SRC"};
1361 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1362 my $repo = $ENV{"CRUNCH_SRC_URL"};
1363
1364 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1365 flock L, LOCK_EX;
1366 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1367     exit 0;
1368 }
1369
1370 unlink "$destdir.commit";
1371 open STDOUT, ">", "$destdir.log";
1372 open STDERR, ">&STDOUT";
1373
1374 mkdir $destdir;
1375 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1376 print TARX <DATA>;
1377 if(!close(TARX)) {
1378   die "'tar -C $destdir -xf -' exited $?: $!";
1379 }
1380
1381 my $pwd;
1382 chomp ($pwd = `pwd`);
1383 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1384 mkdir $install_dir;
1385
1386 shell_or_die ("virtualenv", $install_dir);
1387 for my $src_path ("$destdir/arvados/sdk/python", "$destdir/sdk/python") {
1388   if (-d $src_path) {
1389     shell_or_die ("cd $src_path && $install_dir/bin/python setup.py install");
1390   }
1391 }
1392
1393 if (-e "$destdir/crunch_scripts/install") {
1394     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1395 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1396     # Old version
1397     shell_or_die ("./tests/autotests.sh", $install_dir);
1398 } elsif (-e "./install.sh") {
1399     shell_or_die ("./install.sh", $install_dir);
1400 }
1401
1402 if ($commit) {
1403     unlink "$destdir.commit.new";
1404     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1405     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1406 }
1407
1408 close L;
1409
1410 exit 0;
1411
1412 sub shell_or_die
1413 {
1414   if ($ENV{"DEBUG"}) {
1415     print STDERR "@_\n";
1416   }
1417   system (@_) == 0
1418       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1419 }
1420
1421 __DATA__