#!/usr/bin/env ruby

# == Synopsis
#
#  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
#  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
#
# Satisfy a pipeline template by finding or submitting a mapreduce job
# for each pipeline component.
#
# == Options
#
# [--template uuid] Use the specified pipeline template.
#
# [--template path] Load the pipeline template from the specified
#                   local file.
#
# [--instance uuid] Use the specified pipeline instance.
#
# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
#                 to finish. Just find out whether jobs are finished,
#                 queued, or running for each component
#
# [--submit] Do not try to satisfy any components. Just
#                          create an instance, print its UUID to
#                          stdout, and exit.
#
# [--no-wait] Make only as much progress as possible without entering
#             a sleep/poll loop.
#
# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
#              components. Submit a new job for every component.
#
# [--debug] Print extra debugging information on stderr.
#
# [--debug-level N] Increase amount of debugging information. Default
#                   1, possible range 0..3.
#
# [--status-text path] Print plain text status report to a file or
#                      fifo. Default: /dev/stdout
#
# [--status-json path] Print JSON status report to a file or
#                      fifo. Default: /dev/null
#
# [--description] Description for the pipeline instance.
#
# == Parameters
#
# [param_name=param_value]
#
# [param_name param_value] Set (or override) the default value for
#                          every parameter with the given name.
#
# [component_name::param_name=param_value]
# [component_name::param_name param_value]
# [--component_name::param_name=param_value]
# [--component_name::param_name param_value] Set the value of a
#                                            parameter for a single
#                                            component.
#
class WhRunPipelineInstance
end

if RUBY_VERSION < '1.9.3' then
  abort <<-EOS
#{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
  EOS
end

begin
  require 'arvados'
  require 'rubygems'
  require 'json'
  require 'pp'
  require 'trollop'
  require 'google/api_client'
rescue LoadError => l
  puts $:
  abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
Try: gem install arvados pp google-api-client json trollop
  EOS
end

def debuglog(message, verbosity=1)
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
end

# Parse command line options (the kind that control the behavior of
# this program, that is, not the pipeline component parameters).

