X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5c1f03ef1ae45d3ac222542d33b3125ce8678d65..55fc92b5f382b964b65c952f047bc673df582d98:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index eff48a21eb..3539a57a03 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -10,12 +10,14 @@ crunch-job: Execute job steps, save snapshots as requested, collate output. Obtain job details from Arvados, run tasks on compute nodes (typically invoked by scheduler on controller): - crunch-job --job x-y-z + crunch-job --job x-y-z --git-dir /path/to/repo/.git Obtain job details from command line, run tasks on local machine (typically invoked by application or developer on VM): - crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}' + crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}' + + crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}' =head1 OPTIONS @@ -27,7 +29,9 @@ If the job is already locked, steal the lock and run it anyway. =item --git-dir -Path to .git directory where the specified commit is found. +Path to a .git directory (or a git URL) where the commit given in the +job's C attribute is to be found. If this is I +given, the job's C attribute will be used. =item --job-api-token @@ -39,6 +43,11 @@ Do not clear per-job/task temporary directories during initial job setup. This can speed up development and debugging when running jobs locally. +=item --job + +UUID of the job to run, or a JSON-encoded job resource without a +UUID. If the latter is given, a new job object will be created. + =back =head1 RUNNING JOBS LOCALLY @@ -77,6 +86,8 @@ use POSIX ':sys_wait_h'; use POSIX qw(strftime); use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Arvados; +use Cwd qw(realpath); +use Data::Dumper; use Digest::MD5 qw(md5_hex); use Getopt::Long; use IPC::Open2; @@ -140,43 +151,26 @@ $SIG{'USR2'} = sub my $arv = Arvados->new('apiVersion' => 'v1'); -my $local_logfile; - -my $User = $arv->{'users'}->{'current'}->execute; my $Job; my $job_id; my $dbh; my $sth; +my @jobstep; + +my $User = api_call("users/current"); + if ($jobspec =~ /^[-a-z\d]+$/) { # $jobspec is an Arvados UUID, not a JSON job specification - $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + $Job = api_call("jobs/get", uuid => $jobspec); if (!$force_unlock) { - # If some other crunch-job process has grabbed this job (or we see - # other evidence that the job is already underway) we exit - # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't - # mark the job as failed. - if ($Job->{'is_locked_by_uuid'}) { - Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'}); + # Claim this job, and make sure nobody else does + eval { api_call("jobs/lock", uuid => $Job->{uuid}); }; + if ($@) { + Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL); exit EX_TEMPFAIL; - } - if ($Job->{'state'} ne 'Queued') { - Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs."); - exit EX_TEMPFAIL; - } - if ($Job->{'success'} ne undef) { - Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null"); - exit EX_TEMPFAIL; - } - if ($Job->{'running'}) { - Log(undef, "Job 'running' flag is already set"); - exit EX_TEMPFAIL; - } - if ($Job->{'started_at'}) { - Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")"); - exit EX_TEMPFAIL; - } + }; } } else @@ -191,18 +185,29 @@ else $Job->{'is_locked_by_uuid'} = $User->{'uuid'}; $Job->{'started_at'} = gmtime; + $Job->{'state'} = 'Running'; - $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job); + $Job = api_call("jobs/create", job => $Job); } $job_id = $Job->{'uuid'}; my $keep_logfile = $job_id . '.log.txt'; -$local_logfile = File::Temp->new(); +log_writer_start($keep_logfile); $Job->{'runtime_constraints'} ||= {}; $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0; my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'}; +my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`; +if ($? == 0) { + $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /; + chomp($gem_versions); + chop($gem_versions); # Closing parentheses +} else { + $gem_versions = ""; +} +Log(undef, + "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions); Log (undef, "check slurm allocation"); my @slot; @@ -279,20 +284,11 @@ foreach (@sinfo) @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot; - -my $jobmanager_id; -# Claim this job, and make sure nobody else does -unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) && - $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) { - Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL); - exit EX_TEMPFAIL; -} -$Job->update_attributes('state' => 'Running', - 'tasks_summary' => { 'failed' => 0, - 'todo' => 1, - 'running' => 0, - 'done' => 0 }); - +$Job->update_attributes( + 'tasks_summary' => { 'failed' => 0, + 'todo' => 1, + 'running' => 0, + 'done' => 0 }); Log (undef, "start"); $SIG{'INT'} = sub { $main::please_freeze = 1; }; @@ -314,14 +310,12 @@ $ENV{"CRUNCH_JOB_UUID"} = $job_id; $ENV{"JOB_UUID"} = $job_id; -my @jobstep; my @jobstep_todo = (); my @jobstep_done = (); my @jobstep_tomerge = (); my $jobstep_tomerge_level = 0; my $squeue_checked; my $squeue_kill_checked; -my $output_in_keep = 0; my $latest_refresh = scalar time; @@ -332,12 +326,12 @@ if (defined $Job->{thawedfromkey}) } else { - my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => { + my $first_task = api_call("job_tasks/create", job_task => { 'job_uuid' => $Job->{'uuid'}, 'sequence' => 0, 'qsequence' => 0, 'parameters' => {}, - }); + }); push @jobstep, { 'level' => 0, 'failures' => 0, 'arvados_task' => $first_task, @@ -351,13 +345,9 @@ if (!$have_slurm) must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here."); } - -my $build_script; -do { - local $/ = undef; - $build_script = ; -}; +my $build_script = handle_readall(\*DATA); my $nodelist = join(",", @node); +my $git_tar_count = 0; if (!defined $no_clear_tmp) { # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* @@ -367,7 +357,7 @@ if (!defined $no_clear_tmp) { if ($cleanpid == 0) { srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']); + ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']); exit (1); } while (1) @@ -379,20 +369,67 @@ if (!defined $no_clear_tmp) { Log (undef, "Cleanup command exited ".exit_status_s($?)); } +# If this job requires a Docker image, install that. +my $docker_bin = "/usr/bin/docker.io"; +my ($docker_locator, $docker_stream, $docker_hash); +if ($docker_locator = $Job->{docker_image_locator}) { + ($docker_stream, $docker_hash) = find_docker_image($docker_locator); + if (!$docker_hash) + { + croak("No Docker image hash found from locator $docker_locator"); + } + $docker_stream =~ s/^\.//; + my $docker_install_script = qq{ +if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then + arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +fi +}; + my $docker_pid = fork(); + if ($docker_pid == 0) + { + srun (["srun", "--nodelist=" . join(',', @node)], + ["/bin/sh", "-ec", $docker_install_script]); + exit ($?); + } + while (1) + { + last if $docker_pid == waitpid (-1, WNOHANG); + freeze_if_want_freeze ($docker_pid); + select (undef, undef, undef, 0.1); + } + if ($? != 0) + { + croak("Installing Docker image from $docker_locator exited " + .exit_status_s($?)); + } + + if ($Job->{arvados_sdk_version}) { + # The job also specifies an Arvados SDK version. Add the SDKs to the + # tar file for the build script to install. + Log(undef, sprintf("Packing Arvados SDK version %s for installation", + $Job->{arvados_sdk_version})); + add_git_archive("git", "--git-dir=$git_dir", "archive", + "--prefix=.arvados.sdk/", + $Job->{arvados_sdk_version}, "sdk"); + } +} -my $git_archive; if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) { - # If we're in user-land (i.e., not called from crunch-dispatch) - # script_version can be an absolute directory path, signifying we - # should work straight out of that directory instead of using a git - # commit. + # If script_version looks like an absolute path, *and* the --git-dir + # argument was not given -- which implies we were not invoked by + # crunch-dispatch -- we will use the given path as a working + # directory instead of resolving script_version to a git commit (or + # doing anything else with git). $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'}; $ENV{"CRUNCH_SRC"} = $Job->{'script_version'}; } else { + # Resolve the given script_version to a git commit sha1. Also, if + # the repository is remote, clone it into our local filesystem: this + # ensures "git archive" will work, and is necessary to reliably + # resolve a symbolic script_version like "master^". $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src"; - # Install requested code version Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository}); $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version}; @@ -413,13 +450,18 @@ else { # to a local dir before doing `git log` et al. my $repo_location; - if ($repo =~ m{://|\@.*:}) { + if ($repo =~ m{://|^[^/]*:}) { # $repo is a git url we can clone, like git:// or https:// or - # file:/// or git@host:repo.git + # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is + # not recognized here because distinguishing that from a local + # path is too fragile. If you really need something strange here, + # use the ssh:// form. $repo_location = 'remote'; } elsif ($repo =~ m{^\.*/}) { # $repo is a local path to a git index. We'll also resolve ../foo - # to ../foo/.git if the latter is a directory. + # to ../foo/.git if the latter is a directory. To help + # disambiguate local paths from named hosted repositories, this + # form must be given as ./ or ../ if it's a relative path. if (-d "$repo/.git") { $repo = "$repo/.git"; } @@ -427,18 +469,18 @@ else { } else { # $repo is none of the above. It must be the name of a hosted # repository. - my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute( - 'filters' => [['name','=',$repo]] - )->{'items'}; - my $n_found = scalar @{$arv_repo_list}; + my $arv_repo_list = api_call("repositories/list", + 'filters' => [['name','=',$repo]]); + my @repos_found = @{$arv_repo_list->{'items'}}; + my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'}; if ($n_found > 0) { Log(undef, "Repository '$repo' -> " - . join(", ", map { $_->{'uuid'} } @{$arv_repo_list})); + . join(", ", map { $_->{'uuid'} } @repos_found)); } if ($n_found != 1) { croak("Error: Found $n_found repositories with name '$repo'."); } - $repo = $arv_repo_list->[0]->{'fetch_url'}; + $repo = $repos_found[0]->{'fetch_url'}; $repo_location = 'remote'; } Log(undef, "Using $repo_location repository '$repo'"); @@ -471,11 +513,12 @@ else { # our local cache first, since that's cheaper. (We don't want to # do that with tags/branches though -- those change over time, so # they should always be resolved by the remote repo.) - if ($treeish =~ /^[0-9a-f]{3,40}$/s) { + if ($treeish =~ /^[0-9a-f]{7,40}$/s) { # Hide stderr because it's normal for this to fail: my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`; if ($? == 0 && - $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc! + # Careful not to resolve a branch named abcdeff to commit 1234567: + $sha1 =~ /^$treeish/ && $sha1 =~ /^([0-9a-f]{40})$/s) { $commit = $1; Log(undef, "Commit $commit already present in $local_repo"); @@ -524,12 +567,10 @@ else { } $ENV{"CRUNCH_SRC_COMMIT"} = $commit; - $git_archive = `$gitcmd archive ''\Q$commit\E`; - if ($?) { - croak("Error: $gitcmd archive exited ".exit_status_s($?)); - } + add_git_archive("$gitcmd archive ''\Q$commit\E"); } +my $git_archive = combined_git_archive(); if (!defined $git_archive) { Log(undef, "Skip install phase (no git archive)"); if ($have_slurm) { @@ -545,8 +586,6 @@ else { my @execargs = ("sh", "-c", "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); - # Note: this section is almost certainly unnecessary if we're - # running tasks in docker containers. my $installpid = fork(); if ($installpid == 0) { @@ -559,48 +598,12 @@ else { freeze_if_want_freeze ($installpid); select (undef, undef, undef, 0.1); } - Log (undef, "Install script exited ".exit_status_s($?)); -} - -if (!$have_slurm) -{ - # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above) - must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here."); -} - -# If this job requires a Docker image, install that. -my $docker_bin = "/usr/bin/docker.io"; -my ($docker_locator, $docker_stream, $docker_hash); -if ($docker_locator = $Job->{docker_image_locator}) { - ($docker_stream, $docker_hash) = find_docker_image($docker_locator); - if (!$docker_hash) - { - croak("No Docker image hash found from locator $docker_locator"); - } - $docker_stream =~ s/^\.//; - my $docker_install_script = qq{ -if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then - arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load -fi -}; - my $docker_pid = fork(); - if ($docker_pid == 0) - { - srun (["srun", "--nodelist=" . join(',', @node)], - ["/bin/sh", "-ec", $docker_install_script]); - exit ($?); - } - while (1) - { - last if $docker_pid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($docker_pid); - select (undef, undef, undef, 0.1); - } - if ($? != 0) - { - croak("Installing Docker image from $docker_locator exited " - .exit_status_s($?)); + my $install_exited = $?; + Log (undef, "Install script exited ".exit_status_s($install_exited)); + foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) { + unlink($tar_filename); } + exit (1) if $install_exited != 0; } foreach (qw (script script_version script_parameters runtime_constraints)) @@ -692,7 +695,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; - $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$"; + $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname"; $ENV{"HOME"} = $ENV{"TASK_WORK"}; $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep"; $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated @@ -707,62 +710,86 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, "--job-name=$job_id.$id.$$", ); - my $build_script_to_send = ""; my $command = "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " ."&& cd $ENV{CRUNCH_TMP} "; - if ($build_script) - { - $build_script_to_send = $build_script; - $command .= - "&& perl -"; - } $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec "; if ($docker_hash) { - $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 "; - $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid "; + my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid"; + $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 "; + $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy "; + # Dynamically configure the container to use the host system as its # DNS server. Get the host's global addresses from the ip command, # and turn them into docker --dns options using gawk. $command .= q{$(ip -o address show scope global | gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') }; - $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E "; + + # The source tree and $destdir directory (which we have + # installed on the worker host) are available in the container, + # under the same path. + $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E "; + $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E "; + + # Currently, we make arv-mount's mount point appear at /keep + # inside the container (instead of using the same path as the + # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However, + # crunch scripts and utilities must not rely on this. They must + # use $TASK_KEEPMOUNT. $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E "; - $command .= "--env=\QHOME=/home/crunch\E "; + $ENV{TASK_KEEPMOUNT} = "/keep"; + + # TASK_WORK is almost exactly like a docker data volume: it + # starts out empty, is writable, and persists until no + # containers use it any more. We don't use --volumes-from to + # share it with other containers: it is only accessible to this + # task, and it goes away when this task stops. + # + # However, a docker data volume is writable only by root unless + # the mount point already happens to exist in the container with + # different permissions. Therefore, we [1] assume /tmp already + # exists in the image and is writable by the crunch user; [2] + # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be + # writable if they are created by docker while setting up the + # other --volumes); and [3] create $TASK_WORK inside the + # container using $build_script. + $command .= "--volume=/tmp "; + $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname"; + $ENV{"HOME"} = $ENV{"TASK_WORK"}; + $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated + + # TODO: Share a single JOB_WORK volume across all task + # containers on a given worker node, and delete it when the job + # ends (and, in case that doesn't work, when the next job + # starts). + # + # For now, use the same approach as TASK_WORK above. + $ENV{"JOB_WORK"} = "/tmp/crunch-job-work"; + while (my ($env_key, $env_val) = each %ENV) { - if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) { - if ($env_key eq "TASK_WORK") { - $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E "; - } - elsif ($env_key eq "TASK_KEEPMOUNT") { - $command .= "--env=\QTASK_KEEPMOUNT=/keep\E "; - } - else { - $command .= "--env=\Q$env_key=$env_val\E "; - } + if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) { + $command .= "--env=\Q$env_key=$env_val\E "; } } - $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E "; - $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E "; + $command .= "--env=\QHOME=$ENV{HOME}\E "; $command .= "\Q$docker_hash\E "; $command .= "stdbuf --output=0 --error=0 "; - $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"}; + $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } else { # Non-docker run $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; $command .= "stdbuf --output=0 --error=0 "; - $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; + $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } my @execargs = ('bash', '-c', $command); - srun (\@srunargs, \@execargs, undef, $build_script_to_send); + srun (\@srunargs, \@execargs, undef, $build_script); # exec() failed, we assume nothing happened. - Log(undef, "srun() failed on build script"); - die; + die "srun() failed on build script\n"; } close("writer"); if (!defined $childpid) @@ -805,7 +832,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { $main::please_info = 0; freeze(); - collate_output(); + create_output_collection(); save_meta(1); update_progress_stats(); } @@ -867,7 +894,7 @@ while (%proc) $main::please_continue = 0; goto THISROUND; } - $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info; + $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info; readfrompipes (); if (!reapchildren()) { @@ -904,29 +931,14 @@ goto ONELEVEL if !defined $main::success; release_allocation(); freeze(); -my $collated_output = &collate_output(); +my $collated_output = &create_output_collection(); if (!$collated_output) { - Log(undef, "output undef"); + Log (undef, "Failed to write output collection"); } else { - eval { - open(my $orig_manifest, '-|', 'arv-get', $collated_output) - or die "failed to get collated manifest: $!"; - my $orig_manifest_text = ''; - while (my $manifest_line = <$orig_manifest>) { - $orig_manifest_text .= $manifest_line; - } - my $output = $arv->{'collections'}->{'create'}->execute('collection' => { - 'manifest_text' => $orig_manifest_text, - }); - Log(undef, "output uuid " . $output->{uuid}); - Log(undef, "output hash " . $output->{portable_data_hash}); - $Job->update_attributes('output' => $output->{portable_data_hash}); - }; - if ($@) { - Log (undef, "Failed to register output manifest: $@"); - } + Log(undef, "output hash " . $collated_output); + $Job->update_attributes('output' => $collated_output); } Log (undef, "finish"); @@ -1053,7 +1065,8 @@ sub reapchildren my $newtask_list = []; my $newtask_results; do { - $newtask_results = $arv->{'job_tasks'}->{'list'}->execute( + $newtask_results = api_call( + "job_tasks/list", 'where' => { 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} }, @@ -1082,7 +1095,7 @@ sub check_refresh_wanted my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; if (@stat && $stat[9] > $latest_refresh) { $latest_refresh = scalar time; - my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + my $Job2 = api_call("jobs/get", uuid => $jobspec); for my $attr ('cancelled_at', 'cancelled_by_user_uuid', 'cancelled_by_client_uuid', @@ -1257,14 +1270,24 @@ sub fetch_block return $output_block; } -sub collate_output +# create_output_collections generates a new collection containing the +# output of each successfully completed task, and returns the +# portable_data_hash for the new collection. +# +sub create_output_collection { Log (undef, "collate"); my ($child_out, $child_in); - my $pid = open2($child_out, $child_in, 'arv-put', '--raw', - '--retries', put_retry_count()); - my $joboutput; + my $pid = open2($child_out, $child_in, 'python', '-c', + 'import arvados; ' . + 'import sys; ' . + 'print arvados.api()' . + '.collections()' . + '.create(body={"manifest_text":sys.stdin.read()})' . + '.execute()["portable_data_hash"]' + ); + for (@jobstep) { next if (!exists $_->{'arvados_task'}->{'output'} || @@ -1272,17 +1295,10 @@ sub collate_output my $output = $_->{'arvados_task'}->{output}; if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/) { - $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/; print $child_in $output; } - elsif (@jobstep == 1) - { - $joboutput = $output; - last; - } elsif (defined (my $outblock = fetch_block ($output))) { - $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/; print $child_in $outblock; } else @@ -1293,15 +1309,14 @@ sub collate_output } $child_in->close; - if (!defined $joboutput) { - my $s = IO::Select->new($child_out); - if ($s->can_read(120)) { - sysread($child_out, $joboutput, 64 * 1024 * 1024); - chomp($joboutput); - # TODO: Ensure exit status == 0. - } else { - Log (undef, "timed out reading from 'arv-put'"); - } + my $joboutput; + my $s = IO::Select->new($child_out); + if ($s->can_read(120)) { + sysread($child_out, $joboutput, 64 * 1024 * 1024); + chomp($joboutput); + # TODO: Ensure exit status == 0. + } else { + Log (undef, "timed out while creating output collection"); } # TODO: kill $pid instead of waiting, now that we've decided to # ignore further output. @@ -1352,6 +1367,73 @@ sub fhbits } +# Send log output to Keep via arv-put. +# +# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe. +# $log_pipe_pid is the pid of the arv-put subprocess. +# +# The only functions that should access these variables directly are: +# +# log_writer_start($logfilename) +# Starts an arv-put pipe, reading data on stdin and writing it to +# a $logfilename file in an output collection. +# +# log_writer_send($txt) +# Writes $txt to the output log collection. +# +# log_writer_finish() +# Closes the arv-put pipe and returns the output that it produces. +# +# log_writer_is_active() +# Returns a true value if there is currently a live arv-put +# process, false otherwise. +# +my ($log_pipe_in, $log_pipe_out, $log_pipe_pid); + +sub log_writer_start($) +{ + my $logfilename = shift; + $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, + 'arv-put', '--portable-data-hash', + '--retries', '3', + '--filename', $logfilename, + '-'); +} + +sub log_writer_send($) +{ + my $txt = shift; + print $log_pipe_in $txt; +} + +sub log_writer_finish() +{ + return unless $log_pipe_pid; + + close($log_pipe_in); + my $arv_put_output; + + my $s = IO::Select->new($log_pipe_out); + if ($s->can_read(120)) { + sysread($log_pipe_out, $arv_put_output, 1024); + chomp($arv_put_output); + } else { + Log (undef, "timed out reading from 'arv-put'"); + } + + waitpid($log_pipe_pid, 0); + $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef; + if ($?) { + Log("log_writer_finish: arv-put exited ".exit_status_s($?)) + } + + return $arv_put_output; +} + +sub log_writer_is_active() { + return $log_pipe_pid; +} + sub Log # ($jobstep_id, $logmessage) { if ($_[1] =~ /\n/) { @@ -1365,15 +1447,15 @@ sub Log # ($jobstep_id, $logmessage) $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge; $message .= "\n"; my $datetime; - if ($local_logfile || -t STDERR) { + if (log_writer_is_active() || -t STDERR) { my @gmtime = gmtime; $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]); } print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message); - if ($local_logfile) { - print $local_logfile $datetime . " " . $message; + if (log_writer_is_active()) { + log_writer_send($datetime . " " . $message); } } @@ -1384,15 +1466,16 @@ sub croak my $message = "@_ at $file line $line\n"; Log (undef, $message); freeze() if @jobstep_todo; - collate_output() if @jobstep_todo; - cleanup() if $Job; - save_meta() if $local_logfile; + create_output_collection() if @jobstep_todo; + cleanup(); + save_meta(); die; } sub cleanup { + return unless $Job; if ($Job->{'state'} eq 'Cancelled') { $Job->update_attributes('finished_at' => scalar gmtime); } else { @@ -1405,16 +1488,9 @@ sub save_meta { my $justcheckpoint = shift; # false if this will be the last meta saved return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm + return unless log_writer_is_active(); - $local_logfile->flush; - my $retry_count = put_retry_count(); - my $cmd = "arv-put --portable-data-hash --retries $retry_count " . - "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename); - my $loglocator = `$cmd`; - die "system $cmd exited ".exit_status_s($?) if $?; - chomp($loglocator); - - $local_logfile = undef; # the temp file is automatically deleted + my $loglocator = log_writer_finish(); Log (undef, "log manifest is $loglocator"); $Job->{'log'} = $loglocator; $Job->update_attributes('log', $loglocator); @@ -1442,7 +1518,7 @@ sub freeze_if_want_freeze } } freeze(); - collate_output(); + create_output_collection(); cleanup(); save_meta(); exit 1; @@ -1487,11 +1563,13 @@ sub srun my $opts = shift || {}; my $stdin = shift; my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs; - print STDERR (join (" ", - map { / / ? "'$_'" : $_ } - (@$args)), - "\n") - if $ENV{CRUNCH_DEBUG}; + + $Data::Dumper::Terse = 1; + $Data::Dumper::Indent = 0; + my $show_cmd = Dumper($args); + $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g; + $show_cmd =~ s/\n/ /g; + warn "starting: $show_cmd\n"; if (defined $stdin) { my $child = open STDIN, "-|"; @@ -1534,7 +1612,8 @@ sub find_docker_image { # If not, return undef for both values. my $locator = shift; my ($streamname, $filename); - if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) { + my $image = api_call("collections/get", uuid => $locator); + if ($image) { foreach my $line (split(/\n/, $image->{manifest_text})) { my @tokens = split(/\s+/, $line); next if (!@tokens); @@ -1555,20 +1634,83 @@ sub find_docker_image { } } -sub put_retry_count { - # Calculate a --retries argument for arv-put that will have it try - # approximately as long as this Job has been running. - my $stoptime = shift || time; - my $starttime = $jobstep[0]->{starttime}; - my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1; - my $retries = 0; - while ($timediff >= 2) { - $retries++; - $timediff /= 2; +sub retry_count { + # Calculate the number of times an operation should be retried, + # assuming exponential backoff, and that we're willing to retry as + # long as tasks have been running. Enforce a minimum of 3 retries. + my ($starttime, $endtime, $timediff, $retries); + if (@jobstep) { + $starttime = $jobstep[0]->{starttime}; + $endtime = $jobstep[-1]->{finishtime}; + } + if (!defined($starttime)) { + $timediff = 0; + } elsif (!defined($endtime)) { + $timediff = time - $starttime; + } else { + $timediff = ($endtime - $starttime) - (time - $endtime); + } + if ($timediff > 0) { + $retries = int(log($timediff) / log(2)); + } else { + $retries = 1; # Use the minimum. } return ($retries > 3) ? $retries : 3; } +sub retry_op { + # Pass in two function references. + # This method will be called with the remaining arguments. + # If it dies, retry it with exponential backoff until it succeeds, + # or until the current retry_count is exhausted. After each failure + # that can be retried, the second function will be called with + # the current try count (0-based), next try time, and error message. + my $operation = shift; + my $retry_callback = shift; + my $retries = retry_count(); + foreach my $try_count (0..$retries) { + my $next_try = time + (2 ** $try_count); + my $result = eval { $operation->(@_); }; + if (!$@) { + return $result; + } elsif ($try_count < $retries) { + $retry_callback->($try_count, $next_try, $@); + my $sleep_time = $next_try - time; + sleep($sleep_time) if ($sleep_time > 0); + } + } + # Ensure the error message ends in a newline, so Perl doesn't add + # retry_op's line number to it. + chomp($@); + die($@ . "\n"); +} + +sub api_call { + # Pass in a /-separated API method name, and arguments for it. + # This function will call that method, retrying as needed until + # the current retry_count is exhausted, with a log on the first failure. + my $method_name = shift; + my $log_api_retry = sub { + my ($try_count, $next_try_at, $errmsg) = @_; + $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; + $errmsg =~ s/\s/ /g; + $errmsg =~ s/\s+$//; + my $retry_msg; + if ($next_try_at < time) { + $retry_msg = "Retrying."; + } else { + my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at); + $retry_msg = "Retrying at $next_try_fmt."; + } + Log(undef, "API method $method_name failed: $errmsg. $retry_msg"); + }; + my $method = $arv; + foreach my $key (split(/\//, $method_name)) { + $method = $method->{$key}; + } + return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_); +} + sub exit_status_s { # Given a $?, return a human-readable exit code string like "0" or # "1" or "0 with signal 1" or "1 with signal 11". @@ -1583,63 +1725,218 @@ sub exit_status_s { return $s; } +sub handle_readall { + # Pass in a glob reference to a file handle. + # Read all its contents and return them as a string. + my $fh_glob_ref = shift; + local $/ = undef; + return <$fh_glob_ref>; +} + +sub tar_filename_n { + my $n = shift; + return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n); +} + +sub add_git_archive { + # Pass in a git archive command as a string or list, a la system(). + # This method will save its output to be included in the archive sent to the + # build script. + my $git_input; + $git_tar_count++; + if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) { + croak("Failed to save git archive: $!"); + } + my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_); + close($git_input); + waitpid($git_pid, 0); + close(GIT_ARCHIVE); + if ($?) { + croak("Failed to save git archive: git exited " . exit_status_s($?)); + } +} + +sub combined_git_archive { + # Combine all saved tar archives into a single archive, then return its + # contents in a string. Return undef if no archives have been saved. + if ($git_tar_count < 1) { + return undef; + } + my $base_tar_name = tar_filename_n(1); + foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) { + my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append); + if ($tar_exit != 0) { + croak("Error preparing build archive: tar -A exited " . + exit_status_s($tar_exit)); + } + } + if (!open(GIT_TAR, "<", $base_tar_name)) { + croak("Could not open build archive: $!"); + } + my $tar_contents = handle_readall(\*GIT_TAR); + close(GIT_TAR); + return $tar_contents; +} + __DATA__ #!/usr/bin/perl - -# checkout-and-build +# +# This is crunch-job's internal dispatch script. crunch-job running on the API +# server invokes this script on individual compute nodes, or localhost if we're +# running a job locally. It gets called in two modes: +# +# * No arguments: Installation mode. Read a tar archive from the DATA +# file handle; it includes the Crunch script's source code, and +# maybe SDKs as well. Those should be installed in the proper +# locations. This runs outside of any Docker container, so don't try to +# introspect Crunch's runtime environment. +# +# * With arguments: Crunch script run mode. This script should set up the +# environment, then run the command specified in the arguments. This runs +# inside any Docker container. use Fcntl ':flock'; -use File::Path qw( make_path ); +use File::Path qw( make_path remove_tree ); +use POSIX qw(getcwd); + +# Map SDK subdirectories to the path environments they belong to. +my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB"); my $destdir = $ENV{"CRUNCH_SRC"}; my $commit = $ENV{"CRUNCH_SRC_COMMIT"}; my $repo = $ENV{"CRUNCH_SRC_URL"}; +my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt"); +my $job_work = $ENV{"JOB_WORK"}; my $task_work = $ENV{"TASK_WORK"}; -for my $dir ($destdir, $task_work) { - if ($dir) { - make_path $dir; - -e $dir or die "Failed to create temporary directory ($dir): $!"; +for my $dir ($destdir, $job_work, $task_work) { + if ($dir) { + make_path $dir; + -e $dir or die "Failed to create temporary directory ($dir): $!"; + } +} + +if ($task_work) { + remove_tree($task_work, {keep_root => 1}); +} + +open(STDOUT_ORIG, ">&", STDOUT); +open(STDERR_ORIG, ">&", STDERR); +open(STDOUT, ">>", "$destdir.log"); +open(STDERR, ">&", STDOUT); + +### Crunch script run mode +if (@ARGV) { + # We want to do routine logging during task 0 only. This gives the user + # the information they need, but avoids repeating the information for every + # task. + my $Log; + if ($ENV{TASK_SEQUENCE} eq "0") { + $Log = sub { + my $msg = shift; + printf STDERR_ORIG "[Crunch] $msg\n", @_; + }; + } else { + $Log = sub { }; + } + + my $python_src = "$install_dir/python"; + my $venv_dir = "$job_work/.arvados.venv"; + my $venv_built = -e "$venv_dir/bin/activate"; + if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) { + shell_or_die("virtualenv", "--quiet", "--system-site-packages", + "--python=python2.7", $venv_dir); + shell_or_die("$venv_dir/bin/pip", "--quiet", "install", $python_src); + $venv_built = 1; + $Log->("Built Python SDK virtualenv"); + } + + my $pkgs; + if ($venv_built) { + $Log->("Running in Python SDK virtualenv"); + $pkgs = `(\Q$venv_dir/bin/pip\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`; + my $orig_argv = join(" ", map { quotemeta($_); } @ARGV); + @ARGV = ("/bin/sh", "-ec", + ". \Q$venv_dir/bin/activate\E; exec $orig_argv"); + } elsif (-d $python_src) { + $Log->("Warning: virtualenv not found inside Docker container default " + + "\$PATH. Can't install Python SDK."); + } else { + $pkgs = `(pip freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`; + } + + if ($pkgs) { + $Log->("Using Arvados SDK:"); + foreach my $line (split /\n/, $pkgs) { + $Log->($line); } + } else { + $Log->("Arvados SDK packages not found"); + } + + while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) { + my $sdk_path = "$install_dir/$sdk_dir"; + if (-d $sdk_path) { + if ($ENV{$sdk_envkey}) { + $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey}; + } else { + $ENV{$sdk_envkey} = $sdk_path; + } + $Log->("Arvados SDK added to %s", $sdk_envkey); + } + } + + close(STDOUT); + close(STDERR); + open(STDOUT, ">&", STDOUT_ORIG); + open(STDERR, ">&", STDERR_ORIG); + exec(@ARGV); + die "Cannot exec `@ARGV`: $!"; } +### Installation mode open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; flock L, LOCK_EX; if (readlink ("$destdir.commit") eq $commit && -d $destdir) { - if (@ARGV) { - exec(@ARGV); - die "Cannot exec `@ARGV`: $!"; - } else { - exit 0; - } + # This version already installed -> nothing to do. + exit(0); } unlink "$destdir.commit"; -open STDOUT, ">", "$destdir.log"; -open STDERR, ">&STDOUT"; - mkdir $destdir; -my @git_archive_data = ; -if (@git_archive_data) { - open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"; - print TARX @git_archive_data; - if(!close(TARX)) { - die "'tar -C $destdir -xf -' exited $?: $!"; - } +open TARX, "|-", "tar", "-xC", $destdir; +{ + local $/ = undef; + print TARX ; +} +if(!close(TARX)) { + die "'tar -xC $destdir' exited $?: $!"; } -my $pwd; -chomp ($pwd = `pwd`); -my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt"; mkdir $install_dir; -for my $src_path ("$destdir/arvados/sdk/python") { - if (-d $src_path) { - shell_or_die ("virtualenv", $install_dir); - shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install"); +my $sdk_root = "$destdir/.arvados.sdk/sdk"; +if (-d $sdk_root) { + foreach my $sdk_lang (("python", + map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) { + if (-d "$sdk_root/$sdk_lang") { + if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) { + die "Failed to install $sdk_lang SDK: $!"; + } + } } } +my $python_dir = "$install_dir/python"; +if ((-d $python_dir) and can_run("python2.7") and + (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) { + # egg_info failed, probably when it asked git for a build tag. + # Specify no build tag. + open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); + print $pysdk_cfg "\n[egg_info]\ntag_build =\n"; + close($pysdk_cfg); +} + if (-e "$destdir/crunch_scripts/install") { shell_or_die ("$destdir/crunch_scripts/install", $install_dir); } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") { @@ -1657,11 +1954,12 @@ if ($commit) { close L; -if (@ARGV) { - exec(@ARGV); - die "Cannot exec `@ARGV`: $!"; -} else { - exit 0; +sub can_run { + my $command_name = shift; + open(my $which, "-|", "which", $command_name); + while (<$which>) { } + close($which); + return ($? == 0); } sub shell_or_die @@ -1669,8 +1967,13 @@ sub shell_or_die if ($ENV{"DEBUG"}) { print STDERR "@_\n"; } - system (@_) == 0 - or die "@_ failed: $! exit 0x".sprintf("%x",$?); + if (system (@_) != 0) { + my $err = $!; + my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f); + open STDERR, ">&STDERR_ORIG"; + system ("cat $destdir.log >&2"); + die "@_ failed ($err): $exitstatus"; + } } __DATA__