#!/usr/bin/env ruby

# == Synopsis
#
#  wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
#  wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
#
# Satisfy a pipeline template by finding or submitting a mapreduce job
# for each pipeline component.
#
# == Options
#
# [--template uuid] Use the specified pipeline template.
#
# [--template path] Load the pipeline template from the specified
#                   local file.
#
# [--instance uuid] Use the specified pipeline instance.
#
# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
#                 to finish. Just find out whether jobs are finished,
#                 queued, or running for each component.
#
# [--create-instance-only] Do not try to satisfy any components. Just
#                          create an instance, print its UUID to
#                          stdout, and exit.
#
# [--no-wait] Make only as much progress as possible without entering
#             a sleep/poll loop.
#
# [--no-reuse-finished] Do not reuse existing outputs to satisfy
#                       pipeline components. Always submit a new job
#                       or use an existing job which has not yet
#                       finished.
#
# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
#              components. Submit a new job for every component.
#
# [--debug] Print extra debugging information on stderr.
#
# [--debug-level N] Set the amount of debugging information. Default
#                   0 (or 1 if --debug is given), possible range 0..3.
#
# [--status-text path] Print plain text status report to a file or
#                      fifo. Default: /dev/stdout
#
# [--status-json path] Print JSON status report to a file or
#                      fifo. Default: /dev/null
#
# == Parameters
#
# [param_name=param_value]
#
# [param_name param_value] Set (or override) the default value for
#                          every parameter with the given name.
#
# [component_name::param_name=param_value]
# [component_name::param_name param_value]
# [--component_name::param_name=param_value]
# [--component_name::param_name param_value] Set the value of a
#                                            parameter for a single
#                                            component.
#
class WhRunPipelineInstance
  # Empty placeholder: the class body is defined where the class is
  # reopened later in this file.
end

# Version of this program (a Float); its string form is handed to the API
# client as :application_version when the client is constructed.
$application_version = 1.0

# Refuse to run on interpreters older than 1.9.3.
# NOTE(review): RUBY_VERSION is a String, so this is a lexicographic
# comparison rather than a numeric version comparison.
if RUBY_VERSION < '1.9.3' then
  abort <<-EOS
#{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
  EOS
end

# Connection settings come from the environment. The API version falls back
# to 'v1'; the host and token have no fallback, so the program aborts when
# either variable is unset.
$arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
$arvados_api_host = ENV['ARVADOS_API_HOST'] or
  abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
$arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
  abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."

# Load runtime dependencies; if any of them cannot be loaded, abort with an
# installation hint instead of a raw LoadError backtrace.
begin
  require 'rubygems'
  require 'google/api_client'
  require 'json'
  require 'pp'
  require 'trollop'
rescue LoadError
  abort <<-EOS
#{$0}: fatal: some runtime dependencies are missing.
Try: gem install pp google-api-client json trollop
  EOS
end

# Emit a diagnostic line on stderr, prefixed with this program's file name
# and process id, but only when the global $debuglevel is at least
# +verbosity+.
#
# message::   text to log
# verbosity:: minimum $debuglevel at which the message is printed (default 1)
#
# Always returns nil.
def debuglog(message, verbosity=1)
  return unless $debuglevel >= verbosity
  program_name = File.split($0).last
  $stderr.puts "#{program_name} #{$$}: #{message}"
end

module Kernel
  # Run the given block with Ruby warnings silenced ($VERBOSE set to nil)
  # and return the block's value.
  #
  # The previous $VERBOSE setting is restored in an +ensure+ clause, so it
  # is put back even when the block raises; otherwise a failing block would
  # leave warnings disabled for the rest of the process.
  def suppress_warnings
    original_verbosity = $VERBOSE
    $VERBOSE = nil
    yield
  ensure
    $VERBOSE = original_verbosity
  end
end

# Applies to any API host whose name contains the substring "local".
# The constant OpenSSL::SSL::VERIFY_PEER is reassigned to the value of
# VERIFY_NONE; suppress_warnings hides the constant-redefinition warning.
# NOTE(review): this changes the constant for the whole process, not just
# for the API client -- confirm that is acceptable.
if $arvados_api_host.match /local/
  # You probably don't care about SSL certificate checks if you're
  # testing with a dev server.
  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
end