p = Trollop::Parser.new do
  version __FILE__
  opt(:dry_run,
      "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
      :type => :boolean,
      :short => :n)
  opt(:status_text,
      "Store plain text status in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/stdout')
  opt(:status_json,
      "Store json-formatted pipeline in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/null')
  opt(:no_wait,
      "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
      :short => :none,
      :type => :boolean)
  opt(:no_reuse,
      "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
      :short => :none,
      :type => :boolean)
  opt(:debug,
      "Print extra debugging information on stderr.",
      :type => :boolean)
  opt(:debug_level,
      "Set debug verbosity level.",
      :short => :none,
      :type => :integer)
  opt(:template,
      "UUID of pipeline template, or path to local pipeline template file.",
      :short => :none,
      :type => :string)
  opt(:instance,
      "UUID of pipeline instance.",
      :short => :none,
      :type => :string)
  opt(:submit,
      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
      :short => :none,
      :type => :boolean)
  opt(:run_pipeline_here,
      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
      :short => :none,
      :type => :boolean)
  opt(:run_jobs_here,
      "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
      :short => :none,
      :type => :boolean)
  opt(:run_here,
      "Synonym for --run-jobs-here.",
      :short => :none,
      :type => :boolean)
  opt(:description,
      "Description for the pipeline instance.",
      :short => :none,
      :type => :string)
  stop_on [:'--']
end
$options = Trollop::with_standard_exception_handling p do
  p.parse ARGV
end
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0

$options[:run_jobs_here] ||= $options[:run_here] # old flag name
$options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A

if $options[:instance]
  if $options[:template] or $options[:submit]
    abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
  end
elsif not $options[:template]
  puts "error: you must supply a --template or --instance."
  p.educate
  abort
end

if $options[:run_pipeline_here] == $options[:submit]
  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
end

# Set up the API client.

$arv = Arvados.new api_version: 'v1'
$client = $arv.client
$arvados = $arv.arvados_api

class PipelineInstance
  def self.find(uuid)
    result = $client.execute(:api_method => $arvados.pipeline_instances.get,
                             :parameters => {
                               :uuid => uuid
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    unless j.is_a? Hash and j[:uuid]
      debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
      nil
    else
      debuglog "Retrieved pipeline_instance #{j[:uuid]}"
      self.new(j)
    end
  end
  def self.create(attributes)
    result = $client.execute(:api_method => $arvados.pipeline_instances.create,
                             :body_object => {
                               :pipeline_instance => attributes
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    unless j.is_a? Hash and j[:uuid]
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
    end
    debuglog "Created pipeline instance: #{j[:uuid]}"
    self.new(j)
  end
  def save
    result = $client.execute(:api_method => $arvados.pipeline_instances.update,
                             :parameters => {
                               :uuid => @pi[:uuid]
                             },
                             :body_object => {
                               :pipeline_instance => @attributes_to_update
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    unless j.is_a? Hash and j[:uuid]
      debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
      nil
    else
      @attributes_to_update = {}
      @pi = j
    end
  end
  def []=(x,y)
    @attributes_to_update[x] = y
    @pi[x] = y
  end
  def [](x)
    @pi[x]
  end

  def log_stderr(msg)
    $arv.log.create log: {
      event_type: 'stderr',
      object_uuid: self[:uuid],
      owner_uuid: self[:owner_uuid],
      properties: {"text" => msg},
    }
  end

  protected
  def initialize(j)
    @attributes_to_update = {}
    @pi = j
  end
end

class JobCache
  def self.get(uuid)
    @cache ||= {}
    result = $client.execute(:api_method => $arvados.jobs.get,
                             :parameters => {
                               :uuid => uuid
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                             })
    @cache[uuid] = JSON.parse result.body, :symbolize_names => true
  end
  def self.where(conditions)
    result = $client.execute(:api_method => $arvados.jobs.list,
                             :parameters => {
                               :limit => 10000,
                               :where => conditions.to_json
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                             })
    list = JSON.parse result.body, :symbolize_names => true
    if list and list[:items].is_a? Array
      list[:items]
    else
      []
    end
  end
  def self.create(pipeline, component, job, create_params)
    @cache ||= {}

    body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))

    result = $client.execute(:api_method => $arvados.jobs.create,
                             :body_object => body,
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    if j.is_a? Hash and j[:uuid]
      @cache[j[:uuid]] = j
    else
      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0

      msg = ""
      j[:errors].each do |err|
        msg += "Error creating job for component #{component}: #{err}\n"
      end
      msg += "Job submission was: #{body.to_json}"

      pipeline.log_stderr(msg)
      nil
    end
  end

  protected

  def self.no_nil_values(hash)
    hash.reject { |key, value| value.nil? }
  end
end

class WhRunPipelineInstance
  attr_reader :instance

  def initialize(_options)
    @options = _options
  end

  def fetch_template(template)
    if template.match /[^-0-9a-z]/
      # Doesn't look like a uuid -- use it as a filename.
      @template = JSON.parse File.read(template), :symbolize_names => true
    else
      result = $client.execute(:api_method => $arvados.pipeline_templates.get,
                               :parameters => {
                                 :uuid => template
                               },
                               :authenticated => false,
                               :headers => {
                                 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                               })
      @template = JSON.parse result.body, :symbolize_names => true
      if !@template[:uuid]
        abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
      end
    end
    self
  end

  def fetch_instance(instance_uuid)
    @instance = PipelineInstance.find(instance_uuid)
    @template = @instance
    self
  end

  def apply_parameters(params_args)
    params_args.shift if params_args[0] == '--'
    params = {}
    while !params_args.empty?
      if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
        params[re[2]] = re[3]
        params_args.shift
      elsif params_args.size > 1
        param = params_args.shift.sub /^--/, ''
        params[param] = params_args.shift
      else
        abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
      end
    end

    if not @template[:components].is_a?(Hash)
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
    end
    @components = @template[:components].dup

    bad_components = @components.each_pair.select do |cname, cspec|
      not cspec.is_a?(Hash)
    end
    if bad_components.any?
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
    end

    bad_components = @components.each_pair.select do |cname, cspec|
      not cspec[:script_parameters].is_a?(Hash)
    end
    if bad_components.any?
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
    end

    errors = []
    @components.each do |componentname, component|
      component[:script_parameters].each do |parametername, parameter|
        parameter = { :value => parameter } unless parameter.is_a? Hash
        value =
          (params["#{componentname}::#{parametername}"] ||
           parameter[:value] ||
           (parameter[:output_of].nil? &&
            (params[parametername.to_s] ||
             parameter[:default])) ||
           nil)
        if value.nil? and
            ![false,'false',0,'0'].index parameter[:required]
          if parameter[:output_of]
            if not @components[parameter[:output_of].intern]
              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
            end
            next
          end
          errors << [componentname, parametername, "required parameter is missing"]
        end
        debuglog "parameter #{componentname}::#{parametername} == #{value}"

        component[:script_parameters][parametername] =
          parameter.dup.merge(value: value)
      end
    end
    if !errors.empty?
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
    end
    debuglog "options=" + @options.pretty_inspect
    self
  end

  def setup_instance
    if @instance
      @instance[:properties][:run_options] ||= {}
      if @options[:no_reuse]
        # override properties of existing instance
        @instance[:properties][:run_options][:enable_job_reuse] = false
      else
        # Default to "enable reuse" if not specified. (This code path
        # can go away when old clients go away.)
        if @instance[:properties][:run_options][:enable_job_reuse].nil?
          @instance[:properties][:run_options][:enable_job_reuse] = true
        end
      end
    else
      description = $options[:description]
      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
      @instance = PipelineInstance.
        create(components: @components,
               properties: {
                 run_options: {
                   enable_job_reuse: !@options[:no_reuse]
                 }
               },
               pipeline_template_uuid: @template[:uuid],
               description: description,
               state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
    end
    self
  end

  def run
    moretodo = true
    interrupted = false

    if @instance[:started_at].nil?
      @instance[:started_at] = Time.now
    end

    job_creation_failed = 0
    while moretodo
      moretodo = false
      @components.each do |cname, c|
        job = nil
        owner_uuid = @instance[:owner_uuid]
        # Is the job satisfying this component already known to be
        # finished? (Already meaning "before we query API server about
        # the job's current state")
        c_already_finished = (c[:job] &&
                              c[:job][:uuid] &&
                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
        if !c[:job] and
            c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
          # No job yet associated with this component and is component inputs
          # are fully specified (any output_of script_parameters are resolved
          # to real value)
          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
          job = JobCache.create(@instance, cname, {
            :script => c[:script],
            :script_parameters => Hash[c[:script_parameters].map do |key, spec|
                                         [key, spec[:value]]
                                       end],
            :script_version => c[:script_version],
            :repository => c[:repository],
            :nondeterministic => c[:nondeterministic],
            :runtime_constraints => c[:runtime_constraints],
            :owner_uuid => owner_uuid,
            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
            :submit_id => my_submit_id,
            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
          }, {
            # This is the right place to put these attributes when
            # dealing with new API servers.
            :minimum_script_version => c[:minimum_script_version],
            :exclude_script_versions => c[:exclude_minimum_script_versions],
            :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
                                !c[:nondeterministic]),
            :filters => c[:filters]
          })
          if job
            debuglog "component #{cname} new job #{job[:uuid]}"
            c[:job] = job
            c[:run_in_process] = (@options[:run_jobs_here] and
                                  job[:submit_id] == my_submit_id)
          else
            debuglog "component #{cname} new job failed", 0
            job_creation_failed += 1
          end
        end

        if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
          report_status
          begin
            require 'open3'
            Open3.popen3("arv-crunch-job", "--force-unlock",
                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
              stdin.close
              while true
                rready, wready, = IO.select([stdout, stderr], [])
                break if !rready[0]
                begin
                  buf = rready[0].read_nonblock(2**20)
                rescue EOFError
                  break
                end
                (rready[0] == stdout ? $stdout : $stderr).write(buf)
              end
              stdout.close
              stderr.close
              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
            end
            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
              raise Exception.new("arv-crunch-job did not set finished_at.")
            end
          rescue Exception => e
            debuglog "Interrupted (#{e}). Failing job.", 0
            $arv.job.update(uuid: c[:job][:uuid],
                            job: {
                              state: "Failed"
                            })
          end
        end

        if c[:job] and c[:job][:uuid]
          if ["Running", "Queued"].include?(c[:job][:state])
            # Job is running (or may be soon) so update copy of job record
            c[:job] = JobCache.get(c[:job][:uuid])
          end

          if c[:job][:state] == "Complete"
            # Populate script_parameters of other components waiting for
            # this job
            @components.each do |c2name, c2|
              c2[:script_parameters].each do |pname, p|
                if p.is_a? Hash and p[:output_of] == cname.to_s
                  debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
                  c2[:script_parameters][pname] = {value: c[:job][:output]}
                  moretodo = true
                end
              end
            end
            unless c_already_finished
              # This is my first time discovering that the job
              # succeeded. (At the top of this loop, I was still
              # waiting for it to finish.)

              if @instance[:name].andand.length.andand > 0
                pipeline_name = @instance[:name]
              elsif @template.andand[:name].andand.length.andand > 0
                pipeline_name = @template[:name]
              else
                pipeline_name = @instance[:uuid]
              end
              if c[:output_name] != false
                # Create a collection located in the same project as the pipeline with the contents of the output.
                portable_data_hash = c[:job][:output]
                collections = $arv.collection.list(limit: 1,
                                                   filters: [['portable_data_hash', '=', portable_data_hash]],
                                                   select: ["portable_data_hash", "manifest_text"]
                                                   )[:items]
                if collections.any?
                  name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"

                  # check if there is a name collision.
                  name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
                                                                   ["name", "=", name]])[:items]

                  newcollection_actual = nil
                  if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
                    # There is already a collection with the same name and the
                    # same contents, so just point to that.
                    newcollection_actual = name_collisions.first
                  end

                  if newcollection_actual.nil?
                    # Did not find a collection with the same name (or the
                    # collection has a different portable data hash) so create
                    # a new collection with ensure_unique_name: true.
                    newcollection = {
                      owner_uuid: owner_uuid,
                      name: name,
                      portable_data_hash: collections.first[:portable_data_hash],
                      manifest_text: collections.first[:manifest_text]
                    }
                    debuglog "Creating collection #{newcollection}", 0
                    newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
                  end

                  c[:output_uuid] = newcollection_actual[:uuid]
                else
                  debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
                end
              end
            end
          elsif ["Queued", "Running"].include? c[:job][:state]
            # Job is running or queued to run, so indicate that pipeline
            # should continue to run
            moretodo = true
          elsif c[:job][:state] == "Cancelled"
            debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
            moretodo = false
          elsif c[:job][:state] == "Failed"
            moretodo = false
          end
        end
      end
      @instance[:components] = @components
      report_status

      if @options[:no_wait]
        moretodo = false
      end

      # If job creation fails, just give up on this pipeline instance.
      if job_creation_failed > 0
        moretodo = false
      end

      if moretodo
        begin
          sleep 10
        rescue Interrupt
          debuglog "interrupt", 0
          interrupted = true
          break
        end
      end
    end

    c_in_state = @components.values.group_by { |c|
      c[:job] and c[:job][:state]
    }
    succeeded = c_in_state["Complete"].andand.count || 0
    failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
    ended = succeeded + failed

    success = (succeeded == @components.length)

    # A job create call failed. Just give up.
    if job_creation_failed > 0
      debuglog "job creation failed - giving up on this pipeline instance", 0
      success = false
      failed += 1
    end

    if interrupted
     if success
        @instance[:state] = 'Complete'
     else
        @instance[:state] = 'Paused'
      end
    else
      if ended == @components.length or failed > 0
        @instance[:state] = success ? 'Complete' : 'Failed'
      end
    end

    if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
      @instance[:finished_at] = Time.now
    end

    debuglog "pipeline instance state is #{@instance[:state]}"

    # set components_summary
    components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
    @instance[:components_summary] = components_summary

    @instance.save
  end

  def cleanup
    if @instance and @instance[:state] == 'RunningOnClient'
      @instance[:state] = 'Paused'
      @instance.save
    end
  end

  def uuid
    @instance[:uuid]
  end

  protected

  def report_status
    @instance.save

    if @options[:status_json] != '/dev/null'
      File.open(@options[:status_json], 'w') do |f|
        f.puts @components.pretty_inspect
      end
    end

    if @options[:status_text] != '/dev/null'
      File.open(@options[:status_text], 'w') do |f|
        f.puts ""
        f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
        namewidth = @components.collect { |cname, c| cname.size }.max
        @components.each do |cname, c|
          jstatus = if !c[:job]
                      "-"
                    else case c[:job][:state]
                         when "Running"
                           "#{c[:job][:tasks_summary].inspect}"
                         when "Complete"
                           c[:job][:output]
                         when "Cancelled"
                           "cancelled #{c[:job][:cancelled_at]}"
                         when "Failed"
                           "failed #{c[:job][:finished_at]}"
                         when "Queued"
                           "queued #{c[:job][:created_at]}"
                         end
                    end
          f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
        end
      end
    end
  end

  def abort(msg)
    if @instance
      if ["New", "Ready", "RunningOnClient",
          "RunningOnServer"].include?(@instance[:state])
        @instance[:state] = "Failed"
        @instance[:finished_at] = Time.now
        @instance.save
      end
      @instance.log_stderr(msg)
    end
    Kernel::abort(msg)
  end
end

runner = WhRunPipelineInstance.new($options)
begin
  if $options[:template]
    runner.fetch_template($options[:template])
  else
    runner.fetch_instance($options[:instance])
  end
  runner.apply_parameters(p.leftovers)
  runner.setup_instance
  if $options[:submit]
    runner.instance.save
    puts runner.instance[:uuid]
  else
    runner.run
  end
rescue Exception => e
  runner.cleanup
  raise e
end