15133: Delete crunch-job & arv-run-pipeline-instance
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 8 Aug 2019 14:04:43 +0000 (10:04 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 8 Aug 2019 14:04:43 +0000 (10:04 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cli/bin/arv-crunch-job [deleted file]
sdk/cli/bin/arv-run-pipeline-instance [deleted file]
sdk/cli/bin/crunch-job [deleted file]

diff --git a/sdk/cli/bin/arv-crunch-job b/sdk/cli/bin/arv-crunch-job
deleted file mode 100755 (executable)
index 6e4b5e0..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-exec File.join(File.dirname(File.realpath(__FILE__)), 'crunch-job'), *ARGV
diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance
deleted file mode 100755 (executable)
index 336b1a2..0000000
+++ /dev/null
@@ -1,781 +0,0 @@
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-class WhRunPipelineInstance
-if RUBY_VERSION < '1.9.3' then
-  abort <<-EOS
-#{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
-  EOS
-  require 'arvados'
-  require 'rubygems'
-  require 'json'
-  require 'pp'
-  require 'optimist'
-  require 'google/api_client'
-rescue LoadError => l
-  $stderr.puts $:
-  abort <<-EOS
-#{$0}: fatal: #{l.message}
-Some runtime dependencies may be missing.
-Try: gem install arvados pp google-api-client json optimist
-  EOS
-def debuglog(message, verbosity=1)
-  $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
-# Parse command line options (the kind that control the behavior of
-# this program, that is, not the pipeline component parameters).
-p = Optimist::Parser.new do
-  version __FILE__
-  banner(<<EOF)
-  arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
-  arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
-  param_name=param_value
-  param_name param_value
-                         Set (or override) the default value for every
-                         pipeline component parameter with the given
-                         name.
-  component_name::param_name=param_value
-  component_name::param_name param_value
-  --component_name::param_name=param_value
-  --component_name::param_name param_value
-                         Set the value of a parameter for a single
-                         pipeline component.
-  opt(:dry_run,
-      "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
-      :type => :boolean,
-      :short => :n)
-  opt(:status_text,
-      "Store plain text status in given file.",
-      :short => :none,
-      :type => :string,
-      :default => '/dev/stdout')
-  opt(:status_json,
-      "Store json-formatted pipeline in given file.",
-      :short => :none,
-      :type => :string,
-      :default => '/dev/null')
-  opt(:no_wait,
-      "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
-      :short => :none,
-      :type => :boolean)
-  opt(:no_reuse,
-      "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
-      :short => :none,
-      :type => :boolean)
-  opt(:debug,
-      "Print extra debugging information on stderr.",
-      :type => :boolean)
-  opt(:debug_level,
-      "Set debug verbosity level.",
-      :short => :none,
-      :type => :integer)
-  opt(:template,
-      "UUID of pipeline template, or path to local pipeline template file.",
-      :short => :none,
-      :type => :string)
-  opt(:instance,
-      "UUID of pipeline instance.",
-      :short => :none,
-      :type => :string)
-  opt(:submit,
-      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
-      :short => :none,
-      :type => :boolean)
-  opt(:run_pipeline_here,
-      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
-      :short => :none,
-      :type => :boolean)
-  opt(:run_jobs_here,
-      "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
-      :short => :none,
-      :type => :boolean)
-  opt(:run_here,
-      "Synonym for --run-jobs-here.",
-      :short => :none,
-      :type => :boolean)
-  opt(:description,
-      "Description for the pipeline instance.",
-      :short => :none,
-      :type => :string)
-  opt(:project_uuid,
-      "UUID of the project for the pipeline instance.",
-      short: :none,
-      type: :string)
-  stop_on [:'--']
-$options = Optimist::with_standard_exception_handling p do
-  p.parse ARGV
-$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
-$options[:run_jobs_here] ||= $options[:run_here] # old flag name
-$options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
-if $options[:instance]
-  if $options[:template] or $options[:submit]
-    abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
-  end
-elsif not $options[:template]
-  $stderr.puts "error: you must supply a --template or --instance."
-  p.educate
-  abort
-if $options[:run_pipeline_here] == $options[:submit]
-  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
-# Set up the API client.
-$arv = Arvados.new api_version: 'v1'
-$client = $arv.client
-$arvados = $arv.arvados_api
-class PipelineInstance
-  def self.find(uuid)
-    result = $client.execute(:api_method => $arvados.pipeline_instances.get,
-                             :parameters => {
-                               :uuid => uuid
-                             },
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    j = JSON.parse result.body, :symbolize_names => true
-    unless j.is_a? Hash and j[:uuid]
-      debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
-      nil
-    else
-      debuglog "Retrieved pipeline_instance #{j[:uuid]}"
-      self.new(j)
-    end
-  end
-  def self.create(attributes)
-    result = $client.execute(:api_method => $arvados.pipeline_instances.create,
-                             :body_object => {
-                               :pipeline_instance => attributes
-                             },
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    j = JSON.parse result.body, :symbolize_names => true
-    unless j.is_a? Hash and j[:uuid]
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
-    end
-    debuglog "Created pipeline instance: #{j[:uuid]}"
-    self.new(j)
-  end
-  def save
-    result = $client.execute(:api_method => $arvados.pipeline_instances.update,
-                             :parameters => {
-                               :uuid => @pi[:uuid]
-                             },
-                             :body_object => {
-                               :pipeline_instance => @attributes_to_update
-                             },
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    j = JSON.parse result.body, :symbolize_names => true
-    unless j.is_a? Hash and j[:uuid]
-      debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
-      nil
-    else
-      @attributes_to_update = {}
-      @pi = j
-    end
-  end
-  def []=(x,y)
-    @attributes_to_update[x] = y
-    @pi[x] = y
-  end
-  def [](x)
-    @pi[x]
-  end
-  def log_stderr(msg)
-    $arv.log.create log: {
-      event_type: 'stderr',
-      object_uuid: self[:uuid],
-      owner_uuid: self[:owner_uuid],
-      properties: {"text" => msg},
-    }
-  end
-  protected
-  def initialize(j)
-    @attributes_to_update = {}
-    @pi = j
-  end
-class JobCache
-  def self.get(uuid)
-    @cache ||= {}
-    result = $client.execute(:api_method => $arvados.jobs.get,
-                             :parameters => {
-                               :uuid => uuid
-                             },
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    @cache[uuid] = JSON.parse result.body, :symbolize_names => true
-  end
-  def self.where(conditions)
-    result = $client.execute(:api_method => $arvados.jobs.list,
-                             :parameters => {
-                               :limit => 10000,
-                               :where => conditions.to_json
-                             },
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    list = JSON.parse result.body, :symbolize_names => true
-    if list and list[:items].is_a? Array
-      list[:items]
-    else
-      []
-    end
-  end
-  # create() returns [job, exception]. If both job and exception are
-  # nil, there was a non-retryable error and the call should not be
-  # attempted again.
-  def self.create(pipeline, component, job, create_params)
-    @cache ||= {}
-    body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
-    result = nil
-    begin
-      result = $client.execute(
-        :api_method => $arvados.jobs.create,
-        :body_object => body,
-        :authenticated => false,
-        :headers => {
-          authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-        })
-      if result.status == 429 || result.status >= 500
-        raise Exception.new("HTTP status #{result.status}")
-      end
-    rescue Exception => e
-      return nil, e
-    end
-    j = JSON.parse(result.body, :symbolize_names => true) rescue nil
-    if result.status == 200 && j.is_a?(Hash) && j[:uuid]
-      @cache[j[:uuid]] = j
-      return j, nil
-    else
-      errors = j[:errors] rescue []
-      debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
-      msg = ""
-      errors.each do |err|
-        msg += "Error creating job for component #{component}: #{err}\n"
-      end
-      msg += "Job submission was: #{body.to_json}"
-      pipeline.log_stderr(msg)
-      return nil, nil
-    end
-  end
-  protected
-  def self.no_nil_values(hash)
-    hash.reject { |key, value| value.nil? }
-  end
-class WhRunPipelineInstance
-  attr_reader :instance
-  def initialize(_options)
-    @options = _options
-  end
-  def fetch_template(template)
-    if template.match /[^-0-9a-z]/
-      # Doesn't look like a uuid -- use it as a filename.
-      @template = JSON.parse File.read(template), :symbolize_names => true
-    else
-      result = $client.execute(:api_method => $arvados.pipeline_templates.get,
-                               :parameters => {
-                                 :uuid => template
-                               },
-                               :authenticated => false,
-                               :headers => {
-                                 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                               })
-      @template = JSON.parse result.body, :symbolize_names => true
-      if !@template[:uuid]
-        abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
-      end
-    end
-    self
-  end
-  def fetch_instance(instance_uuid)
-    @instance = PipelineInstance.find(instance_uuid)
-    @template = @instance
-    self
-  end
-  def apply_parameters(params_args)
-    params_args.shift if params_args[0] == '--'
-    params = {}
-    while !params_args.empty?
-      if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
-        params[re[2]] = re[3]
-        params_args.shift
-      elsif params_args.size > 1
-        param = params_args.shift.sub /^--/, ''
-        params[param] = params_args.shift
-      else
-        abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
-      end
-    end
-    if not @template[:components].is_a?(Hash)
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
-    end
-    @components = @template[:components].dup
-    bad_components = @components.each_pair.select do |cname, cspec|
-      not cspec.is_a?(Hash)
-    end
-    if bad_components.any?
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
-    end
-    bad_components = @components.each_pair.select do |cname, cspec|
-      not cspec[:script_parameters].is_a?(Hash)
-    end
-    if bad_components.any?
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
-    end
-    errors = []
-    @components.each do |componentname, component|
-      component[:script_parameters].each do |parametername, parameter|
-        parameter = { :value => parameter } unless parameter.is_a? Hash
-        if params.has_key?("#{componentname}::#{parametername}")
-          value = params["#{componentname}::#{parametername}"]
-        elsif parameter.has_key?(:value)
-          value = parameter[:value]
-        elsif parameter.has_key?(:output_of)
-          if !@components[parameter[:output_of].intern]
-            errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
-          else
-            # value will be filled in later when the upstream
-            # component's output becomes known
-          end
-          next
-        elsif params.has_key?(parametername.to_s)
-          value = params[parametername.to_s]
-        elsif parameter.has_key?(:default)
-          value = parameter[:default]
-        elsif [false, 'false', 0, '0'].index(parameter[:required])
-          value = nil
-        else
-          errors << [componentname, parametername, "required parameter is missing"]
-          next
-        end
-        debuglog "parameter #{componentname}::#{parametername} == #{value}"
-        component[:script_parameters][parametername] =
-          parameter.dup.merge(value: value)
-      end
-    end
-    if !errors.empty?
-      all_errors = errors.collect do |c,p,e|
-        "#{c}::#{p} - #{e}\n"
-      end.join("")
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
-    end
-    debuglog "options=" + @options.pretty_inspect
-    self
-  end
-  def setup_instance
-    if @instance
-      @instance[:properties][:run_options] ||= {}
-      if @options[:no_reuse]
-        # override properties of existing instance
-        @instance[:properties][:run_options][:enable_job_reuse] = false
-      else
-        # Default to "enable reuse" if not specified. (This code path
-        # can go away when old clients go away.)
-        if @instance[:properties][:run_options][:enable_job_reuse].nil?
-          @instance[:properties][:run_options][:enable_job_reuse] = true
-        end
-      end
-    else
-      description = $options[:description] ||
-                    ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
-      instance_body = {
-        components: @components,
-        properties: {
-          run_options: {
-            enable_job_reuse: !@options[:no_reuse]
-          }
-        },
-        pipeline_template_uuid: @template[:uuid],
-        description: description,
-        state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
-      }
-      if @options[:project_uuid]
-        instance_body[:owner_uuid] = @options[:project_uuid]
-      end
-      @instance = PipelineInstance.create(instance_body)
-    end
-    self
-  end
-  def run
-    moretodo = true
-    interrupted = false
-    if @instance[:started_at].nil?
-      @instance[:started_at] = Time.now
-    end
-    job_creation_failed = 0
-    while moretodo
-      moretodo = false
-      @components.each do |cname, c|
-        job = nil
-        owner_uuid = @instance[:owner_uuid]
-        # Is the job satisfying this component already known to be
-        # finished? (Already meaning "before we query API server about
-        # the job's current state")
-        c_already_finished = (c[:job] &&
-                              c[:job][:uuid] &&
-                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
-        if !c[:job] and
-            c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
-          # No job yet associated with this component and is component inputs
-          # are fully specified (any output_of script_parameters are resolved
-          # to real value)
-          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
-          job, err = JobCache.create(@instance, cname, {
-            :script => c[:script],
-            :script_parameters => Hash[c[:script_parameters].map do |key, spec|
-                                         [key, spec[:value]]
-                                       end],
-            :script_version => c[:script_version],
-            :repository => c[:repository],
-            :nondeterministic => c[:nondeterministic],
-            :runtime_constraints => c[:runtime_constraints],
-            :owner_uuid => owner_uuid,
-            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
-            :submit_id => my_submit_id,
-            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
-          }, {
-            # This is the right place to put these attributes when
-            # dealing with new API servers.
-            :minimum_script_version => c[:minimum_script_version],
-            :exclude_script_versions => c[:exclude_minimum_script_versions],
-            :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
-                                !c[:nondeterministic]),
-            :filters => c[:filters]
-          })
-          if job
-            debuglog "component #{cname} new job #{job[:uuid]}"
-            c[:job] = job
-            c[:run_in_process] = (@options[:run_jobs_here] and
-                                  job[:submit_id] == my_submit_id)
-          elsif err.nil?
-            debuglog "component #{cname} new job failed", 0
-            job_creation_failed += 1
-          else
-            debuglog "component #{cname} new job failed, err=#{err}", 0
-          end
-        end
-        if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
-          report_status
-          begin
-            require 'open3'
-            Open3.popen3("arv-crunch-job", "--force-unlock",
-                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
-              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
-              stdin.close
-              while true
-                rready, wready, = IO.select([stdout, stderr], [])
-                break if !rready[0]
-                begin
-                  buf = rready[0].read_nonblock(2**20)
-                rescue EOFError
-                  break
-                end
-                (rready[0] == stdout ? $stdout : $stderr).write(buf)
-              end
-              stdout.close
-              stderr.close
-              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
-            end
-            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
-              raise Exception.new("arv-crunch-job did not set finished_at.")
-            end
-          rescue Exception => e
-            debuglog "Interrupted (#{e}). Failing job.", 0
-            $arv.job.update(uuid: c[:job][:uuid],
-                            job: {
-                              state: "Failed"
-                            })
-          end
-        end
-        if c[:job] and c[:job][:uuid]
-          if ["Running", "Queued"].include?(c[:job][:state])
-            # Job is running (or may be soon) so update copy of job record
-            c[:job] = JobCache.get(c[:job][:uuid])
-          end
-          if c[:job][:state] == "Complete"
-            # Populate script_parameters of other components waiting for
-            # this job
-            @components.each do |c2name, c2|
-              c2[:script_parameters].each do |pname, p|
-                if p.is_a? Hash and p[:output_of] == cname.to_s
-                  debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
-                  c2[:script_parameters][pname] = {value: c[:job][:output]}
-                  moretodo = true
-                end
-              end
-            end
-            unless c_already_finished
-              # This is my first time discovering that the job
-              # succeeded. (At the top of this loop, I was still
-              # waiting for it to finish.)
-              if @instance[:name].andand.length.andand > 0
-                pipeline_name = @instance[:name]
-              elsif @template.andand[:name].andand.length.andand > 0
-                pipeline_name = @template[:name]
-              else
-                pipeline_name = @instance[:uuid]
-              end
-              if c[:output_name] != false
-                # Create a collection located in the same project as the pipeline with the contents of the output.
-                portable_data_hash = c[:job][:output]
-                collections = $arv.collection.list(limit: 1,
-                                                   filters: [['portable_data_hash', '=', portable_data_hash]],
-                                                   select: ["portable_data_hash", "manifest_text"]
-                                                   )[:items]
-                if collections.any?
-                  name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
-                  # check if there is a name collision.
-                  name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
-                                                                   ["name", "=", name]])[:items]
-                  newcollection_actual = nil
-                  if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
-                    # There is already a collection with the same name and the
-                    # same contents, so just point to that.
-                    newcollection_actual = name_collisions.first
-                  end
-                  if newcollection_actual.nil?
-                    # Did not find a collection with the same name (or the
-                    # collection has a different portable data hash) so create
-                    # a new collection with ensure_unique_name: true.
-                    newcollection = {
-                      owner_uuid: owner_uuid,
-                      name: name,
-                      portable_data_hash: collections.first[:portable_data_hash],
-                      manifest_text: collections.first[:manifest_text]
-                    }
-                    debuglog "Creating collection #{newcollection}", 0
-                    newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
-                  end
-                  c[:output_uuid] = newcollection_actual[:uuid]
-                else
-                  debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
-                end
-              end
-            end
-          elsif ["Queued", "Running"].include? c[:job][:state]
-            # Job is running or queued to run, so indicate that pipeline
-            # should continue to run
-            moretodo = true
-          elsif c[:job][:state] == "Cancelled"
-            debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
-            moretodo = false
-          elsif c[:job][:state] == "Failed"
-            moretodo = false
-          end
-        end
-      end
-      @instance[:components] = @components
-      report_status
-      if @options[:no_wait]
-        moretodo = false
-      end
-      # If job creation fails, just give up on this pipeline instance.
-      if job_creation_failed > 0
-        moretodo = false
-      end
-      if moretodo
-        begin
-          sleep 10
-        rescue Interrupt
-          debuglog "interrupt", 0
-          interrupted = true
-          break
-        end
-      end
-    end
-    c_in_state = @components.values.group_by { |c|
-      c[:job] and c[:job][:state]
-    }
-    succeeded = c_in_state["Complete"].andand.count || 0
-    failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
-    ended = succeeded + failed
-    success = (succeeded == @components.length)
-    # A job create call failed. Just give up.
-    if job_creation_failed > 0
-      debuglog "job creation failed - giving up on this pipeline instance", 0
-      success = false
-      failed += 1
-    end
-    if interrupted
-     if success
-        @instance[:state] = 'Complete'
-     else
-        @instance[:state] = 'Paused'
-     end
-    else
-      if ended == @components.length or failed > 0
-        @instance[:state] = success ? 'Complete' : 'Failed'
-      end
-    end
-    if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
-      @instance[:finished_at] = Time.now
-    end
-    debuglog "pipeline instance state is #{@instance[:state]}"
-    # set components_summary
-    components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
-    @instance[:components_summary] = components_summary
-    @instance.save
-  end
-  def cleanup
-    if @instance and @instance[:state] == 'RunningOnClient'
-      @instance[:state] = 'Paused'
-      @instance.save
-    end
-  end
-  def uuid
-    @instance[:uuid]
-  end
-  protected
-  def report_status
-    @instance.save
-    if @options[:status_json] != '/dev/null'
-      File.open(@options[:status_json], 'w') do |f|
-        f.puts @components.pretty_inspect
-      end
-    end
-    if @options[:status_text] != '/dev/null'
-      File.open(@options[:status_text], 'w') do |f|
-        f.puts ""
-        f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
-        namewidth = @components.collect { |cname, c| cname.size }.max
-        @components.each do |cname, c|
-          jstatus = if !c[:job]
-                      "-"
-                    else case c[:job][:state]
-                         when "Running"
-                           "#{c[:job][:tasks_summary].inspect}"
-                         when "Complete"
-                           c[:job][:output]
-                         when "Cancelled"
-                           "cancelled #{c[:job][:cancelled_at]}"
-                         when "Failed"
-                           "failed #{c[:job][:finished_at]}"
-                         when "Queued"
-                           "queued #{c[:job][:created_at]}"
-                         end
-                    end
-          f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
-        end
-      end
-    end
-  end
-  def abort(msg)
-    if @instance
-      if ["New", "Ready", "RunningOnClient",
-          "RunningOnServer"].include?(@instance[:state])
-        @instance[:state] = "Failed"
-        @instance[:finished_at] = Time.now
-        @instance.save
-      end
-      @instance.log_stderr(msg)
-    end
-    Kernel::abort(msg)
-  end
-runner = WhRunPipelineInstance.new($options)
-  if $options[:template]
-    runner.fetch_template($options[:template])
-  else
-    runner.fetch_instance($options[:instance])
-  end
-  runner.apply_parameters(p.leftovers)
-  runner.setup_instance
-  if $options[:submit]
-    runner.instance.save
-    puts runner.instance[:uuid]
-  else
-    runner.run
-  end
-rescue Exception => e
-  runner.cleanup
-  raise e
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
deleted file mode 100755 (executable)
index 242dff7..0000000
+++ /dev/null
@@ -1,2577 +0,0 @@
-#!/usr/bin/env perl
-# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
-# Copyright (C) The Arvados Authors. All rights reserved.
-# SPDX-License-Identifier: AGPL-3.0
-=head1 NAME
-crunch-job: Execute job steps, save snapshots as requested, collate output.
-=head1 SYNOPSIS
-Obtain job details from Arvados, run tasks on compute nodes (typically
-invoked by scheduler on controller):
- crunch-job --job x-y-z --git-dir /path/to/repo/.git
-Obtain job details from command line, run tasks on local machine
-(typically invoked by application or developer on VM):
- crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
- crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
-=head1 OPTIONS
-=item --force-unlock
-If the job is already locked, steal the lock and run it anyway.
-=item --git-dir
-Path to a .git directory (or a git URL) where the commit given in the
-job's C<script_version> attribute is to be found. If this is I<not>
-given, the job's C<repository> attribute will be used.
-=item --job-api-token
-Arvados API authorization token to use during the course of the job.
-=item --no-clear-tmp
-Do not clear per-job/task temporary directories during initial job
-setup. This can speed up development and debugging when running jobs
-=item --job
-UUID of the job to run, or a JSON-encoded job resource without a
-UUID. If the latter is given, a new job object will be created.
-crunch-job's log messages appear on stderr along with the job tasks'
-stderr streams. The log is saved in Keep at each checkpoint and when
-the job finishes.
-If the job succeeds, the job's output locator is printed on stdout.
-While the job is running, the following signals are accepted:
-=item control-C, SIGINT, SIGQUIT
-Save a checkpoint, terminate any job tasks that are running, and stop.
-=item SIGALRM
-Save a checkpoint and continue.
-=item SIGHUP
-Refresh node allocation (i.e., check whether any nodes have been added
-or unallocated) and attributes of the Job record that should affect
-behavior (e.g., cancel job if cancelled_at becomes non-nil).
-use strict;
-use POSIX ':sys_wait_h';
-use POSIX qw(strftime);
-use Arvados;
-use Cwd qw(realpath);
-use Data::Dumper;
-use Digest::MD5 qw(md5_hex);
-use Getopt::Long;
-use IPC::Open2;
-use IO::Select;
-use File::Temp;
-use Fcntl ':flock';
-use File::Path qw( make_path remove_tree );
-use constant TASK_TEMPFAIL => 111;
-use constant EX_TEMPFAIL => 75;
-use constant EX_RETRY_UNLOCKED => 93;
-$ENV{"TMPDIR"} ||= "/tmp";
-unless (defined $ENV{"CRUNCH_TMP"}) {
-  $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
-  if ($ENV{"USER"} ne "crunch" && $< != 0) {
-    # use a tmp dir unique for my uid
-    $ENV{"CRUNCH_TMP"} .= "-$<";
-  }
-# Create the tmp directory if it does not exist
-if ( ! -d $ENV{"CRUNCH_TMP"} ) {
-  make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
-$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
-$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
-mkdir ($ENV{"JOB_WORK"});
-my %proc;
-my $force_unlock;
-my $git_dir;
-my $jobspec;
-my $job_api_token;
-my $no_clear_tmp;
-my $resume_stash;
-my $cgroup_root = "/sys/fs/cgroup";
-my $docker_bin = "docker.io";
-my $docker_run_args = "";
-my $srun_sync_timeout = 15*60;
-GetOptions('force-unlock' => \$force_unlock,
-           'git-dir=s' => \$git_dir,
-           'job=s' => \$jobspec,
-           'job-api-token=s' => \$job_api_token,
-           'no-clear-tmp' => \$no_clear_tmp,
-           'resume-stash=s' => \$resume_stash,
-           'cgroup-root=s' => \$cgroup_root,
-           'docker-bin=s' => \$docker_bin,
-           'docker-run-args=s' => \$docker_run_args,
-           'srun-sync-timeout=i' => \$srun_sync_timeout,
-    );
-if (defined $job_api_token) {
-  $ENV{ARVADOS_API_TOKEN} = $job_api_token;
-my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
-$SIG{'USR1'} = sub
-  $main::ENV{CRUNCH_DEBUG} = 1;
-$SIG{'USR2'} = sub
-  $main::ENV{CRUNCH_DEBUG} = 0;
-my $arv = Arvados->new('apiVersion' => 'v1');
-my $Job;
-my $job_id;
-my $dbh;
-my $sth;
-my @jobstep;
-my $local_job;
-if ($jobspec =~ /^[-a-z\d]+$/)
-  # $jobspec is an Arvados UUID, not a JSON job specification
-  $Job = api_call("jobs/get", uuid => $jobspec);
-  $local_job = 0;
-  $local_job = JSON::decode_json($jobspec);
-# Make sure our workers (our slurm nodes, localhost, or whatever) are
-# at least able to run basic commands: they aren't down or severely
-# misconfigured.
-my $cmd = ['true'];
-if (($Job || $local_job)->{docker_image_locator}) {
-  $cmd = [$docker_bin, 'ps', '-q'];
-Log(undef, "Sanity check is `@$cmd`");
-my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
-  ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
-  $cmd,
-  {label => "sanity check"});
-if ($exited != 0) {
-  Log(undef, "Sanity check failed: ".exit_status_s($exited));
-  exit EX_TEMPFAIL;
-Log(undef, "Sanity check OK");
-my $User = api_call("users/current");
-if (!$local_job) {
-  if (!$force_unlock) {
-    # Claim this job, and make sure nobody else does
-    eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
-    if ($@) {
-      Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
-      exit EX_TEMPFAIL;
-    };
-  }
-  if (!$resume_stash)
-  {
-    map { croak ("No $_ specified") unless $local_job->{$_} }
-    qw(script script_version script_parameters);
-  }
-  $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
-  $local_job->{'started_at'} = gmtime;
-  $local_job->{'state'} = 'Running';
-  $Job = api_call("jobs/create", job => $local_job);
-$job_id = $Job->{'uuid'};
-my $keep_logfile = $job_id . '.log.txt';
-$Job->{'runtime_constraints'} ||= {};
-$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
-my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
-my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
-if ($? == 0) {
-  $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
-  chomp($gem_versions);
-  chop($gem_versions);  # Closing parentheses
-} else {
-  $gem_versions = "";
-    "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
-Log (undef, "check slurm allocation");
-my @slot;
-my @node;
-# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
-my @sinfo;
-if (!$have_slurm)
-  my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
-  push @sinfo, "$localcpus localhost";
-if (exists $ENV{SLURM_NODELIST})
-  push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
-foreach (@sinfo)
-  my ($ncpus, $slurm_nodelist) = split;
-  $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
-  my @nodelist;
-  while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
-  {
-    my $nodelist = $1;
-    if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
-    {
-      my $ranges = $1;
-      foreach (split (",", $ranges))
-      {
-       my ($a, $b);
-       if (/(\d+)-(\d+)/)
-       {
-         $a = $1;
-         $b = $2;
-       }
-       else
-       {
-         $a = $_;
-         $b = $_;
-       }
-       push @nodelist, map {
-         my $n = $nodelist;
-         $n =~ s/\[[-,\d]+\]/$_/;
-         $n;
-       } ($a..$b);
-      }
-    }
-    else
-    {
-      push @nodelist, $nodelist;
-    }
-  }
-  foreach my $nodename (@nodelist)
-  {
-    Log (undef, "node $nodename - $ncpus slots");
-    my $node = { name => $nodename,
-                 ncpus => $ncpus,
-                 # The number of consecutive times a task has been dispatched
-                 # to this node and failed.
-                 losing_streak => 0,
-                 # The number of consecutive times that SLURM has reported
-                 # a node failure since the last successful task.
-                 fail_count => 0,
-                 # Don't dispatch work to this node until this time
-                 # (in seconds since the epoch) has passed.
-                 hold_until => 0 };
-    foreach my $cpu (1..$ncpus)
-    {
-      push @slot, { node => $node,
-                   cpu => $cpu };
-    }
-  }
-  push @node, @nodelist;
-# Ensure that we get one jobstep running on each allocated node before
-# we start overloading nodes with concurrent steps
-@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
-  'tasks_summary' => { 'failed' => 0,
-                       'todo' => 1,
-                       'running' => 0,
-                       'done' => 0 });
-Log (undef, "start");
-$SIG{'INT'} = sub { $main::please_freeze = 1; };
-$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
-$SIG{'TERM'} = \&croak;
-$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
-$SIG{'ALRM'} = sub { $main::please_info = 1; };
-$SIG{'CONT'} = sub { $main::please_continue = 1; };
-$SIG{'HUP'} = sub { $main::please_refresh = 1; };
-$main::please_freeze = 0;
-$main::please_info = 0;
-$main::please_continue = 0;
-$main::please_refresh = 0;
-my $jobsteps_must_output_keys = 0;     # becomes 1 when any task outputs a key
-grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
-$ENV{"CRUNCH_JOB_UUID"} = $job_id;
-$ENV{"JOB_UUID"} = $job_id;
-my @jobstep_todo = ();
-my @jobstep_done = ();
-my @jobstep_tomerge = ();
-my $jobstep_tomerge_level = 0;
-my $squeue_checked = 0;
-my $sinfo_checked = 0;
-my $latest_refresh = scalar time;
-if (defined $Job->{thawedfromkey})
-  thaw ($Job->{thawedfromkey});
-  my $first_task = api_call("job_tasks/create", job_task => {
-    'job_uuid' => $Job->{'uuid'},
-    'sequence' => 0,
-    'qsequence' => 0,
-    'parameters' => {},
-  });
-  push @jobstep, { 'level' => 0,
-                  'failures' => 0,
-                   'arvados_task' => $first_task,
-                };
-  push @jobstep_todo, 0;
-if (!$have_slurm)
-  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
-my $build_script = handle_readall(\*DATA);
-my $nodelist = join(",", @node);
-my $git_tar_count = 0;
-if (!defined $no_clear_tmp) {
-  # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
-  # up work directories crunch_tmp/work, crunch_tmp/opt,
-  # crunch_tmp/src*.
-  my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
-    ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-    ['bash', '-ec', q{
-arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
-rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
-    }],
-    {label => "clean work dirs"});
-  if ($exited != 0) {
-    exit_retry_unlocked();
-  }
-# If this job requires a Docker image, install that.
-my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
-if ($docker_locator = $Job->{docker_image_locator}) {
-  Log (undef, "Install docker image $docker_locator");
-  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
-  if (!$docker_hash)
-  {
-    croak("No Docker image hash found from locator $docker_locator");
-  }
-  Log (undef, "docker image hash is $docker_hash");
-  $docker_stream =~ s/^\.//;
-  my $docker_install_script = qq{
-loaded() {
-  id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
-  echo "image ID is \$id"
-  [[ \${id} = \Q$docker_hash\E ]]
-if loaded >&2 2>/dev/null; then
-  echo >&2 "image is already present"
-  exit 0
-echo >&2 "docker image is not present; loading"
-arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
-if ! loaded >&2; then
-  echo >&2 "`docker load` exited 0, but image is not found (!)"
-  exit 1
-echo >&2 "image loaded successfully"
-  my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
-    ["srun", "--nodelist=" . join(',', @node)],
-    ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
-    {label => "load docker image"});
-  if ($exited != 0)
-  {
-    exit_retry_unlocked();
-  }
-  # Determine whether this version of Docker supports memory+swap limits.
-  ($exited, $stdout, $stderr, $tempfail) = srun_sync(
-    ["srun", "--nodes=1"],
-    [$docker_bin, 'run', '--help'],
-    {label => "check --memory-swap feature"});
-  if ($tempfail) {
-    exit_retry_unlocked();
-  }
-  $docker_limitmem = ($stdout =~ /--memory-swap/);
-  # Find a non-root Docker user to use.
-  # Tries the default user for the container, then 'crunch', then 'nobody',
-  # testing for whether the actual user id is non-zero.  This defends against
-  # mistakes but not malice, but we intend to harden the security in the future
-  # so we don't want anyone getting used to their jobs running as root in their
-  # Docker containers.
-  my @tryusers = ("", "crunch", "nobody");
-  foreach my $try_user (@tryusers) {
-    my $label;
-    my $try_user_arg;
-    if ($try_user eq "") {
-      $label = "check whether default user is UID 0";
-      $try_user_arg = "";
-    } else {
-      $label = "check whether user '$try_user' is UID 0";
-      $try_user_arg = "--user=$try_user";
-    }
-    my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
-      ["srun", "--nodes=1"],
-      ["/bin/sh", "-ec",
-       "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
-      {label => $label});
-    chomp($stdout);
-    if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
-      $dockeruserarg = $try_user_arg;
-      if ($try_user eq "") {
-        Log(undef, "Container will run with default user");
-      } else {
-        Log(undef, "Container will run with $dockeruserarg");
-      }
-      last;
-    } elsif ($tempfail) {
-      exit_retry_unlocked();
-    }
-  }
-  if (!defined $dockeruserarg) {
-    croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
-  }
-  if ($Job->{arvados_sdk_version}) {
-    # The job also specifies an Arvados SDK version.  Add the SDKs to the
-    # tar file for the build script to install.
-    Log(undef, sprintf("Packing Arvados SDK version %s for installation",
-                       $Job->{arvados_sdk_version}));
-    add_git_archive("git", "--git-dir=$git_dir", "archive",
-                    "--prefix=.arvados.sdk/",
-                    $Job->{arvados_sdk_version}, "sdk");
-  }
-if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
-  # If script_version looks like an absolute path, *and* the --git-dir
-  # argument was not given -- which implies we were not invoked by
-  # crunch-dispatch -- we will use the given path as a working
-  # directory instead of resolving script_version to a git commit (or
-  # doing anything else with git).
-  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
-  $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
-else {
-  # Resolve the given script_version to a git commit sha1. Also, if
-  # the repository is remote, clone it into our local filesystem: this
-  # ensures "git archive" will work, and is necessary to reliably
-  # resolve a symbolic script_version like "master^".
-  Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
-  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
-  # If we're running under crunch-dispatch, it will have already
-  # pulled the appropriate source tree into its own repository, and
-  # given us that repo's path as $git_dir.
-  #
-  # If we're running a "local" job, we might have to fetch content
-  # from a remote repository.
-  #
-  # (Currently crunch-dispatch gives a local path with --git-dir, but
-  # we might as well accept URLs there too in case it changes its
-  # mind.)
-  my $repo = $git_dir || $Job->{'repository'};
-  # Repository can be remote or local. If remote, we'll need to fetch it
-  # to a local dir before doing `git log` et al.
-  my $repo_location;
-  if ($repo =~ m{://|^[^/]*:}) {
-    # $repo is a git url we can clone, like git:// or https:// or
-    # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
-    # not recognized here because distinguishing that from a local
-    # path is too fragile. If you really need something strange here,
-    # use the ssh:// form.
-    $repo_location = 'remote';
-  } elsif ($repo =~ m{^\.*/}) {
-    # $repo is a local path to a git index. We'll also resolve ../foo
-    # to ../foo/.git if the latter is a directory. To help
-    # disambiguate local paths from named hosted repositories, this
-    # form must be given as ./ or ../ if it's a relative path.
-    if (-d "$repo/.git") {
-      $repo = "$repo/.git";
-    }
-    $repo_location = 'local';
-  } else {
-    # $repo is none of the above. It must be the name of a hosted
-    # repository.
-    my $arv_repo_list = api_call("repositories/list",
-                                 'filters' => [['name','=',$repo]]);
-    my @repos_found = @{$arv_repo_list->{'items'}};
-    my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
-    if ($n_found > 0) {
-      Log(undef, "Repository '$repo' -> "
-          . join(", ", map { $_->{'uuid'} } @repos_found));
-    }
-    if ($n_found != 1) {
-      croak("Error: Found $n_found repositories with name '$repo'.");
-    }
-    $repo = $repos_found[0]->{'fetch_url'};
-    $repo_location = 'remote';
-  }
-  Log(undef, "Using $repo_location repository '$repo'");
-  $ENV{"CRUNCH_SRC_URL"} = $repo;
-  # Resolve given script_version (we'll call that $treeish here) to a
-  # commit sha1 ($commit).
-  my $treeish = $Job->{'script_version'};
-  my $commit;
-  if ($repo_location eq 'remote') {
-    # We minimize excess object-fetching by re-using the same bare
-    # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
-    # just keep adding remotes to it as needed.
-    my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
-    my $gitcmd = "git --git-dir=\Q$local_repo\E";
-    # Set up our local repo for caching remote objects, making
-    # archives, etc.
-    if (!-d $local_repo) {
-      make_path($local_repo) or croak("Error: could not create $local_repo");
-    }
-    # This works (exits 0 and doesn't delete fetched objects) even
-    # if $local_repo is already initialized:
-    `$gitcmd init --bare`;
-    if ($?) {
-      croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
-    }
-    # If $treeish looks like a hash (or abbrev hash) we look it up in
-    # our local cache first, since that's cheaper. (We don't want to
-    # do that with tags/branches though -- those change over time, so
-    # they should always be resolved by the remote repo.)
-    if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
-      # Hide stderr because it's normal for this to fail:
-      my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
-      if ($? == 0 &&
-          # Careful not to resolve a branch named abcdeff to commit 1234567:
-          $sha1 =~ /^$treeish/ &&
-          $sha1 =~ /^([0-9a-f]{40})$/s) {
-        $commit = $1;
-        Log(undef, "Commit $commit already present in $local_repo");
-      }
-    }
-    if (!defined $commit) {
-      # If $treeish isn't just a hash or abbrev hash, or isn't here
-      # yet, we need to fetch the remote to resolve it correctly.
-      # First, remove all local heads. This prevents a name that does
-      # not exist on the remote from resolving to (or colliding with)
-      # a previously fetched branch or tag (possibly from a different
-      # remote).
-      remove_tree("$local_repo/refs/heads", {keep_root => 1});
-      Log(undef, "Fetching objects from $repo to $local_repo");
-      `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
-      if ($?) {
-        croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
-      }
-    }
-    # Now that the data is all here, we will use our local repo for
-    # the rest of our git activities.
-    $repo = $local_repo;
-  }
-  my $gitcmd = "git --git-dir=\Q$repo\E";
-  my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
-  unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
-    croak("`$gitcmd rev-list` exited "
-          .exit_status_s($?)
-          .", '$treeish' not found, giving up");
-  }
-  $commit = $1;
-  Log(undef, "Version $treeish is commit $commit");
-  if ($commit ne $Job->{'script_version'}) {
-    # Record the real commit id in the database, frozentokey, logs,
-    # etc. -- instead of an abbreviation or a branch name which can
-    # become ambiguous or point to a different commit in the future.
-    if (!$Job->update_attributes('script_version' => $commit)) {
-      croak("Error: failed to update job's script_version attribute");
-    }
-  }
-  $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-  add_git_archive("$gitcmd archive ''\Q$commit\E");
-my $git_archive = combined_git_archive();
-if (!defined $git_archive) {
-  Log(undef, "Skip install phase (no git archive)");
-  if ($have_slurm) {
-    Log(undef, "Warning: This probably means workers have no source tree!");
-  }
-else {
-  my $exited;
-  my $install_script_tries_left = 3;
-  for (my $attempts = 0; $attempts < 3; $attempts++) {
-    my @srunargs = ("srun",
-                    "--nodelist=$nodelist",
-                    "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
-    my @execargs = ("sh", "-c",
-                    "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-    $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
-    my ($stdout, $stderr, $tempfail);
-    ($exited, $stdout, $stderr, $tempfail) = srun_sync(
-      \@srunargs, \@execargs,
-      {label => "run install script on all workers"},
-        $build_script . $git_archive);
-    if ($tempfail) {
-      exit_retry_unlocked();
-    }
-    my $stderr_anything_from_script = 0;
-    for my $line (split(/\n/, $stderr)) {
-      if ($line !~ /^(srun: error: |starting: \[)/) {
-        $stderr_anything_from_script = 1;
-      }
-    }
-    last if $exited == 0 || $main::please_freeze;
-    # If the install script fails but doesn't print an error message,
-    # the next thing anyone is likely to do is just run it again in
-    # case it was a transient problem like "slurm communication fails
-    # because the network isn't reliable enough". So we'll just do
-    # that ourselves (up to 3 attempts in total). OTOH, if there is an
-    # error message, the problem is more likely to have a real fix and
-    # we should fail the job so the fixing process can start, instead
-    # of doing 2 more attempts.
-    last if $stderr_anything_from_script;
-  }
-  foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
-    unlink($tar_filename);
-  }
-  if ($exited != 0) {
-    croak("Giving up");
-  }
-foreach (qw (script script_version script_parameters runtime_constraints))
-  Log (undef,
-       "$_ " .
-       (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
-foreach (split (/\n/, $Job->{knobs}))
-  Log (undef, "knob " . $_);
-my $resp = api_call(
-  'nodes/list',
-  'filters' => [['hostname', 'in', \@node]],
-  'order' => 'hostname',
-  'limit' => scalar(@node),
-    );
-for my $n (@{$resp->{items}}) {
-  Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
-$main::success = undef;
-my $thisround_succeeded = 0;
-my $thisround_failed = 0;
-my $thisround_failed_multiple = 0;
-my $working_slot_count = scalar(@slot);
-@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
-                      or $a <=> $b } @jobstep_todo;
-my $level = $jobstep[$jobstep_todo[0]]->{level};
-my $initial_tasks_this_level = 0;
-foreach my $id (@jobstep_todo) {
-  $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
-# If the number of tasks scheduled at this level #T is smaller than the number
-# of slots available #S, only use the first #T slots, or the first slot on
-# each node, whichever number is greater.
-# When we dispatch tasks later, we'll allocate whole-node resources like RAM
-# based on these numbers.  Using fewer slots makes more resources available
-# to each individual task, which should normally be a better strategy when
-# there are fewer of them running with less parallelism.
-# Note that this calculation is not redone if the initial tasks at
-# this level queue more tasks at the same level.  This may harm
-# overall task throughput for that level.
-my @freeslot;
-if ($initial_tasks_this_level < @node) {
-  @freeslot = (0..$#node);
-} elsif ($initial_tasks_this_level < @slot) {
-  @freeslot = (0..$initial_tasks_this_level - 1);
-} else {
-  @freeslot = (0..$#slot);
-my $round_num_freeslots = scalar(@freeslot);
-print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
-my %round_max_slots = ();
-for (my $ii = $#freeslot; $ii >= 0; $ii--) {
-  my $this_slot = $slot[$freeslot[$ii]];
-  my $node_name = $this_slot->{node}->{name};
-  $round_max_slots{$node_name} ||= $this_slot->{cpu};
-  last if (scalar(keys(%round_max_slots)) >= @node);
-Log(undef, "start level $level with $round_num_freeslots slots");
-my @holdslot;
-my %reader;
-my $progress_is_dirty = 1;
-my $progress_stats_updated = 0;
-for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
-  # Don't create new tasks if we already know the job's final result.
-  last if defined($main::success);
-  my $id = $jobstep_todo[$todo_ptr];
-  my $Jobstep = $jobstep[$id];
-  if ($Jobstep->{level} != $level)
-  {
-    next;
-  }
-  pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
-  set_nonblocking($reader{$id});
-  my $childslot = $freeslot[0];
-  my $childnode = $slot[$childslot]->{node};
-  my $childslotname = join (".",
-                           $slot[$childslot]->{node}->{name},
-                           $slot[$childslot]->{cpu});
-  my $childpid = fork();
-  if ($childpid == 0)
-  {
-    $SIG{'INT'} = 'DEFAULT';
-    $SIG{'QUIT'} = 'DEFAULT';
-    $SIG{'TERM'} = 'DEFAULT';
-    foreach (values (%reader))
-    {
-      close($_);
-    }
-    fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
-    open(STDOUT,">&writer") or croak ($!);
-    open(STDERR,">&writer") or croak ($!);
-    undef $dbh;
-    undef $sth;
-    delete $ENV{"GNUPGHOME"};
-    $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
-    $ENV{"TASK_QSEQUENCE"} = $id;
-    $ENV{"TASK_SEQUENCE"} = $level;
-    $ENV{"JOB_SCRIPT"} = $Job->{script};
-    while (my ($param, $value) = each %{$Job->{script_parameters}}) {
-      $param =~ tr/a-z/A-Z/;
-      $ENV{"JOB_PARAMETER_$param"} = $value;
-    }
-    $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
-    $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
-    $ENV{"HOME"} = $ENV{"TASK_WORK"};
-    $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
-    $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
-    $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
-    my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
-    $ENV{"GZIP"} = "-n";
-    my @srunargs = (
-      "srun",
-      "--nodelist=".$childnode->{name},
-      qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
-      "--job-name=$job_id.$id.$$",
-       );
-    my $stdbuf = " stdbuf --output=0 --error=0 ";
-    my $arv_file_cache = "";
-    if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
-      $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
-    }
-    my $command =
-       "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
-        ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
-       ."&& cd \Q$ENV{CRUNCH_TMP}\E "
-        # These environment variables get used explicitly later in
-        # $command.  No tool is expected to read these values directly.
-        .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
-        .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
-        ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
-        ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
-        .q{&& declare -a VOLUMES=() }
-        .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
-        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
-        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
-    $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
-    $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
-    $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
-    if ($docker_hash)
-    {
-      my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
-      my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
-      $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
-      $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
-      # We only set memory limits if Docker lets us limit both memory and swap.
-      # Memory limits alone have been supported longer, but subprocesses tend
-      # to get SIGKILL if they exceed that without any swap limit set.
-      # See #5642 for additional background.
-      if ($docker_limitmem) {
-        $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
-      }
-      # The source tree and $destdir directory (which we have
-      # installed on the worker host) are available in the container,
-      # under the same path.
-      $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
-      $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
-      # Currently, we make the "by_pdh" directory in arv-mount's mount
-      # point appear at /keep inside the container (instead of using
-      # the same path as the host like we do with CRUNCH_SRC and
-      # CRUNCH_INSTALL). However, crunch scripts and utilities must
-      # not rely on this. They must use $TASK_KEEPMOUNT.
-      $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
-      $ENV{TASK_KEEPMOUNT} = "/keep";
-      # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
-      $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
-      $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
-      # TASK_WORK is almost exactly like a docker data volume: it
-      # starts out empty, is writable, and persists until no
-      # containers use it any more. We don't use --volumes-from to
-      # share it with other containers: it is only accessible to this
-      # task, and it goes away when this task stops.
-      #
-      # However, a docker data volume is writable only by root unless
-      # the mount point already happens to exist in the container with
-      # different permissions. Therefore, we [1] assume /tmp already
-      # exists in the image and is writable by the crunch user; [2]
-      # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
-      # writable if they are created by docker while setting up the
-      # other --volumes); and [3] create $TASK_WORK inside the
-      # container using $build_script.
-      $command .= "--volume=/tmp ";
-      $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
-      $ENV{"HOME"} = $ENV{"TASK_WORK"};
-      $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
-      # TODO: Share a single JOB_WORK volume across all task
-      # containers on a given worker node, and delete it when the job
-      # ends (and, in case that doesn't work, when the next job
-      # starts).
-      #
-      # For now, use the same approach as TASK_WORK above.
-      $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
-      # Bind mount the crunchrunner binary and host TLS certificates file into
-      # the container.
-      $command .= '"${VOLUMES[@]}" ';
-      while (my ($env_key, $env_val) = each %ENV)
-      {
-        if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
-          $command .= "--env=\Q$env_key=$env_val\E ";
-        }
-      }
-      $command .= "--env=\QHOME=$ENV{HOME}\E ";
-      $command .= "\Q$docker_hash\E ";
-      if ($Job->{arvados_sdk_version}) {
-        $command .= $stdbuf;
-        $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
-      } else {
-        $command .= "/bin/sh -c \'python -c " .
-            '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
-            ">&2 2>/dev/null; " .
-            "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
-            "if which stdbuf >/dev/null ; then " .
-            "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
-            " else " .
-            "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
-            " fi\'";
-      }
-    } else {
-      # Non-docker run
-      $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
-      $command .= $stdbuf;
-      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
-    }
-    my @execargs = ('bash', '-c', $command);
-    srun (\@srunargs, \@execargs, undef, $build_script);
-    # exec() failed, we assume nothing happened.
-    die "srun() failed on build script\n";
-  }
-  close("writer");
-  if (!defined $childpid)
-  {
-    close $reader{$id};
-    delete $reader{$id};
-    next;
-  }
-  shift @freeslot;
-  $proc{$childpid} = {
-    jobstepidx => $id,
-    time => time,
-    slot => $childslot,
-    jobstepname => "$job_id.$id.$childpid",
-  };
-  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
-  $slot[$childslot]->{pid} = $childpid;
-  Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
-  Log ($id, "child $childpid started on $childslotname");
-  $Jobstep->{starttime} = time;
-  $Jobstep->{node} = $childnode->{name};
-  $Jobstep->{slotindex} = $childslot;
-  delete $Jobstep->{stderr};
-  delete $Jobstep->{finishtime};
-  delete $Jobstep->{tempfail};
-  $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
-  retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
-  splice @jobstep_todo, $todo_ptr, 1;
-  --$todo_ptr;
-  $progress_is_dirty = 1;
-  while (!@freeslot
-        ||
-        ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
-  {
-    last THISROUND if $main::please_freeze;
-    if ($main::please_info)
-    {
-      $main::please_info = 0;
-      freeze();
-      create_output_collection();
-      save_meta(1);
-      update_progress_stats();
-    }
-    my $gotsome
-       = readfrompipes ()
-       + reapchildren ();
-    if (!$gotsome || ($latest_refresh + 2 < scalar time))
-    {
-      check_refresh_wanted();
-      check_squeue();
-      update_progress_stats();
-    }
-    elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
-    {
-      update_progress_stats();
-    }
-    if (!$gotsome) {
-      select (undef, undef, undef, 0.1);
-    }
-    $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
-                                        $_->{node}->{hold_count} < 4 } @slot);
-    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
-       ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
-    {
-      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
-         .($thisround_failed+$thisround_succeeded)
-         .") -- giving up on this round";
-      Log (undef, $message);
-      last THISROUND;
-    }
-    # move slots from freeslot to holdslot (or back to freeslot) if necessary
-    for (my $i=$#freeslot; $i>=0; $i--) {
-      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
-       push @holdslot, (splice @freeslot, $i, 1);
-      }
-    }
-    for (my $i=$#holdslot; $i>=0; $i--) {
-      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
-       push @freeslot, (splice @holdslot, $i, 1);
-      }
-    }
-    # give up if no nodes are succeeding
-    if ($working_slot_count < 1) {
-      Log(undef, "Every node has failed -- giving up");
-      last THISROUND;
-    }
-  }
-push @freeslot, splice @holdslot;
-map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
-Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
-while (%proc)
-  if ($main::please_continue) {
-    $main::please_continue = 0;
-    goto THISROUND;
-  }
-  $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
-  readfrompipes ();
-  if (!reapchildren())
-  {
-    check_refresh_wanted();
-    check_squeue();
-    update_progress_stats();
-    select (undef, undef, undef, 0.1);
-    killem (keys %proc) if $main::please_freeze;
-  }
-if (!defined $main::success)
-  if (!@jobstep_todo) {
-    $main::success = 1;
-  } elsif ($working_slot_count < 1) {
-    save_output_collection();
-    save_meta();
-    exit_retry_unlocked();
-  } elsif ($thisround_succeeded == 0 &&
-           ($thisround_failed == 0 || $thisround_failed > 4)) {
-    my $message = "stop because $thisround_failed tasks failed and none succeeded";
-    Log (undef, $message);
-    $main::success = 0;
-  }
-goto ONELEVEL if !defined $main::success;
-my $collated_output = save_output_collection();
-Log (undef, "finish");
-my $final_log = save_meta();
-my $final_state;
-if ($collated_output && $final_log && $main::success) {
-  $final_state = 'Complete';
-} else {
-  $final_state = 'Failed';
-$Job->update_attributes('state' => $final_state);
-exit (($final_state eq 'Complete') ? 0 : 1);
-sub update_progress_stats
-  $progress_stats_updated = time;
-  return if !$progress_is_dirty;
-  my ($todo, $done, $running) = (scalar @jobstep_todo,
-                                 scalar @jobstep_done,
-                                 scalar keys(%proc));
-  $Job->{'tasks_summary'} ||= {};
-  $Job->{'tasks_summary'}->{'todo'} = $todo;
-  $Job->{'tasks_summary'}->{'done'} = $done;
-  $Job->{'tasks_summary'}->{'running'} = $running;
-  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
-  Log (undef, "status: $done done, $running running, $todo todo");
-  $progress_is_dirty = 0;
-sub reapchildren
-  my $children_reaped = 0;
-  my @successful_task_uuids = ();
-  while((my $pid = waitpid (-1, WNOHANG)) > 0)
-  {
-    my $childstatus = $?;
-    my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
-                    . "."
-                    . $slot[$proc{$pid}->{slot}]->{cpu});
-    my $jobstepidx = $proc{$pid}->{jobstepidx};
-    readfrompipes_after_exit ($jobstepidx);
-    $children_reaped++;
-    my $elapsed = time - $proc{$pid}->{time};
-    my $Jobstep = $jobstep[$jobstepidx];
-    my $exitvalue = $childstatus >> 8;
-    my $exitinfo = "exit ".exit_status_s($childstatus);
-    $Jobstep->{'arvados_task'}->reload;
-    my $task_success = $Jobstep->{'arvados_task'}->{success};
-    Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
-    if (!defined $task_success) {
-      # task did not indicate one way or the other --> fail
-      Log($jobstepidx, sprintf(
-            "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
-            exit_status_s($childstatus)));
-      $Jobstep->{'arvados_task'}->{success} = 0;
-      retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
-      $task_success = 0;
-    }
-    if (!$task_success)
-    {
-      my $temporary_fail;
-      $temporary_fail ||= $Jobstep->{tempfail};
-      $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
-      ++$thisround_failed;
-      ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-      # Check for signs of a failed or misconfigured node
-      if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
-          2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
-        # Don't count this against jobstep failure thresholds if this
-        # node is already suspected faulty and srun exited quickly
-        if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
-            $elapsed < 5) {
-          Log ($jobstepidx, "blaming failure on suspect node " .
-               $slot[$proc{$pid}->{slot}]->{node}->{name});
-          $temporary_fail ||= 1;
-        }
-        ban_node_by_slot($proc{$pid}->{slot});
-      }
-      Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
-                                ++$Jobstep->{'failures'},
-                                $temporary_fail ? 'temporary' : 'permanent',
-                                $elapsed));
-      if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
-        # Give up on this task, and the whole job
-        $main::success = 0;
-      }
-      # Put this task back on the todo queue
-      push @jobstep_todo, $jobstepidx;
-      $Job->{'tasks_summary'}->{'failed'}++;
-    }
-    else # task_success
-    {
-      push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
-      ++$thisround_succeeded;
-      $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
-      $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
-      $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
-      push @jobstep_done, $jobstepidx;
-      Log ($jobstepidx, "success in $elapsed seconds");
-    }
-    $Jobstep->{exitcode} = $childstatus;
-    $Jobstep->{finishtime} = time;
-    $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-    retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
-    Log ($jobstepidx, sprintf("task output (%d bytes): %s",
-                              length($Jobstep->{'arvados_task'}->{output}),
-                              $Jobstep->{'arvados_task'}->{output}));
-    close $reader{$jobstepidx};
-    delete $reader{$jobstepidx};
-    delete $slot[$proc{$pid}->{slot}]->{pid};
-    push @freeslot, $proc{$pid}->{slot};
-    delete $proc{$pid};
-    $progress_is_dirty = 1;
-  }
-  if (scalar(@successful_task_uuids) > 0)
-  {
-    Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
-    # Load new tasks
-    my $newtask_list = [];
-    my $newtask_results;
-    do {
-      $newtask_results = api_call(
-        "job_tasks/list",
-        'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
-        'order' => 'qsequence',
-        'offset' => scalar(@$newtask_list),
-          );
-      push(@$newtask_list, @{$newtask_results->{items}});
-    } while (@{$newtask_results->{items}});
-    Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
-    foreach my $arvados_task (@$newtask_list) {
-      my $jobstep = {
-        'level' => $arvados_task->{'sequence'},
-        'failures' => 0,
-        'arvados_task' => $arvados_task
-      };
-      push @jobstep, $jobstep;
-      push @jobstep_todo, $#jobstep;
-    }
-  }
-  return $children_reaped;
-sub check_refresh_wanted
-  my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
-  if (@stat &&
-      $stat[9] > $latest_refresh &&
-      # ...and we have actually locked the job record...
-      $job_id eq $Job->{'uuid'}) {
-    $latest_refresh = scalar time;
-    my $Job2 = api_call("jobs/get", uuid => $jobspec);
-    for my $attr ('cancelled_at',
-                  'cancelled_by_user_uuid',
-                  'cancelled_by_client_uuid',
-                  'state') {
-      $Job->{$attr} = $Job2->{$attr};
-    }
-    if ($Job->{'state'} ne "Running") {
-      if ($Job->{'state'} eq "Cancelled") {
-        Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
-      } else {
-        Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
-      }
-      $main::success = 0;
-      $main::please_freeze = 1;
-    }
-  }
-sub check_squeue
-  my $last_squeue_check = $squeue_checked;
-  # Do not call `squeue` or check the kill list more than once every
-  # 15 seconds.
-  return if $last_squeue_check > time - 15;
-  $squeue_checked = time;
-  # Look for children from which we haven't received stderr data since
-  # the last squeue check. If no such children exist, all procs are
-  # alive and there's no need to even look at squeue.
-  #
-  # As long as the crunchstat poll interval (10s) is shorter than the
-  # squeue check interval (15s) this should make the squeue check an
-  # infrequent event.
-  my $silent_procs = 0;
-  for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
-  {
-    if (!exists($js->{stderr_at}))
-    {
-      $js->{stderr_at} = 0;
-    }
-    if ($js->{stderr_at} < $last_squeue_check)
-    {
-      $silent_procs++;
-    }
-  }
-  return if $silent_procs == 0;
-  # use killem() on procs whose killtime is reached
-  while (my ($pid, $procinfo) = each %proc)
-  {
-    my $js = $jobstep[$procinfo->{jobstepidx}];
-    if (exists $procinfo->{killtime}
-        && $procinfo->{killtime} <= time
-        && $js->{stderr_at} < $last_squeue_check)
-    {
-      my $sincewhen = "";
-      if ($js->{stderr_at}) {
-        $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
-      }
-      Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
-      killem ($pid);
-    }
-  }
-  if (!$have_slurm)
-  {
-    # here is an opportunity to check for mysterious problems with local procs
-    return;
-  }
-  # Get a list of steps still running.  Note: squeue(1) says --steps
-  # selects a format (which we override anyway) and allows us to
-  # specify which steps we're interested in (which we don't).
-  # Importantly, it also changes the meaning of %j from "job name" to
-  # "step name" and (although this isn't mentioned explicitly in the
-  # docs) switches from "one line per job" mode to "one line per step"
-  # mode. Without it, we'd just get a list of one job, instead of a
-  # list of N steps.
-  my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
-  if ($? != 0)
-  {
-    Log(undef, "warning: squeue exit status $? ($!)");
-    return;
-  }
-  chop @squeue;
-  # which of my jobsteps are running, according to squeue?
-  my %ok;
-  for my $jobstepname (@squeue)
-  {
-    $ok{$jobstepname} = 1;
-  }
-  # Check for child procs >60s old and not mentioned by squeue.
-  while (my ($pid, $procinfo) = each %proc)
-  {
-    if ($procinfo->{time} < time - 60
-        && $procinfo->{jobstepname}
-        && !exists $ok{$procinfo->{jobstepname}}
-        && !exists $procinfo->{killtime})
-    {
-      # According to slurm, this task has ended (successfully or not)
-      # -- but our srun child hasn't exited. First we must wait (30
-      # seconds) in case this is just a race between communication
-      # channels. Then, if our srun child process still hasn't
-      # terminated, we'll conclude some slurm communication
-      # error/delay has caused the task to die without notifying srun,
-      # and we'll kill srun ourselves.
-      $procinfo->{killtime} = time + 30;
-      Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
-    }
-  }
-sub check_sinfo
-  # If a node fails in a multi-node "srun" call during job setup, the call
-  # may hang instead of exiting with a nonzero code.  This function checks
-  # "sinfo" for the health of the nodes that were allocated and ensures that
-  # they are all still in the "alloc" state.  If a node that is allocated to
-  # this job is not in "alloc" state, then set please_freeze.
-  #
-  # This is only called from srun_sync() for node configuration.  If a
-  # node fails doing actual work, there are other recovery mechanisms.
-  # Do not call `sinfo` more than once every 15 seconds.
-  return if $sinfo_checked > time - 15;
-  $sinfo_checked = time;
-  # The output format "%t" means output node states.
-  my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
-  if ($? != 0)
-  {
-    Log(undef, "warning: sinfo exit status $? ($!)");
-    return;
-  }
-  chop @sinfo;
-  foreach (@sinfo)
-  {
-    if ($_ != "alloc" && $_ != "alloc*") {
-      $main::please_freeze = 1;
-    }
-  }
-sub release_allocation
-  if ($have_slurm)
-  {
-    Log (undef, "release job allocation");
-    system "scancel $ENV{SLURM_JOB_ID}";
-  }
-sub readfrompipes
-  my $gotsome = 0;
-  my %fd_job;
-  my $sel = IO::Select->new();
-  foreach my $jobstepidx (keys %reader)
-  {
-    my $fd = $reader{$jobstepidx};
-    $sel->add($fd);
-    $fd_job{$fd} = $jobstepidx;
-    if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
-      $sel->add($stdout_fd);
-      $fd_job{$stdout_fd} = $jobstepidx;
-    }
-  }
-  # select on all reader fds with 0.1s timeout
-  my @ready_fds = $sel->can_read(0.1);
-  foreach my $fd (@ready_fds)
-  {
-    my $buf;
-    if (0 < sysread ($fd, $buf, 65536))
-    {
-      $gotsome = 1;
-      print STDERR $buf if $ENV{CRUNCH_DEBUG};
-      my $jobstepidx = $fd_job{$fd};
-      if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
-        $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
-        next;
-      }
-      $jobstep[$jobstepidx]->{stderr_at} = time;
-      $jobstep[$jobstepidx]->{stderr} .= $buf;
-      # Consume everything up to the last \n
-      preprocess_stderr ($jobstepidx);
-      if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
-      {
-        # If we get a lot of stderr without a newline, chop off the
-        # front to avoid letting our buffer grow indefinitely.
-        substr ($jobstep[$jobstepidx]->{stderr},
-                0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
-      }
-    }
-  }
-  return $gotsome;
-# Consume all full lines of stderr for a jobstep. Everything after the
-# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
-# returning.
-sub preprocess_stderr
-  my $jobstepidx = shift;
-  # slotindex is only defined for children running Arvados job tasks.
-  # Be prepared to handle the undef case (for setup srun calls, etc.).
-  my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
-  while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
-    my $line = $1;
-    substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
-    Log ($jobstepidx, "stderr $line");
-    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
-      # If the allocation is revoked, we can't possibly continue, so mark all
-      # nodes as failed.  This will cause the overall exit code to be
-      # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
-      # this job.
-      $main::please_freeze = 1;
-      foreach my $st (@slot) {
-        $st->{node}->{fail_count}++;
-      }
-    }
-    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
-      $jobstep[$jobstepidx]->{tempfail} = 1;
-      if (defined($job_slot_index)) {
-        $slot[$job_slot_index]->{node}->{fail_count}++;
-        ban_node_by_slot($job_slot_index);
-      }
-    }
-    elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
-      $jobstep[$jobstepidx]->{tempfail} = 1;
-      ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
-    }
-    elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
-      $jobstep[$jobstepidx]->{tempfail} = 1;
-    }
-  }
-# Read whatever is still available on its stderr+stdout pipes after
-# the given child process has exited.
-sub readfrompipes_after_exit
-  my $jobstepidx = shift;
-  # The fact that the child has exited allows some convenient
-  # simplifications: (1) all data must have already been written, so
-  # there's no need to wait for more once sysread returns 0; (2) the
-  # total amount of data available is bounded by the pipe buffer size,
-  # so it's safe to read everything into one string.
-  my $buf;
-  while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) {
-    $jobstep[$jobstepidx]->{stderr_at} = time;
-    $jobstep[$jobstepidx]->{stderr} .= $buf;
-  }
-  if ($jobstep[$jobstepidx]->{stdout_r}) {
-    while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) {
-      $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
-    }
-  }
-  preprocess_stderr ($jobstepidx);
-  map {
-    Log ($jobstepidx, "stderr $_");
-  } split ("\n", $jobstep[$jobstepidx]->{stderr});
-  $jobstep[$jobstepidx]->{stderr} = '';
-sub fetch_block
-  my $hash = shift;
-  my $keep;
-  if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
-    Log(undef, "fetch_block run error from arv-get $hash: $!");
-    return undef;
-  }
-  my $output_block = "";
-  while (1) {
-    my $buf;
-    my $bytes = sysread($keep, $buf, 1024 * 1024);
-    if (!defined $bytes) {
-      Log(undef, "fetch_block read error from arv-get: $!");
-      $output_block = undef;
-      last;
-    } elsif ($bytes == 0) {
-      # sysread returns 0 at the end of the pipe.
-      last;
-    } else {
-      # some bytes were read into buf.
-      $output_block .= $buf;
-    }
-  }
-  close $keep;
-  if ($?) {
-    Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
-    $output_block = undef;
-  }
-  return $output_block;
-# Create a collection by concatenating the output of all tasks (each
-# task's output is either a manifest fragment, a locator for a
-# manifest fragment stored in Keep, or nothing at all). Return the
-# portable_data_hash of the new collection.
-sub create_output_collection
-  Log (undef, "collate");
-  my ($child_out, $child_in);
-  # This depends on the python-arvados-python-client package, which needs to be installed
-  # on the machine running crunch-dispatch (typically, the API server).
-  my $pid = open2($child_out, $child_in, '/usr/share/python2.7/dist/python-arvados-python-client/bin/python', '-c', q{
-import arvados
-import sys
-print (arvados.api("v1").collections().
-       create(body={"manifest_text": sys.stdin.read(),
-                    "owner_uuid": sys.argv[2]}).
-       execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
-}, retry_count(), $Job->{owner_uuid});
-  my $task_idx = -1;
-  my $manifest_size = 0;
-  for (@jobstep)
-  {
-    ++$task_idx;
-    my $output = $_->{'arvados_task'}->{output};
-    next if (!defined($output));
-    my $next_write;
-    if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
-      $next_write = fetch_block($output);
-    } else {
-      $next_write = $output;
-    }
-    if (defined($next_write)) {
-      if (!defined(syswrite($child_in, $next_write))) {
-        # There's been an error writing.  Stop the loop.
-        # We'll log details about the exit code later.
-        last;
-      } else {
-        $manifest_size += length($next_write);
-      }
-    } else {
-      my $uuid = $_->{'arvados_task'}->{'uuid'};
-      Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
-      $main::success = 0;
-    }
-  }
-  close($child_in);
-  Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
-  my $joboutput;
-  my $s = IO::Select->new($child_out);
-  if ($s->can_read(120)) {
-    sysread($child_out, $joboutput, 1024 * 1024);
-    waitpid($pid, 0);
-    if ($?) {
-      Log(undef, "output collection creation exited " . exit_status_s($?));
-      $joboutput = undef;
-    } else {
-      chomp($joboutput);
-    }
-  } else {
-    Log (undef, "timed out while creating output collection");
-    foreach my $signal (2, 2, 2, 15, 15, 9) {
-      kill($signal, $pid);
-      last if waitpid($pid, WNOHANG) == -1;
-      sleep(1);
-    }
-  }
-  close($child_out);
-  return $joboutput;
-# Calls create_output_collection, logs the result, and returns it.
-# If that was successful, save that as the output in the job record.
-sub save_output_collection {
-  my $collated_output = create_output_collection();
-  if (!$collated_output) {
-    Log(undef, "Failed to write output collection");
-  }
-  else {
-    Log(undef, "job output $collated_output");
-    $Job->update_attributes('output' => $collated_output);
-  }
-  return $collated_output;
-sub killem
-  foreach (@_)
-  {
-    my $sig = 2;               # SIGINT first
-    if (exists $proc{$_}->{"sent_$sig"} &&
-       time - $proc{$_}->{"sent_$sig"} > 4)
-    {
-      $sig = 15;               # SIGTERM if SIGINT doesn't work
-    }
-    if (exists $proc{$_}->{"sent_$sig"} &&
-       time - $proc{$_}->{"sent_$sig"} > 4)
-    {
-      $sig = 9;                        # SIGKILL if SIGTERM doesn't work
-    }
-    if (!exists $proc{$_}->{"sent_$sig"})
-    {
-      Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
-      kill $sig, $_;
-      select (undef, undef, undef, 0.1);
-      if ($sig == 2)
-      {
-       kill $sig, $_;     # srun wants two SIGINT to really interrupt
-      }
-      $proc{$_}->{"sent_$sig"} = time;
-      $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
-    }
-  }
-sub fhbits
-  my($bits);
-  for (@_) {
-    vec($bits,fileno($_),1) = 1;
-  }
-  $bits;
-# Send log output to Keep via arv-put.
-# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
-# $log_pipe_out_buf is a string containing all output read from arv-put so far.
-# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
-# $log_pipe_pid is the pid of the arv-put subprocess.
-# The only functions that should access these variables directly are:
-# log_writer_start($logfilename)
-#     Starts an arv-put pipe, reading data on stdin and writing it to
-#     a $logfilename file in an output collection.
-# log_writer_read_output([$timeout])
-#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
-#     Passes $timeout to the select() call, with a default of 0.01.
-#     Returns the result of the last read() call on $log_pipe_out, or
-#     -1 if read() wasn't called because select() timed out.
-#     Only other log_writer_* functions should need to call this.
-# log_writer_send($txt)
-#     Writes $txt to the output log collection.
-# log_writer_finish()
-#     Closes the arv-put pipe and returns the output that it produces.
-# log_writer_is_active()
-#     Returns a true value if there is currently a live arv-put
-#     process, false otherwise.
-my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
-    $log_pipe_pid);
-sub log_writer_start($)
-  my $logfilename = shift;
-  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
-                        'arv-put',
-                        '--stream',
-                        '--retries', '6',
-                        '--filename', $logfilename,
-                        '-');
-  $log_pipe_out_buf = "";
-  $log_pipe_out_select = IO::Select->new($log_pipe_out);
-sub log_writer_read_output {
-  my $timeout = shift || 0.01;
-  my $read = -1;
-  while ($read && $log_pipe_out_select->can_read($timeout)) {
-    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
-                 length($log_pipe_out_buf));
-  }
-  if (!defined($read)) {
-    Log(undef, "error reading log manifest from arv-put: $!");
-  }
-  return $read;
-sub log_writer_send($)
-  my $txt = shift;
-  print $log_pipe_in $txt;
-  log_writer_read_output();
-sub log_writer_finish()
-  return unless $log_pipe_pid;
-  close($log_pipe_in);
-  my $logger_failed = 0;
-  my $read_result = log_writer_read_output(600);
-  if ($read_result == -1) {
-    $logger_failed = -1;
-    Log (undef, "timed out reading from 'arv-put'");
-  } elsif ($read_result != 0) {
-    $logger_failed = -2;
-    Log(undef, "failed to read arv-put log manifest to EOF");
-  }
-  waitpid($log_pipe_pid, 0);
-  if ($?) {
-    $logger_failed ||= $?;
-    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
-  }
-  close($log_pipe_out);
-  my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
-  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
-      $log_pipe_out_select = undef;
-  return $arv_put_output;
-sub log_writer_is_active() {
-  return $log_pipe_pid;
-sub Log                                # ($jobstepidx, $logmessage)
-  my ($jobstepidx, $logmessage) = @_;
-  if ($logmessage =~ /\n/) {
-    for my $line (split (/\n/, $_[1])) {
-      Log ($jobstepidx, $line);
-    }
-    return;
-  }
-  my $fh = select STDERR; $|=1; select $fh;
-  my $task_qseq = '';
-  if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
-    $task_qseq = $jobstepidx;
-  }
-  my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
-  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
-  $message .= "\n";
-  my $datetime;
-  if (log_writer_is_active() || -t STDERR) {
-    my @gmtime = gmtime;
-    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
-                        $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
-  }
-  print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
-  if (log_writer_is_active()) {
-    log_writer_send($datetime . " " . $message);
-  }
-sub croak
-  my ($package, $file, $line) = caller;
-  my $message = "@_ at $file line $line\n";
-  Log (undef, $message);
-  release_allocation();
-  freeze() if @jobstep_todo;
-  create_output_collection() if @jobstep_todo;
-  cleanup();
-  save_meta();
-  die;
-sub cleanup
-  return unless $Job;
-  if ($Job->{'state'} eq 'Cancelled') {
-    $Job->update_attributes('finished_at' => scalar gmtime);
-  } else {
-    $Job->update_attributes('state' => 'Failed');
-  }
-sub save_meta
-  my $justcheckpoint = shift; # false if this will be the last meta saved
-  return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
-  return unless log_writer_is_active();
-  my $log_manifest = log_writer_finish();
-  return unless defined($log_manifest);
-  if ($Job->{log}) {
-    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
-    $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
-  }
-  my $log_coll = api_call(
-    "collections/create", ensure_unique_name => 1, collection => {
-      manifest_text => $log_manifest,
-      owner_uuid => $Job->{owner_uuid},
-      name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
-    });
-  Log(undef, "log collection is " . $log_coll->{portable_data_hash});
-  $Job->update_attributes('log' => $log_coll->{portable_data_hash});
-  return $log_coll->{portable_data_hash};
-sub freeze_if_want_freeze
-  if ($main::please_freeze)
-  {
-    release_allocation();
-    if (@_)
-    {
-      # kill some srun procs before freeze+stop
-      map { $proc{$_} = {} } @_;
-      while (%proc)
-      {
-       killem (keys %proc);
-       select (undef, undef, undef, 0.1);
-       my $died;
-       while (($died = waitpid (-1, WNOHANG)) > 0)
-       {
-         delete $proc{$died};
-       }
-      }
-    }
-    freeze();
-    create_output_collection();
-    cleanup();
-    save_meta();
-    exit 1;
-  }
-sub freeze
-  Log (undef, "Freeze not implemented");
-  return;
-sub thaw
-  croak ("Thaw not implemented");
-sub freezequote
-  my $s = shift;
-  $s =~ s/\\/\\\\/g;
-  $s =~ s/\n/\\n/g;
-  return $s;
-sub freezeunquote
-  my $s = shift;
-  $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
-  return $s;
-sub srun_sync
-  my $srunargs = shift;
-  my $execargs = shift;
-  my $opts = shift || {};
-  my $stdin = shift;
-  my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
-  Log (undef, "$label: start");
-  my ($stderr_r, $stderr_w);
-  pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
-  my ($stdout_r, $stdout_w);
-  pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
-  my $started_srun = scalar time;
-  my $srunpid = fork();
-  if ($srunpid == 0)
-  {
-    close($stderr_r);
-    close($stdout_r);
-    fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
-    fcntl($stdout_w, F_SETFL, 0) or croak($!);
-    open(STDERR, ">&", $stderr_w) or croak ($!);
-    open(STDOUT, ">&", $stdout_w) or croak ($!);
-    srun ($srunargs, $execargs, $opts, $stdin);
-    exit (1);
-  }
-  close($stderr_w);
-  close($stdout_w);
-  set_nonblocking($stderr_r);
-  set_nonblocking($stdout_r);
-  # Add entries to @jobstep and %proc so check_squeue() and
-  # freeze_if_want_freeze() can treat it like a job task process.
-  push @jobstep, {
-    stderr => '',
-    stderr_at => 0,
-    stderr_captured => '',
-    stdout_r => $stdout_r,
-    stdout_captured => '',
-  };
-  my $jobstepidx = $#jobstep;
-  $proc{$srunpid} = {
-    jobstepidx => $jobstepidx,
-  };
-  $reader{$jobstepidx} = $stderr_r;
-  while ($srunpid != waitpid ($srunpid, WNOHANG)) {
-    my $busy = readfrompipes();
-    if (!$busy || ($latest_refresh + 2 < scalar time)) {
-      check_refresh_wanted();
-      check_squeue();
-      check_sinfo();
-    }
-    if (!$busy) {
-      select(undef, undef, undef, 0.1);
-    }
-    if (($started_srun + $srun_sync_timeout) < scalar time) {
-      # Exceeded general timeout for "srun_sync" operations, likely
-      # means something got stuck on the remote node.
-      Log(undef, "srun_sync exceeded timeout, will fail.");
-      $main::please_freeze = 1;
-    }
-    killem(keys %proc) if $main::please_freeze;
-  }
-  my $exited = $?;
-  readfrompipes_after_exit ($jobstepidx);
-  Log (undef, "$label: exit ".exit_status_s($exited));
-  close($stdout_r);
-  close($stderr_r);
-  delete $proc{$srunpid};
-  delete $reader{$jobstepidx};
-  my $j = pop @jobstep;
-  # If the srun showed signs of tempfail, ensure the caller treats that as a
-  # failure case.
-  if ($main::please_freeze || $j->{tempfail}) {
-    $exited ||= 255;
-  }
-  return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
-sub srun
-  my $srunargs = shift;
-  my $execargs = shift;
-  my $opts = shift || {};
-  my $stdin = shift;
-  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
-  $Data::Dumper::Terse = 1;
-  $Data::Dumper::Indent = 0;
-  my $show_cmd = Dumper($args);
-  $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
-  $show_cmd =~ s/\n/ /g;
-  if ($opts->{fork}) {
-    Log(undef, "starting: $show_cmd");
-  } else {
-    # This is a child process: parent is in charge of reading our
-    # stderr and copying it to Log() if needed.
-    warn "starting: $show_cmd\n";
-  }
-  if (defined $stdin) {
-    my $child = open STDIN, "-|";
-    defined $child or die "no fork: $!";
-    if ($child == 0) {
-      print $stdin or die $!;
-      close STDOUT or die $!;
-      exit 0;
-    }
-  }
-  return system (@$args) if $opts->{fork};
-  exec @$args;
-  warn "ENV size is ".length(join(" ",%ENV));
-  die "exec failed: $!: @$args";
-sub ban_node_by_slot {
-  # Don't start any new jobsteps on this node for 60 seconds
-  my $slotid = shift;
-  $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
-  $slot[$slotid]->{node}->{hold_count}++;
-  Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
-sub must_lock_now
-  my ($lockfile, $error_message) = @_;
-  open L, ">", $lockfile or croak("$lockfile: $!");
-  if (!flock L, LOCK_EX|LOCK_NB) {
-    croak("Can't lock $lockfile: $error_message\n");
-  }
-sub find_docker_image {
-  # Given a Keep locator, check to see if it contains a Docker image.
-  # If so, return its stream name and Docker hash.
-  # If not, return undef for both values.
-  my $locator = shift;
-  my ($streamname, $filename);
-  my $image = api_call("collections/get", uuid => $locator);
-  if ($image) {
-    foreach my $line (split(/\n/, $image->{manifest_text})) {
-      my @tokens = split(/\s+/, $line);
-      next if (!@tokens);
-      $streamname = shift(@tokens);
-      foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
-        if (defined($filename)) {
-          return (undef, undef);  # More than one file in the Collection.
-        } else {
-          $filename = (split(/:/, $filedata, 3))[2];
-          $filename =~ s/\\([0-3][0-7][0-7])/chr(oct($1))/ge;
-        }
-      }
-    }
-  }
-  if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
-    return ($streamname, $1);
-  } else {
-    return (undef, undef);
-  }
-sub exit_retry_unlocked {
-  Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
-sub retry_count {
-  # Calculate the number of times an operation should be retried,
-  # assuming exponential backoff, and that we're willing to retry as
-  # long as tasks have been running.  Enforce a minimum of 3 retries.
-  my ($starttime, $endtime, $timediff, $retries);
-  if (@jobstep) {
-    $starttime = $jobstep[0]->{starttime};
-    $endtime = $jobstep[-1]->{finishtime};
-  }
-  if (!defined($starttime)) {
-    $timediff = 0;
-  } elsif (!defined($endtime)) {
-    $timediff = time - $starttime;
-  } else {
-    $timediff = ($endtime - $starttime) - (time - $endtime);
-  }
-  if ($timediff > 0) {
-    $retries = int(log($timediff) / log(2));
-  } else {
-    $retries = 1;  # Use the minimum.
-  }
-  return ($retries > 3) ? $retries : 3;
-sub retry_op {
-  # Pass in two function references.
-  # This method will be called with the remaining arguments.
-  # If it dies, retry it with exponential backoff until it succeeds,
-  # or until the current retry_count is exhausted.  After each failure
-  # that can be retried, the second function will be called with
-  # the current try count (0-based), next try time, and error message.
-  my $operation = shift;
-  my $op_text = shift;
-  my $retries = retry_count();
-  my $retry_callback = sub {
-    my ($try_count, $next_try_at, $errmsg) = @_;
-    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
-    $errmsg =~ s/\s/ /g;
-    $errmsg =~ s/\s+$//;
-    my $retry_msg;
-    if ($next_try_at < time) {
-      $retry_msg = "Retrying.";
-    } else {
-      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
-      $retry_msg = "Retrying at $next_try_fmt.";
-    }
-    Log(undef, "$op_text failed: $errmsg. $retry_msg");
-  };
-  foreach my $try_count (0..$retries) {
-    my $next_try = time + (2 ** $try_count);
-    my $result = eval { $operation->(@_); };
-    if (!$@) {
-      return $result;
-    } elsif ($try_count < $retries) {
-      $retry_callback->($try_count, $next_try, $@);
-      my $sleep_time = $next_try - time;
-      sleep($sleep_time) if ($sleep_time > 0);
-    }
-  }
-  # Ensure the error message ends in a newline, so Perl doesn't add
-  # retry_op's line number to it.
-  chomp($@);
-  die($@ . "\n");
-sub api_call {
-  # Pass in a /-separated API method name, and arguments for it.
-  # This function will call that method, retrying as needed until
-  # the current retry_count is exhausted, with a log on the first failure.
-  my $method_name = shift;
-  my $method = $arv;
-  foreach my $key (split(/\//, $method_name)) {
-    $method = $method->{$key};
-  }
-  return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
-sub exit_status_s {
-  # Given a $?, return a human-readable exit code string like "0" or
-  # "1" or "0 with signal 1" or "1 with signal 11".
-  my $exitcode = shift;
-  my $s = $exitcode >> 8;
-  if ($exitcode & 0x7f) {
-    $s .= " with signal " . ($exitcode & 0x7f);
-  }
-  if ($exitcode & 0x80) {
-    $s .= " with core dump";
-  }
-  return $s;
-sub handle_readall {
-  # Pass in a glob reference to a file handle.
-  # Read all its contents and return them as a string.
-  my $fh_glob_ref = shift;
-  local $/ = undef;
-  return <$fh_glob_ref>;
-sub tar_filename_n {
-  my $n = shift;
-  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
-sub add_git_archive {
-  # Pass in a git archive command as a string or list, a la system().
-  # This method will save its output to be included in the archive sent to the
-  # build script.
-  my $git_input;
-  $git_tar_count++;
-  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
-    croak("Failed to save git archive: $!");
-  }
-  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
-  close($git_input);
-  waitpid($git_pid, 0);
-  close(GIT_ARCHIVE);
-  if ($?) {
-    croak("Failed to save git archive: git exited " . exit_status_s($?));
-  }
-sub combined_git_archive {
-  # Combine all saved tar archives into a single archive, then return its
-  # contents in a string.  Return undef if no archives have been saved.
-  if ($git_tar_count < 1) {
-    return undef;
-  }
-  my $base_tar_name = tar_filename_n(1);
-  foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
-    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
-    if ($tar_exit != 0) {
-      croak("Error preparing build archive: tar -A exited " .
-            exit_status_s($tar_exit));
-    }
-  }
-  if (!open(GIT_TAR, "<", $base_tar_name)) {
-    croak("Could not open build archive: $!");
-  }
-  my $tar_contents = handle_readall(\*GIT_TAR);
-  close(GIT_TAR);
-  return $tar_contents;
-sub set_nonblocking {
-  my $fh = shift;
-  my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
-  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
-#!/usr/bin/env perl
-# This is crunch-job's internal dispatch script.  crunch-job running on the API
-# server invokes this script on individual compute nodes, or localhost if we're
-# running a job locally.  It gets called in two modes:
-# * No arguments: Installation mode.  Read a tar archive from the DATA
-#   file handle; it includes the Crunch script's source code, and
-#   maybe SDKs as well.  Those should be installed in the proper
-#   locations.  This runs outside of any Docker container, so don't try to
-#   introspect Crunch's runtime environment.
-# * With arguments: Crunch script run mode.  This script should set up the
-#   environment, then run the command specified in the arguments.  This runs
-#   inside any Docker container.
-use Fcntl ':flock';
-use File::Path qw( make_path remove_tree );
-use POSIX qw(getcwd);
-use constant TASK_TEMPFAIL => 111;
-# Map SDK subdirectories to the path environments they belong to.
-my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
-my $destdir = $ENV{"CRUNCH_SRC"};
-my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
-my $repo = $ENV{"CRUNCH_SRC_URL"};
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
-my $job_work = $ENV{"JOB_WORK"};
-my $task_work = $ENV{"TASK_WORK"};
-open(STDOUT_ORIG, ">&", STDOUT);
-open(STDERR_ORIG, ">&", STDERR);
-for my $dir ($destdir, $job_work, $task_work) {
-  if ($dir) {
-    make_path $dir;
-    -e $dir or die "Failed to create temporary directory ($dir): $!";
-  }
-if ($task_work) {
-  remove_tree($task_work, {keep_root => 1});
-### Crunch script run mode
-if (@ARGV) {
-  # We want to do routine logging during task 0 only.  This gives the user
-  # the information they need, but avoids repeating the information for every
-  # task.
-  my $Log;
-  if ($ENV{TASK_SEQUENCE} eq "0") {
-    $Log = sub {
-      my $msg = shift;
-      printf STDERR_ORIG "[Crunch] $msg\n", @_;
-    };
-  } else {
-    $Log = sub { };
-  }
-  my $python_src = "$install_dir/python";
-  my $venv_dir = "$job_work/.arvados.venv";
-  my $venv_built = -e "$venv_dir/bin/activate";
-  if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
-    shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
-                 "--python=python2.7", $venv_dir);
-    shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
-    $venv_built = 1;
-    $Log->("Built Python SDK virtualenv");
-  }
-  my @pysdk_version_cmd = ("python", "-c",
-    "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
-  if ($venv_built) {
-    $Log->("Running in Python SDK virtualenv");
-    @pysdk_version_cmd = ();
-    my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
-    @ARGV = ("/bin/sh", "-ec",
-             ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
-  } elsif (-d $python_src) {
-    $Log->("Warning: virtualenv not found inside Docker container default " .
-           "\$PATH. Can't install Python SDK.");
-  }
-  if (@pysdk_version_cmd) {
-    open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
-    my $pysdk_version = <$pysdk_version_pipe>;
-    close($pysdk_version_pipe);
-    if ($? == 0) {
-      chomp($pysdk_version);
-      $Log->("Using Arvados SDK version $pysdk_version");
-    } else {
-      # A lot could've gone wrong here, but pretty much all of it means that
-      # Python won't be able to load the Arvados SDK.
-      $Log->("Warning: Arvados SDK not found");
-    }
-  }
-  while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
-    my $sdk_path = "$install_dir/$sdk_dir";
-    if (-d $sdk_path) {
-      if ($ENV{$sdk_envkey}) {
-        $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
-      } else {
-        $ENV{$sdk_envkey} = $sdk_path;
-      }
-      $Log->("Arvados SDK added to %s", $sdk_envkey);
-    }
-  }
-  exec(@ARGV);
-  die "Cannot exec `@ARGV`: $!";
-### Installation mode
-open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
-flock L, LOCK_EX;
-if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
-  # This exact git archive (source + arvados sdk) is already installed
-  # here, so there's no need to reinstall it.
-  # We must consume our DATA section, though: otherwise the process
-  # feeding it to us will get SIGPIPE.
-  my $buf;
-  while (read(DATA, $buf, 65536)) { }
-  exit(0);
-unlink "$destdir.archive_hash";
-mkdir $destdir;
-do {
-  # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
-  local $SIG{PIPE} = "IGNORE";
-  warn "Extracting archive: $archive_hash\n";
-  # --ignore-zeros is necessary sometimes: depending on how much NUL
-  # padding tar -A put on our combined archive (which in turn depends
-  # on the length of the component archives) tar without
-  # --ignore-zeros will exit before consuming stdin and cause close()
-  # to fail on the resulting SIGPIPE.
-  if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
-    die "Error launching 'tar -xC $destdir': $!";
-  }
-  # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
-  # get SIGPIPE.  We must feed it data incrementally.
-  my $tar_input;
-  while (read(DATA, $tar_input, 65536)) {
-    print TARX $tar_input;
-  }
-  if(!close(TARX)) {
-    die "'tar -xC $destdir' exited $?: $!";
-  }
-mkdir $install_dir;
-my $sdk_root = "$destdir/.arvados.sdk/sdk";
-if (-d $sdk_root) {
-  foreach my $sdk_lang (("python",
-                         map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
-    if (-d "$sdk_root/$sdk_lang") {
-      if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
-        die "Failed to install $sdk_lang SDK: $!";
-      }
-    }
-  }
-my $python_dir = "$install_dir/python";
-if ((-d $python_dir) and can_run("python2.7")) {
-  open(my $egg_info_pipe, "-|",
-       "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
-  my @egg_info_errors = <$egg_info_pipe>;
-  close($egg_info_pipe);
-  if ($?) {
-    if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
-      # egg_info apparently failed because it couldn't ask git for a build tag.
-      # Specify no build tag.
-      open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
-      print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
-      close($pysdk_cfg);
-    } else {
-      my $egg_info_exit = $? >> 8;
-      foreach my $errline (@egg_info_errors) {
-        warn $errline;
-      }
-      warn "python setup.py egg_info failed: exit $egg_info_exit";
-      exit ($egg_info_exit || 1);
-    }
-  }
-# Hide messages from the install script (unless it fails: shell_or_die
-# will show $destdir.log in that case).
-open(STDOUT, ">>", "$destdir.log") or die ($!);
-open(STDERR, ">&", STDOUT) or die ($!);
-if (-e "$destdir/crunch_scripts/install") {
-    shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
-} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
-    # Old version
-    shell_or_die (undef, "./tests/autotests.sh", $install_dir);
-} elsif (-e "./install.sh") {
-    shell_or_die (undef, "./install.sh", $install_dir);
-if ($archive_hash) {
-    unlink "$destdir.archive_hash.new";
-    symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
-    rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
-close L;
-sub can_run {
-  my $command_name = shift;
-  open(my $which, "-|", "which", $command_name) or die ($!);
-  while (<$which>) { }
-  close($which);
-  return ($? == 0);
-sub shell_or_die
-  my $exitcode = shift;
-  if ($ENV{"DEBUG"}) {
-    print STDERR "@_\n";
-  }
-  if (system (@_) != 0) {
-    my $err = $!;
-    my $code = $?;
-    my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
-    open STDERR, ">&STDERR_ORIG";
-    system ("cat $destdir.log >&2");
-    warn "@_ failed ($err): $exitstatus";
-    if (defined($exitcode)) {
-      exit $exitcode;
-    }
-    else {
-      exit (($code >> 8) || 1);
-    }
-  }