# Reopens the API client class to define discovery_document.
class Google::APIClient
  # Return the discovery document for +api+ at +version+.
  #
  # A previously stored (truthy) entry in @discovery_documents, keyed by
  # "api:version", is reused. Otherwise the document is fetched with an
  # unauthenticated GET of the discovery URI and stored under that key.
  # A String response body is parsed as JSON; any other body is stored
  # as-is.
  def discovery_document(api, version)
    api = api.to_s
    cache_key = "#{api}:#{version}"
    @discovery_documents[cache_key] ||= begin
      reply = execute!(:http_method => :get,
                       :uri => discovery_uri(api, version),
                       :authenticated => false)
      if reply.body.class == String
        JSON.parse(reply.body)
      else
        reply.body
      end
    end
  end
end


# Parse command line options (the kind that control the behavior of
# this program, that is, not the pipeline component parameters).

# Declare the program's own options. Parsing stops at a literal "--";
# whatever remains (available as p.leftovers) is treated later as pipeline
# component parameters.
p = Trollop::Parser.new do
  opt(:dry_run,
      "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
      :type => :boolean,
      :short => :n)
  opt(:status_text,
      "Store plain text status in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/stdout')
  opt(:status_json,
      "Store json-formatted pipeline in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/null')
  opt(:no_wait,
      "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
      :short => :none,
      :type => :boolean)
  opt(:no_reuse_finished,
      "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
      :short => :none,
      :type => :boolean)
  opt(:no_reuse,
      "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
      :short => :none,
      :type => :boolean)
  opt(:debug,
      "Print extra debugging information on stderr.",
      :type => :boolean)
  opt(:debug_level,
      "Set debug verbosity level.",
      :short => :none,
      :type => :integer)
  opt(:template,
      "UUID of pipeline template, or path to local pipeline template file.",
      :short => :none,
      :type => :string)
  opt(:instance,
      "UUID of pipeline instance.",
      :short => :none,
      :type => :string)
  opt(:create_instance_only,
      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
      :short => :none,
      :type => :boolean)
  stop_on [:'--']
end
# Parse ARGV using Trollop's standard handling of parse errors.
$options = Trollop::with_standard_exception_handling p do
  p.parse ARGV
end
# An explicit --debug-level wins; otherwise --debug means level 1;
# otherwise debugging output is off (level 0).
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0

# Exactly one source must be chosen: an existing instance, or a template
# from which a new instance is created. --create-instance-only is only
# accepted together with a template.
if $options[:instance]
  if $options[:template] or $options[:create_instance_only]
    abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
  end
elsif not $options[:template]
  abort "#{$0}: syntax error: you must supply a --template or --instance."
end

# Set up the API client.

# Build the API client unless $client has already been set, then look up the
# 'arvados' API at the configured version. The classes below go through
# $arvados to reach the pipeline_instances, pipeline_templates and jobs
# methods, and through $client to execute them.
$client ||= Google::APIClient.
  new(:host => $arvados_api_host,
      :application_name => File.split($0).last,
      :application_version => $application_version.to_s)
$arvados = $client.discovered_api('arvados', $arvados_api_version)


# Wrapper around one pipeline_instance record fetched from, or created
# through, the API. Attribute writes made with []= are remembered and sent
# to the server by the next call to save.
class PipelineInstance
  # Look up the pipeline instance with the given uuid.
  #
  # Returns a PipelineInstance, or nil (after logging at level 0) when the
  # response is not a Hash carrying a :uuid.
  def self.find(uuid)
    response = $client.execute(:api_method => $arvados.pipeline_instances.get,
                               :parameters => { :uuid => uuid },
                               :body => { :api_token => ENV['ARVADOS_API_TOKEN'] },
                               :authenticated => false)
    record = JSON.parse response.body, :symbolize_names => true
    if record.is_a?(Hash) && record[:uuid]
      debuglog "Retrieved pipeline_instance #{record[:uuid]}"
      new(record)
    else
      debuglog "Failed to get pipeline_instance: #{record[:errors] rescue nil}", 0
      nil
    end
  end

  # Create a pipeline instance on the server from +attributes+.
  #
  # Returns the new PipelineInstance; aborts the program when the response
  # is not a Hash carrying a :uuid.
  def self.create(attributes)
    request_body = {
      :api_token => ENV['ARVADOS_API_TOKEN'],
      :pipeline_instance => attributes
    }
    response = $client.execute(:api_method => $arvados.pipeline_instances.create,
                               :body => request_body,
                               :authenticated => false)
    record = JSON.parse response.body, :symbolize_names => true
    created = record.is_a?(Hash) && record[:uuid]
    unless created
      abort "Failed to create pipeline_instance: #{record[:errors] rescue nil} #{record.inspect}"
    end
    debuglog "Created pipeline instance: #{record[:uuid]}"
    new(record)
  end

  # Send the attributes changed since the last successful save (as a JSON
  # string) to the server.
  #
  # On success the pending changes are cleared, the local record is
  # replaced by the server's response, and that response Hash is returned.
  # On failure the problem is logged at level 0, pending changes are kept,
  # and nil is returned.
  def save
    response = $client.execute(:api_method => $arvados.pipeline_instances.update,
                               :parameters => { :uuid => @pi[:uuid] },
                               :body => {
                                 :api_token => ENV['ARVADOS_API_TOKEN'],
                                 :pipeline_instance => @attributes_to_update.to_json
                               },
                               :authenticated => false)
    record = JSON.parse response.body, :symbolize_names => true
    if record.is_a?(Hash) && record[:uuid]
      @attributes_to_update = {}
      @pi = record
    else
      debuglog "Failed to save pipeline_instance: #{record[:errors] rescue nil}", 0
      nil
    end
  end

  # Set attribute +x+ to +y+ locally and queue it for the next save.
  def []=(x,y)
    @attributes_to_update[x] = y
    @pi[x] = y
  end

  # Read attribute +x+ from the local copy of the record.
  def [](x)
    @pi[x]
  end

  protected

  # j:: Hash of record attributes (symbol keys) as returned by the API.
  def initialize(j)
    @attributes_to_update = {}
    @pi = j
  end
end

# Class-level helpers for fetching, listing and creating jobs through the
# API. Jobs returned by get and create are stored in a class-level Hash
# keyed by uuid; within this class that Hash is only written, never read,
# so get always queries the server.
class JobCache
  class << self
    # Fetch the job with the given uuid and return the parsed response
    # (symbol keys), recording it under +uuid+.
    def get(uuid)
      @cache ||= {}
      query = {
        :api_token => ENV['ARVADOS_API_TOKEN'],
        :uuid => uuid
      }
      response = $client.execute(:api_method => $arvados.jobs.get,
                                 :parameters => query,
                                 :authenticated => false)
      @cache[uuid] = JSON.parse response.body, :symbolize_names => true
    end

    # List up to 10000 jobs matching +conditions+ (sent as a JSON string).
    #
    # Returns the response's :items Array, or an empty Array when the
    # response has no Array under :items.
    def where(conditions)
      query = {
        :api_token => ENV['ARVADOS_API_TOKEN'],
        :limit => 10000,
        :where => conditions.to_json
      }
      response = $client.execute(:api_method => $arvados.jobs.list,
                                 :parameters => query,
                                 :authenticated => false)
      listing = JSON.parse response.body, :symbolize_names => true
      items = listing && listing[:items]
      items.is_a?(Array) ? items : []
    end

    # Submit a new job described by +attributes+ (sent as a JSON string).
    #
    # Returns the created job Hash and records it by uuid, or logs at
    # level 0 and returns nil when the response carries no :uuid.
    def create(attributes)
      @cache ||= {}
      query = {
        :api_token => ENV['ARVADOS_API_TOKEN'],
        :job => attributes.to_json
      }
      response = $client.execute(:api_method => $arvados.jobs.create,
                                 :parameters => query,
                                 :authenticated => false)
      job = JSON.parse response.body, :symbolize_names => true
      unless job.is_a?(Hash) && job[:uuid]
        debuglog "create job: #{job[:errors] rescue nil}", 0
        return nil
      end
      @cache[job[:uuid]] = job
    end
  end
end

# Drives one pipeline run: loads a template or an existing instance,
# resolves each component's script parameters, then finds or submits a job
# per component and polls until no more progress can be made.
class WhRunPipelineInstance
  # The PipelineInstance being run; nil until fetch_instance or
  # setup_instance has provided one.
  attr_reader :instance

  # _options:: Hash of program options. The keys read by this class are
  #            :no_reuse, :no_reuse_finished, :dry_run, :no_wait,
  #            :status_json and :status_text.
  def initialize(_options)
    @options = _options
  end

  # Load the pipeline template.
  #
  # template:: a template UUID, or the path of a local JSON file. Any value
  #            containing a character other than lowercase letters, digits
  #            and "-" is treated as a file path.
  #
  # Aborts when a local file has no :components key, or when the API
  # response carries no :uuid. Returns self.
  def fetch_template(template)
    if template.match /[^-0-9a-z]/
      # Doesn't look like a uuid -- use it as a filename.
      @template = JSON.parse File.read(template), :symbolize_names => true
      if !@template[:components]
        abort("#{$0}: Template loaded from #{template} " +
              "does not have a \"components\" key")
      end
    else
      result = $client.execute(:api_method => $arvados.pipeline_templates.get,
                               :parameters => {
                                 :api_token => ENV['ARVADOS_API_TOKEN'],
                                 :uuid => template
                               },
                               :authenticated => false)
      @template = JSON.parse result.body, :symbolize_names => true
      if !@template[:uuid]
        abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
      end
    end
    self
  end

  # Load an existing pipeline instance and use it as the template too (its
  # :components are what apply_parameters works from).
  #
  # Aborts when the instance cannot be retrieved; PipelineInstance.find
  # returns nil in that case, which would otherwise only surface later as a
  # NoMethodError in apply_parameters. Returns self.
  def fetch_instance(instance_uuid)
    @instance = PipelineInstance.find(instance_uuid)
    unless @instance
      abort "#{$0}: fatal: failed to retrieve pipeline instance #{instance_uuid}"
    end
    @template = @instance
    self
  end

  # Resolve every component's script parameters from the command line
  # arguments and the template.
  #
  # params_args:: Array of leftover command line arguments, consumed
  #               destructively. Accepted forms are "name=value",
  #               "--name=value", "name value" and "--name value", where
  #               name is either "param" or "component::param".
  #
  # For each parameter the first available of these is used: a
  # component-specific argument, the template's :value, and -- only when
  # the parameter is not the output of another component -- a global
  # argument or the template's :default. Parameters that depend on another
  # component's output and have no value yet are left as Hashes so that run
  # can fill them in later.
  #
  # Aborts on an unparseable argument or when required parameters are
  # missing. Returns self.
  def apply_parameters(params_args)
    params_args.shift if params_args[0] == '--'
    params = {}
    while !params_args.empty?
      if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
        params[re[2]] = re[3]
        params_args.shift
      elsif params_args.size > 1
        param = params_args.shift.sub /^--/, ''
        params[param] = params_args.shift
      else
        abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
      end
    end

    @components = @template[:components].dup

    errors = []
    @components.each do |componentname, component|
      component[:script_parameters].each do |parametername, parameter|
        # A bare value in the template is shorthand for { :value => ... }.
        parameter = { :value => parameter } unless parameter.is_a? Hash
        value =
          (params["#{componentname}::#{parametername}"] ||
           parameter[:value] ||
           (parameter[:output_of].nil? &&
            (params[parametername.to_s] ||
             parameter[:default])) ||
           nil)
        # A parameter counts as required unless :required is explicitly one
        # of the "false" spellings below.
        if value.nil? and
            ![false,'false',0,'0'].index parameter[:required]
          if parameter[:output_of]
            # Will be filled in once the producing component has finished.
            next
          end
          errors << [componentname, parametername, "required parameter is missing"]
        end
        debuglog "parameter #{componentname}::#{parametername} == #{value}"
        component[:script_parameters][parametername] = value
      end
    end
    if !errors.empty?
      abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
    end
    debuglog "options=" + @options.pretty_inspect
    self
  end

  # Create the pipeline instance on the server unless one is already
  # loaded. Returns self.
  def setup_instance
    @instance ||= PipelineInstance.
      create(:components => @components,
             :pipeline_template_uuid => @template[:uuid],
             :active => true)
    self
  end

  # Find or submit a job for every component whose parameters are all
  # known, feed the outputs of successful jobs into dependent components,
  # and repeat every 10 seconds while unfinished jobs remain.
  #
  # No polling loop is entered when :no_wait or :dry_run is set, and no new
  # jobs are submitted when :dry_run is set. The instance's :components,
  # :active and finally :success attributes are updated and saved.
  def run
    moretodo = true
    while moretodo
      moretodo = false
      @components.each do |cname, c|
        job = nil
        if !c[:job] and
            c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
          # Job is fully specified (all parameter values are present) but
          # no particular job has been found.

          debuglog "component #{cname} ready to satisfy."

          c.delete :wait
          second_place_job = nil # satisfies component, but not finished yet

          (@options[:no_reuse] ? [] : JobCache.
           where(script: c[:script],
                 script_parameters: c[:script_parameters],
                 script_version_descends_from: c[:script_version_descends_from])
           ).each do |candidate_job|
            # Parameter names are compared case-insensitively.
            candidate_params_downcase = Hash[candidate_job[:script_parameters].
                                             map { |k,v| [k.downcase,v] }]
            c_params_downcase = Hash[c[:script_parameters].
                                     map { |k,v| [k.downcase,v] }]

            debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3

            unless candidate_params_downcase == c_params_downcase
              next
            end

            # Only successful, running, or still-queued jobs are usable.
            unless candidate_job[:success] || candidate_job[:running] ||
                (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
              debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
              next
            end

            if candidate_job[:success]
              unless @options[:no_reuse_finished]
                job = candidate_job
                debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
                c[:job] = job
              end
            else
              second_place_job ||= candidate_job
            end
            # The search stops at the first usable candidate.
            break
          end
          if not c[:job] and second_place_job
            job = second_place_job
            debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
            c[:job] = job
          end
          if not c[:job]
            debuglog "component #{cname} not satisfied by any existing job."
            if !@options[:dry_run]
              debuglog "component #{cname} new job."
              job = JobCache.create(:script => c[:script],
                                    :script_parameters => c[:script_parameters],
                                    :runtime_constraints => c[:runtime_constraints] || {},
                                    :script_version => c[:script_version] || 'master')
              if job
                debuglog "component #{cname} new job #{job[:uuid]}"
                c[:job] = job
              else
                debuglog "component #{cname} new job failed"
              end
            end
          end
        else
          c[:wait] = true
        end
        if c[:job] and c[:job][:uuid]
          # Refresh the state of any job that has not reached a final state.
          if not c[:job][:finished_at] and not c[:job][:cancelled_at]
            c[:job] = JobCache.get(c[:job][:uuid])
          end
          if c[:job][:success]
            # Populate script_parameters of other components waiting for
            # this job
            @components.each do |c2name, c2|
              c2[:script_parameters].each do |pname, p|
                if p.is_a? Hash and p[:output_of] == cname.to_s
                  debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
                  c2[:script_parameters][pname] = c[:job][:output]
                end
              end
            end
          elsif c[:job][:running] ||
              (!c[:job][:started_at] && !c[:job][:cancelled_at])
            # The job is running or queued. Keep polling unless the user
            # asked not to wait; a dry run is documented as never waiting
            # for existing jobs either.
            moretodo ||= !(@options[:no_wait] || @options[:dry_run])
          elsif c[:job][:cancelled_at]
            debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
          end
        end
      end
      @instance[:components] = @components
      @instance[:active] = moretodo
      report_status
      if moretodo
        begin
          sleep 10
        rescue Interrupt
          debuglog "interrupt", 0
          abort
        end
      end
    end
    # The run succeeded only if every component has a successful job.
    @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
    @instance.save
  end

  # Mark the instance (if there is one) inactive and save it.
  def cleanup
    if @instance
      @instance[:active] = false
      @instance.save
    end
  end

  # UUID of the current pipeline instance.
  def uuid
    @instance[:uuid]
  end

  protected

  # Save the instance, then write the status reports selected by the
  # :status_json and :status_text options. A path of '/dev/null' disables
  # the corresponding report.
  def report_status
    @instance.save

    if @options[:status_json] != '/dev/null'
      File.open(@options[:status_json], 'w') do |f|
        # Emit real JSON, as the option name and help text promise, rather
        # than Ruby inspect syntax.
        f.puts JSON.pretty_generate(@components)
      end
    end

    if @options[:status_text] != '/dev/null'
      File.open(@options[:status_text], 'w') do |f|
        f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
        # Pad component names to a common width so the columns line up.
        namewidth = @components.collect { |cname, c| cname.size }.max
        @components.each do |cname, c|
          jstatus = if !c[:job]
                      "-"
                    elsif c[:job][:running]
                      "#{c[:job][:tasks_summary].inspect}"
                    elsif c[:job][:success]
                      c[:job][:output]
                    elsif c[:job][:cancelled_at]
                      "cancelled #{c[:job][:cancelled_at]}"
                    elsif c[:job][:finished_at]
                      "failed #{c[:job][:finished_at]}"
                    elsif c[:job][:started_at]
                      "started #{c[:job][:started_at]}"
                    else
                      "queued #{c[:job][:created_at]}"
                    end
          f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
        end
      end
    end
  end
end

# Main program: load the template or instance, resolve parameters from the
# arguments left over after option parsing, make sure an instance exists,
# then either print its UUID (--create-instance-only) or run the pipeline.
runner = WhRunPipelineInstance.new($options)
begin
  if $options[:template]
    runner.fetch_template($options[:template])
  else
    runner.fetch_instance($options[:instance])
  end
  runner.apply_parameters(p.leftovers)
  runner.setup_instance
  if $options[:create_instance_only]
    runner.instance.save
    puts runner.instance[:uuid]
  else
    runner.run
  end
# Exception (not just StandardError) is rescued so that the SystemExit raised
# by abort and an Interrupt also pass through cleanup, which marks the
# instance inactive; the original exception is then re-raised.
rescue Exception => e
  runner.cleanup
  raise e
end