X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/41c7f2010ebdbb76fada25a21f184e2d1f4049b3..dd7bb176c565d0d0718f9b0e59a6d9ee4b8ecbf2:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 0ba32b0588..369bc3e1ae 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -10,12 +10,14 @@ crunch-job: Execute job steps, save snapshots as requested, collate output. Obtain job details from Arvados, run tasks on compute nodes (typically invoked by scheduler on controller): - crunch-job --job x-y-z + crunch-job --job x-y-z --git-dir /path/to/repo/.git Obtain job details from command line, run tasks on local machine (typically invoked by application or developer on VM): - crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}' + crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}' + + crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}' =head1 OPTIONS @@ -27,7 +29,9 @@ If the job is already locked, steal the lock and run it anyway. =item --git-dir -Path to .git directory where the specified commit is found. +Path to a .git directory (or a git URL) where the commit given in the +job's C<script_version> attribute is to be found. If this is I<not> +given, the job's C<repository> attribute will be used. =item --job-api-token @@ -39,6 +43,11 @@ Do not clear per-job/task temporary directories during initial job setup. This can speed up development and debugging when running jobs locally. +=item --job + +UUID of the job to run, or a JSON-encoded job resource without a +UUID. If the latter is given, a new job object will be created. + =back =head1 RUNNING JOBS LOCALLY @@ -74,6 +83,7 @@ behavior (e.g., cancel job if cancelled_at becomes non-nil). 
use strict; use POSIX ':sys_wait_h'; +use POSIX qw(strftime); use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Arvados; use Digest::MD5 qw(md5_hex); @@ -82,7 +92,7 @@ use IPC::Open2; use IO::Select; use File::Temp; use Fcntl ':flock'; -use File::Path qw( make_path ); +use File::Path qw( make_path remove_tree ); use constant EX_TEMPFAIL => 75; @@ -124,8 +134,7 @@ if (defined $job_api_token) { } my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST}; -my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/; -my $local_job = !$job_has_uuid; +my $local_job = 0; $SIG{'USR1'} = sub @@ -140,38 +149,31 @@ $SIG{'USR2'} = sub my $arv = Arvados->new('apiVersion' => 'v1'); -my $local_logfile; - -my $User = $arv->{'users'}->{'current'}->execute; -my $Job = {}; +my $Job; my $job_id; my $dbh; my $sth; -if ($job_has_uuid) +my @jobstep; + +my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; }); + +if ($jobspec =~ /^[-a-z\d]+$/) { - $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + # $jobspec is an Arvados UUID, not a JSON job specification + $Job = retry_op(sub { + $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + }); if (!$force_unlock) { - # If some other crunch-job process has grabbed this job (or we see - # other evidence that the job is already underway) we exit - # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't - # mark the job as failed. - if ($Job->{'is_locked_by_uuid'}) { - Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'}); + # Claim this job, and make sure nobody else does + eval { retry_op(sub { + # lock() sets is_locked_by_uuid and changes state to Running. + $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'}) + }); }; + if ($@) { + Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL); exit EX_TEMPFAIL; - } - if ($Job->{'success'} ne undef) { - Log(undef, "Job 'success' flag (" . $Job->{'success'} . 
") is not null"); - exit EX_TEMPFAIL; - } - if ($Job->{'running'}) { - Log(undef, "Job 'running' flag is already set"); - exit EX_TEMPFAIL; - } - if ($Job->{'started_at'}) { - Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")"); - exit EX_TEMPFAIL; - } + }; } } else @@ -186,15 +188,14 @@ else $Job->{'is_locked_by_uuid'} = $User->{'uuid'}; $Job->{'started_at'} = gmtime; + $Job->{'state'} = 'Running'; - $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job); - - $job_has_uuid = 1; + $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); }); } $job_id = $Job->{'uuid'}; my $keep_logfile = $job_id . '.log.txt'; -$local_logfile = File::Temp->new(); +log_writer_start($keep_logfile); $Job->{'runtime_constraints'} ||= {}; $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0; @@ -276,25 +277,11 @@ foreach (@sinfo) @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot; - -my $jobmanager_id; -if ($job_has_uuid) -{ - # Claim this job, and make sure nobody else does - unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) && - $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) { - Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL); - exit EX_TEMPFAIL; - } - $Job->update_attributes('started_at' => scalar gmtime, - 'running' => 1, - 'success' => undef, - 'tasks_summary' => { 'failed' => 0, - 'todo' => 1, - 'running' => 0, - 'done' => 0 }); -} - +$Job->update_attributes( + 'tasks_summary' => { 'failed' => 0, + 'todo' => 1, + 'running' => 0, + 'done' => 0 }); Log (undef, "start"); $SIG{'INT'} = sub { $main::please_freeze = 1; }; @@ -316,7 +303,6 @@ $ENV{"CRUNCH_JOB_UUID"} = $job_id; $ENV{"JOB_UUID"} = $job_id; -my @jobstep; my @jobstep_todo = (); my @jobstep_done = (); my @jobstep_tomerge = (); @@ -334,12 +320,14 @@ if (defined $Job->{thawedfromkey}) } else { - my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => { - 'job_uuid' => $Job->{'uuid'}, - 'sequence' => 0, - 'qsequence' 
=> 0, - 'parameters' => {}, - }); + my $first_task = retry_op(sub { + $arv->{'job_tasks'}->{'create'}->execute('job_task' => { + 'job_uuid' => $Job->{'uuid'}, + 'sequence' => 0, + 'qsequence' => 0, + 'parameters' => {}, + }); + }); push @jobstep, { 'level' => 0, 'failures' => 0, 'arvados_task' => $first_task, @@ -355,137 +343,209 @@ if (!$have_slurm) my $build_script; +do { + local $/ = undef; + $build_script = ; +}; +my $nodelist = join(",", @node); +if (!defined $no_clear_tmp) { + # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* + Log (undef, "Clean work dirs"); -$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version}; - -my $skip_install = ($local_job && $Job->{script_version} =~ m{^/}); -if ($skip_install) -{ - if (!defined $no_clear_tmp) { - my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*'; - system($clear_tmp_cmd) == 0 - or croak ("`$clear_tmp_cmd` failed: ".($?>>8)); - } - $ENV{"CRUNCH_SRC"} = $Job->{script_version}; - for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") { - if (-d $src_path) { - system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0 - or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8)); - system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install") - == 0 - or croak ("setup.py in $src_path failed: exit ".($?>>8)); - } + my $cleanpid = fork(); + if ($cleanpid == 0) + { + srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], + ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']); + exit (1); + } + while (1) + { + last if $cleanpid == waitpid (-1, WNOHANG); + freeze_if_want_freeze ($cleanpid); + select (undef, undef, undef, 0.1); } + Log (undef, "Cleanup command exited ".exit_status_s($?)); } -else -{ - do { - local $/ = undef; - $build_script = ; - }; - Log (undef, "Install revision ".$Job->{script_version}); - my $nodelist = join(",", @node); 
- - if (!defined $no_clear_tmp) { - # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* - my $cleanpid = fork(); - if ($cleanpid == 0) - { - srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']); - exit (1); - } - while (1) - { - last if $cleanpid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($cleanpid); - select (undef, undef, undef, 0.1); - } - Log (undef, "Clean-work-dir exited $?"); - } - # Install requested code version +my $git_archive; +if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) { + # If script_version looks like an absolute path, *and* the --git-dir + # argument was not given -- which implies we were not invoked by + # crunch-dispatch -- we will use the given path as a working + # directory instead of resolving script_version to a git commit (or + # doing anything else with git). + $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'}; + $ENV{"CRUNCH_SRC"} = $Job->{'script_version'}; +} +else { + # Resolve the given script_version to a git commit sha1. Also, if + # the repository is remote, clone it into our local filesystem: this + # ensures "git archive" will work, and is necessary to reliably + # resolve a symbolic script_version like "master^". + $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src"; - my @execargs; - my @srunargs = ("srun", - "--nodelist=$nodelist", - "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); + Log (undef, "Looking for version ".$Job->{script_version}." 
from repository ".$Job->{repository}); $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version}; - $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src"; - - my $commit; - my $git_archive; - my $treeish = $Job->{'script_version'}; - # If we're running under crunch-dispatch, it will have pulled the - # appropriate source tree into its own repository, and given us that - # repo's path as $git_dir. If we're running a "local" job, and a - # script_version was specified, it's up to the user to provide the - # full path to a local repository in Job->{repository}. + # If we're running under crunch-dispatch, it will have already + # pulled the appropriate source tree into its own repository, and + # given us that repo's path as $git_dir. # - # TODO: Accept URLs too, not just local paths. Use git-ls-remote and - # git-archive --remote where appropriate. + # If we're running a "local" job, we might have to fetch content + # from a remote repository. # - # TODO: Accept a locally-hosted Arvados repository by name or - # UUID. Use arvados.v1.repositories.list or .get to figure out the - # appropriate fetch-url. - my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'}; - + # (Currently crunch-dispatch gives a local path with --git-dir, but + # we might as well accept URLs there too in case it changes its + # mind.) + my $repo = $git_dir || $Job->{'repository'}; + + # Repository can be remote or local. If remote, we'll need to fetch it + # to a local dir before doing `git log` et al. + my $repo_location; + + if ($repo =~ m{://|^[^/]*:}) { + # $repo is a git url we can clone, like git:// or https:// or + # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is + # not recognized here because distinguishing that from a local + # path is too fragile. If you really need something strange here, + # use the ssh:// form. + $repo_location = 'remote'; + } elsif ($repo =~ m{^\.*/}) { + # $repo is a local path to a git index. 
We'll also resolve ../foo + # to ../foo/.git if the latter is a directory. To help + # disambiguate local paths from named hosted repositories, this + # form must be given as ./ or ../ if it's a relative path. + if (-d "$repo/.git") { + $repo = "$repo/.git"; + } + $repo_location = 'local'; + } else { + # $repo is none of the above. It must be the name of a hosted + # repository. + my $arv_repo_list = retry_op(sub { + $arv->{'repositories'}->{'list'}->execute( + 'filters' => [['name','=',$repo]]); + }); + my @repos_found = @{$arv_repo_list->{'items'}}; + my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'}; + if ($n_found > 0) { + Log(undef, "Repository '$repo' -> " + . join(", ", map { $_->{'uuid'} } @repos_found)); + } + if ($n_found != 1) { + croak("Error: Found $n_found repositories with name '$repo'."); + } + $repo = $repos_found[0]->{'fetch_url'}; + $repo_location = 'remote'; + } + Log(undef, "Using $repo_location repository '$repo'"); $ENV{"CRUNCH_SRC_URL"} = $repo; - if (-d "$repo/.git") { - # We were given a working directory, but we are only interested in - # the index. - $repo = "$repo/.git"; - } + # Resolve given script_version (we'll call that $treeish here) to a + # commit sha1 ($commit). + my $treeish = $Job->{'script_version'}; + my $commit; + if ($repo_location eq 'remote') { + # We minimize excess object-fetching by re-using the same bare + # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we + # just keep adding remotes to it as needed. + my $local_repo = $ENV{'CRUNCH_TMP'}."/.git"; + my $gitcmd = "git --git-dir=\Q$local_repo\E"; + + # Set up our local repo for caching remote objects, making + # archives, etc. + if (!-d $local_repo) { + make_path($local_repo) or croak("Error: could not create $local_repo"); + } + # This works (exits 0 and doesn't delete fetched objects) even + # if $local_repo is already initialized: + `$gitcmd init --bare`; + if ($?) 
{ + croak("Error: $gitcmd init --bare exited ".exit_status_s($?)); + } + + # If $treeish looks like a hash (or abbrev hash) we look it up in + # our local cache first, since that's cheaper. (We don't want to + # do that with tags/branches though -- those change over time, so + # they should always be resolved by the remote repo.) + if ($treeish =~ /^[0-9a-f]{7,40}$/s) { + # Hide stderr because it's normal for this to fail: + my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`; + if ($? == 0 && + # Careful not to resolve a branch named abcdeff to commit 1234567: + $sha1 =~ /^$treeish/ && + $sha1 =~ /^([0-9a-f]{40})$/s) { + $commit = $1; + Log(undef, "Commit $commit already present in $local_repo"); + } + } + + if (!defined $commit) { + # If $treeish isn't just a hash or abbrev hash, or isn't here + # yet, we need to fetch the remote to resolve it correctly. - # If this looks like a subversion r#, look for it in git-svn commit messages + # First, remove all local heads. This prevents a name that does + # not exist on the remote from resolving to (or colliding with) + # a previously fetched branch or tag (possibly from a different + # remote). + remove_tree("$local_repo/refs/heads", {keep_root => 1}); - if ($treeish =~ m{^\d{1,4}$}) { - my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`; - chomp $gitlog; - if ($gitlog =~ /^[a-f0-9]{40}$/) { - $commit = $gitlog; - Log (undef, "Using commit $commit for script_version $treeish"); + Log(undef, "Fetching objects from $repo to $local_repo"); + `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`; + if ($?) { + croak("Error: `$gitcmd fetch` exited ".exit_status_s($?)); + } } + + # Now that the data is all here, we will use our local repo for + # the rest of our git activities. + $repo = $local_repo; } - # If that didn't work, try asking git to look it up as a tree-ish. 
- - if (!defined $commit) { - my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`; - chomp $found; - if ($found =~ /^[0-9a-f]{40}$/s) { - $commit = $found; - if ($commit ne $treeish) { - # Make sure we record the real commit id in the database, - # frozentokey, logs, etc. -- instead of an abbreviation or a - # branch name which can become ambiguous or point to a - # different commit in the future. - $ENV{"CRUNCH_SRC_COMMIT"} = $commit; - Log (undef, "Using commit $commit for tree-ish $treeish"); - if ($commit ne $treeish) { - $Job->{'script_version'} = $commit; - !$job_has_uuid or - $Job->update_attributes('script_version' => $commit) or - croak("Error while updating job"); - } - } + my $gitcmd = "git --git-dir=\Q$repo\E"; + my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`; + unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) { + croak("`$gitcmd rev-list` exited " + .exit_status_s($?) + .", '$treeish' not found. Giving up."); + } + $commit = $1; + Log(undef, "Version $treeish is commit $commit"); + + if ($commit ne $Job->{'script_version'}) { + # Record the real commit id in the database, frozentokey, logs, + # etc. -- instead of an abbreviation or a branch name which can + # become ambiguous or point to a different commit in the future. + if (!$Job->update_attributes('script_version' => $commit)) { + croak("Error: failed to update job's script_version attribute"); } } - if (defined $commit) { - $ENV{"CRUNCH_SRC_COMMIT"} = $commit; - @execargs = ("sh", "-c", - "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); - $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`; + $ENV{"CRUNCH_SRC_COMMIT"} = $commit; + $git_archive = `$gitcmd archive ''\Q$commit\E`; + if ($?) 
{ + croak("Error: $gitcmd archive exited ".exit_status_s($?)); } - else { - croak ("could not figure out commit id for $treeish"); +} + +if (!defined $git_archive) { + Log(undef, "Skip install phase (no git archive)"); + if ($have_slurm) { + Log(undef, "Warning: This probably means workers have no source tree!"); } +} +else { + Log(undef, "Run install script on all workers"); + + my @srunargs = ("srun", + "--nodelist=$nodelist", + "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); + my @execargs = ("sh", "-c", + "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); # Note: this section is almost certainly unnecessary if we're # running tasks in docker containers. @@ -501,7 +561,7 @@ else freeze_if_want_freeze ($installpid); select (undef, undef, undef, 0.1); } - Log (undef, "Install exited $?"); + Log (undef, "Install script exited ".exit_status_s($?)); } if (!$have_slurm) @@ -540,7 +600,8 @@ fi } if ($? != 0) { - croak("Installing Docker image from $docker_locator returned exit code $?"); + croak("Installing Docker image from $docker_locator exited " + .exit_status_s($?)); } } @@ -659,7 +720,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $command .= "&& perl -"; } - $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec "; + $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec "; if ($docker_hash) { $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 "; @@ -729,6 +790,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) delete $Jobstep->{stderr}; delete $Jobstep->{finishtime}; + $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime}); + $Jobstep->{'arvados_task'}->save; + splice @jobstep_todo, $todo_ptr, 1; --$todo_ptr; @@ -844,12 +908,6 @@ release_allocation(); freeze(); my $collated_output = &collate_output(); -if ($job_has_uuid) { - 
$Job->update_attributes('running' => 0, - 'success' => $collated_output && $main::success, - 'finished_at' => scalar gmtime) -} - if (!$collated_output) { Log(undef, "output undef"); } @@ -861,12 +919,13 @@ else { while (my $manifest_line = <$orig_manifest>) { $orig_manifest_text .= $manifest_line; } - my $output = $arv->{'collections'}->{'create'}->execute('collection' => { - 'manifest_text' => $orig_manifest_text, + my $output = retry_op(sub { + $arv->{'collections'}->{'create'}->execute( + 'collection' => {'manifest_text' => $orig_manifest_text}); }); Log(undef, "output uuid " . $output->{uuid}); Log(undef, "output hash " . $output->{portable_data_hash}); - $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid; + $Job->update_attributes('output' => $output->{portable_data_hash}); }; if ($@) { Log (undef, "Failed to register output manifest: $@"); @@ -876,7 +935,16 @@ else { Log (undef, "finish"); save_meta(); -exit ($Job->{'success'} ? 1 : 0); + +my $final_state; +if ($collated_output && $main::success) { + $final_state = 'Complete'; +} else { + $final_state = 'Failed'; +} +$Job->update_attributes('state' => $final_state); + +exit (($final_state eq 'Complete') ? 0 : 1); @@ -891,9 +959,7 @@ sub update_progress_stats $Job->{'tasks_summary'}->{'todo'} = $todo; $Job->{'tasks_summary'}->{'done'} = $done; $Job->{'tasks_summary'}->{'running'} = $running; - if ($job_has_uuid) { - $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'}); - } + $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'}); Log (undef, "status: $done done, $running running, $todo todo"); $progress_is_dirty = 0; } @@ -914,10 +980,7 @@ sub reapchildren my $childstatus = $?; my $exitvalue = $childstatus >> 8; - my $exitinfo = sprintf("exit %d signal %d%s", - $exitvalue, - $childstatus & 127, - ($childstatus & 128 ? 
' core dump' : '')); + my $exitinfo = "exit ".exit_status_s($childstatus); $Jobstep->{'arvados_task'}->reload; my $task_success = $Jobstep->{'arvados_task'}->{success}; @@ -963,10 +1026,8 @@ sub reapchildren $main::success = 0; $main::please_freeze = 1; } - else { - # Put this task back on the todo queue - push @jobstep_todo, $jobstepid; - } + # Put this task back on the todo queue + push @jobstep_todo, $jobstepid; $Job->{'tasks_summary'}->{'failed'}++; } else @@ -979,6 +1040,8 @@ sub reapchildren } $Jobstep->{exitcode} = $childstatus; $Jobstep->{finishtime} = time; + $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); + $Jobstep->{'arvados_task'}->save; process_stderr ($jobstepid, $task_success); Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output}); @@ -993,13 +1056,15 @@ sub reapchildren my $newtask_list = []; my $newtask_results; do { - $newtask_results = $arv->{'job_tasks'}->{'list'}->execute( - 'where' => { - 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} - }, - 'order' => 'qsequence', - 'offset' => scalar(@$newtask_list), - ); + $newtask_results = retry_op(sub { + $arv->{'job_tasks'}->{'list'}->execute( + 'where' => { + 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} + }, + 'order' => 'qsequence', + 'offset' => scalar(@$newtask_list), + ); + }); push(@$newtask_list, @{$newtask_results->{items}}); } while (@{$newtask_results->{items}}); foreach my $arvados_task (@$newtask_list) { @@ -1022,19 +1087,23 @@ sub check_refresh_wanted my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; if (@stat && $stat[9] > $latest_refresh) { $latest_refresh = scalar time; - if ($job_has_uuid) { - my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); - for my $attr ('cancelled_at', - 'cancelled_by_user_uuid', - 'cancelled_by_client_uuid') { - $Job->{$attr} = $Job2->{$attr}; - } - if ($Job->{'cancelled_at'}) { - Log (undef, "Job cancelled at " . $Job->{cancelled_at} . 
- " by user " . $Job->{cancelled_by_user_uuid}); - $main::success = 0; - $main::please_freeze = 1; + my $Job2 = retry_op(sub { + $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + }); + for my $attr ('cancelled_at', + 'cancelled_by_user_uuid', + 'cancelled_by_client_uuid', + 'state') { + $Job->{$attr} = $Job2->{$attr}; + } + if ($Job->{'state'} ne "Running") { + if ($Job->{'state'} eq "Cancelled") { + Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'}); + } else { + Log (undef, "Job state unexpectedly changed to " . $Job->{'state'}); } + $main::success = 0; + $main::please_freeze = 1; } } } @@ -1200,7 +1269,8 @@ sub collate_output Log (undef, "collate"); my ($child_out, $child_in); - my $pid = open2($child_out, $child_in, 'arv-put', '--raw'); + my $pid = open2($child_out, $child_in, 'arv-put', '--raw', + '--retries', retry_count()); my $joboutput; for (@jobstep) { @@ -1235,10 +1305,13 @@ sub collate_output if ($s->can_read(120)) { sysread($child_out, $joboutput, 64 * 1024 * 1024); chomp($joboutput); + # TODO: Ensure exit status == 0. } else { Log (undef, "timed out reading from 'arv-put'"); } } + # TODO: kill $pid instead of waiting, now that we've decided to + # ignore further output. waitpid($pid, 0); return $joboutput; @@ -1286,6 +1359,73 @@ sub fhbits } +# Send log output to Keep via arv-put. +# +# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe. +# $log_pipe_pid is the pid of the arv-put subprocess. +# +# The only functions that should access these variables directly are: +# +# log_writer_start($logfilename) +# Starts an arv-put pipe, reading data on stdin and writing it to +# a $logfilename file in an output collection. +# +# log_writer_send($txt) +# Writes $txt to the output log collection. +# +# log_writer_finish() +# Closes the arv-put pipe and returns the output that it produces. 
+# +# log_writer_is_active() +# Returns a true value if there is currently a live arv-put +# process, false otherwise. +# +my ($log_pipe_in, $log_pipe_out, $log_pipe_pid); + +sub log_writer_start($) +{ + my $logfilename = shift; + $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, + 'arv-put', '--portable-data-hash', + '--retries', '3', + '--filename', $logfilename, + '-'); +} + +sub log_writer_send($) +{ + my $txt = shift; + print $log_pipe_in $txt; +} + +sub log_writer_finish() +{ + return unless $log_pipe_pid; + + close($log_pipe_in); + my $arv_put_output; + + my $s = IO::Select->new($log_pipe_out); + if ($s->can_read(120)) { + sysread($log_pipe_out, $arv_put_output, 1024); + chomp($arv_put_output); + } else { + Log (undef, "timed out reading from 'arv-put'"); + } + + waitpid($log_pipe_pid, 0); + $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef; + if ($?) { + Log("log_writer_finish: arv-put exited ".exit_status_s($?)) + } + + return $arv_put_output; +} + +sub log_writer_is_active() { + return $log_pipe_pid; +} + sub Log # ($jobstep_id, $logmessage) { if ($_[1] =~ /\n/) { @@ -1299,15 +1439,15 @@ sub Log # ($jobstep_id, $logmessage) $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge; $message .= "\n"; my $datetime; - if ($local_logfile || -t STDERR) { + if (log_writer_is_active() || -t STDERR) { my @gmtime = gmtime; $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]); } print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message); - if ($local_logfile) { - print $local_logfile $datetime . " " . $message; + if (log_writer_is_active()) { + log_writer_send($datetime . " " . 
$message); } } @@ -1320,17 +1460,19 @@ sub croak freeze() if @jobstep_todo; collate_output() if @jobstep_todo; cleanup(); - save_meta() if $local_logfile; + save_meta(); die; } sub cleanup { - return if !$job_has_uuid; - $Job->update_attributes('running' => 0, - 'success' => 0, - 'finished_at' => scalar gmtime); + return unless $Job; + if ($Job->{'state'} eq 'Cancelled') { + $Job->update_attributes('finished_at' => scalar gmtime); + } else { + $Job->update_attributes('state' => 'Failed'); + } } @@ -1338,18 +1480,12 @@ sub save_meta { my $justcheckpoint = shift; # false if this will be the last meta saved return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm + return unless log_writer_is_active(); - $local_logfile->flush; - my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E " - . quotemeta($local_logfile->filename); - my $loglocator = `$cmd`; - die "system $cmd failed: $?" if $?; - chomp($loglocator); - - $local_logfile = undef; # the temp file is automatically deleted + my $loglocator = log_writer_finish(); Log (undef, "log manifest is $loglocator"); $Job->{'log'} = $loglocator; - $Job->update_attributes('log', $loglocator) if $job_has_uuid; + $Job->update_attributes('log', $loglocator); } @@ -1377,7 +1513,7 @@ sub freeze_if_want_freeze collate_output(); cleanup(); save_meta(); - exit 0; + exit 1; } } @@ -1466,7 +1602,10 @@ sub find_docker_image { # If not, return undef for both values. 
my $locator = shift; my ($streamname, $filename); - if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) { + my $image = retry_op(sub { + $arv->{collections}->{get}->execute(uuid => $locator); + }); + if ($image) { foreach my $line (split(/\n/, $image->{manifest_text})) { my @tokens = split(/\s+/, $line); next if (!@tokens); @@ -1487,6 +1626,66 @@ sub find_docker_image { } } +sub retry_count { + # Calculate the number of times an operation should be retried, + # assuming exponential backoff, and that we're willing to retry as + # long as tasks have been running. Enforce a minimum of 3 retries. + my ($starttime, $endtime, $timediff, $retries); + if (@jobstep) { + $starttime = $jobstep[0]->{starttime}; + $endtime = $jobstep[-1]->{finishtime}; + } + if (!defined($starttime)) { + $timediff = 0; + } elsif (!defined($endtime)) { + $timediff = time - $starttime; + } else { + $timediff = ($endtime - $starttime) - (time - $endtime); + } + if ($timediff > 0) { + $retries = int(log($timediff) / log(2)); + } else { + $retries = 1; # Use the minimum. + } + return ($retries > 3) ? $retries : 3; +} + +sub retry_op { + # Given a function reference, call it with the remaining arguments. If + # it dies, retry it with exponential backoff until it succeeds, or until + # the current retry_count is exhausted. + my $operation = shift; + my $retries = retry_count(); + foreach my $try_count (0..$retries) { + my $next_try = time + (2 ** $try_count); + my $result = eval { $operation->(@_); }; + if (!$@) { + return $result; + } elsif ($try_count < $retries) { + my $sleep_time = $next_try - time; + sleep($sleep_time) if ($sleep_time > 0); + } + } + # Ensure the error message ends in a newline, so Perl doesn't add + # retry_op's line number to it. + chomp($@); + die($@ . "\n"); +} + +sub exit_status_s { + # Given a $?, return a human-readable exit code string like "0" or + # "1" or "0 with signal 1" or "1 with signal 11". 
+ my $exitcode = shift; + my $s = $exitcode >> 8; + if ($exitcode & 0x7f) { + $s .= " with signal " . ($exitcode & 0x7f); + } + if ($exitcode & 0x80) { + $s .= " with core dump"; + } + return $s; +} + __DATA__ #!/usr/bin/perl