#!/usr/bin/env ruby

# == Synopsis
#
#  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
#  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
#
# Satisfy a pipeline template by finding or submitting a mapreduce job
# for each pipeline component.
#
# Exactly one of --run-here and --submit must be given.
#
# == Options
#
# [--template uuid] Use the specified pipeline template.
#
# [--template path] Load the pipeline template from the specified
#                   local file.
#
# [--instance uuid] Use the specified pipeline instance.
#
# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
#                 to finish. Just find out whether jobs are finished,
#                 queued, or running for each component
#
# [--submit] Do not try to satisfy any components. Just
#            create an instance, print its UUID to
#            stdout, and exit.
#
# [--run-here] Manage the pipeline in process.
#
# [--no-wait] Make only as much progress as possible without entering
#             a sleep/poll loop.
#
# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
#              components. Submit a new job for every component.
#
# [--debug] Print extra debugging information on stderr.
#
# [--debug-level N] Increase amount of debugging information. Default
#                   1, possible range 0..3.
#
# [--status-text path] Print plain text status report to a file or
#                      fifo. Default: /dev/stdout
#
# [--status-json path] Print JSON status report to a file or
#                      fifo. Default: /dev/null
#
# == Parameters
#
# [param_name=param_value]
#
# [param_name param_value] Set (or override) the default value for
#                          every parameter with the given name.
#
# [component_name::param_name=param_value]
# [component_name::param_name param_value]
# [--component_name::param_name=param_value]
# [--component_name::param_name param_value] Set the value of a
#                                            parameter for a single
#                                            component.
#
class WhRunPipelineInstance
end

$application_version = 1.0

if RUBY_VERSION < '1.9.3' then
  abort <<-EOS
#{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
  EOS
end

# API connection settings come from the environment; host and token
# are mandatory, the API version defaults to 'v1'.
$arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
$arvados_api_host = ENV['ARVADOS_API_HOST'] or
  abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
$arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
  abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."

begin
  require 'arvados'
  require 'rubygems'
  require 'json'
  require 'pp'
  require 'trollop'
  require 'google/api_client'
rescue LoadError => l
  puts $:
  abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
Try: gem install arvados pp google-api-client json trollop
  EOS
end

# Write +message+ to stderr, prefixed with the program name and pid,
# when the global $debuglevel is at least +verbosity+.
#
# $debuglevel is assigned after command line parsing, so this must not
# be called before that point.
def debuglog(message, verbosity=1)
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
end

module Kernel
  # Run the given block with Ruby warnings disabled ($VERBOSE = nil)
  # and return the block's value.
  #
  # The previous $VERBOSE setting is restored in an +ensure+ clause so
  # that an exception raised by the block cannot leave warnings
  # switched off for the rest of the process.
  def suppress_warnings
    original_verbosity = $VERBOSE
    $VERBOSE = nil
    yield
  ensure
    $VERBOSE = original_verbosity
  end
end

if $arvados_api_host.match(/local/)
  # You probably don't care about SSL certificate checks if you're
  # testing with a dev server.
  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
end

class Google::APIClient
  # Fetch (and memoize per "api:version" key) the discovery document
  # for the given API with an unauthenticated GET request. A String
  # response body is parsed as JSON; any other body is returned as is.
  def discovery_document(api, version)
    api = api.to_s
    return @discovery_documents["#{api}:#{version}"] ||=
      begin
        response = self.execute!(
          :http_method => :get,
          :uri => self.discovery_uri(api, version),
          :authenticated => false
        )
        response.body.class == String ? JSON.parse(response.body) : response.body
      end
  end
end

# Parse command line options (the kind that control the behavior of
# this program, that is, not the pipeline component parameters).
#
# NOTE(review): :dry_run is accepted here but no code in this file
# consults $options[:dry_run] -- confirm whether that is intended.
parser = Trollop::Parser.new do
  version __FILE__
  opt(:dry_run,
      "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
      :type => :boolean,
      :short => :n)
  opt(:status_text,
      "Store plain text status in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/stdout')
  opt(:status_json,
      "Store json-formatted pipeline in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/null')
  opt(:no_wait,
      "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
      :short => :none,
      :type => :boolean)
  opt(:no_reuse,
      "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
      :short => :none,
      :type => :boolean)
  opt(:debug,
      "Print extra debugging information on stderr.",
      :type => :boolean)
  opt(:debug_level,
      "Set debug verbosity level.",
      :short => :none,
      :type => :integer)
  opt(:template,
      "UUID of pipeline template, or path to local pipeline template file.",
      :short => :none,
      :type => :string)
  opt(:instance,
      "UUID of pipeline instance.",
      :short => :none,
      :type => :string)
  opt(:submit,
      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
      :short => :none,
      :type => :boolean)
  opt(:run_here,
      "Manage the pipeline in process.",
      :short => :none,
      :type => :boolean)
  stop_on [:'--']
end
$options = Trollop::with_standard_exception_handling parser do
  parser.parse ARGV
end
# An explicit --debug-level wins; plain --debug means level 1.
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0

if $options[:instance]
  if $options[:template] || $options[:submit]
    abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
  end
elsif !$options[:template]
  abort "#{$0}: syntax error: you must supply a --template or --instance."
end

# Both flags set, or both unset, compare equal: exactly one is required.
if $options[:run_here] == $options[:submit]
  abort "#{$0}: syntax error: you must supply either --run-here or --submit."
end

# Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
if ENV['ARVADOS_API_HOST_INSECURE']
  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
end

# Set up the API client.
$client ||= Google::APIClient.
  new(:host => $arvados_api_host,
      :application_name => File.split($0).last,
      :application_version => $application_version.to_s)
$arvados = $client.discovered_api('arvados', $arvados_api_version)
$arv = Arvados.new api_version: 'v1'

# Thin wrapper around a pipeline_instance API record. Attribute
# writes are applied to the local copy immediately and remembered so
# that #save only sends what changed since the last successful save.
class PipelineInstance
  # Look up a pipeline instance by uuid. Returns a PipelineInstance,
  # or nil (after logging at verbosity 0) when the response is not a
  # hash carrying a :uuid.
  def self.find(uuid)
    result = $client.execute(:api_method => $arvados.pipeline_instances.get,
                             :parameters => {
                               :uuid => uuid
                             },
                             :body => {
                               :api_token => ENV['ARVADOS_API_TOKEN']
                             },
                             :authenticated => false)
    j = JSON.parse result.body, :symbolize_names => true
    if j.is_a?(Hash) && j[:uuid]
      debuglog "Retrieved pipeline_instance #{j[:uuid]}"
      self.new(j)
    else
      debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
      nil
    end
  end

  # Create a pipeline instance with the given attributes. Aborts the
  # program when the response is not a hash carrying a :uuid.
  def self.create(attributes)
    result = $client.execute(:api_method => $arvados.pipeline_instances.create,
                             :body => {
                               :api_token => ENV['ARVADOS_API_TOKEN'],
                               :pipeline_instance => attributes
                             },
                             :authenticated => false)
    j = JSON.parse result.body, :symbolize_names => true
    unless j.is_a?(Hash) && j[:uuid]
      abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
    end
    debuglog "Created pipeline instance: #{j[:uuid]}"
    self.new(j)
  end

  # Send the pending attribute changes to the API server. On success
  # the local record is replaced by the server's response and the
  # pending set is cleared; on failure the error is logged at
  # verbosity 0, nil is returned and the pending changes are kept.
  def save
    result = $client.execute(:api_method => $arvados.pipeline_instances.update,
                             :parameters => {
                               :uuid => @pi[:uuid]
                             },
                             :body => {
                               :api_token => ENV['ARVADOS_API_TOKEN'],
                               :pipeline_instance => @attributes_to_update.to_json
                             },
                             :authenticated => false)
    j = JSON.parse result.body, :symbolize_names => true
    if j.is_a?(Hash) && j[:uuid]
      @attributes_to_update = {}
      @pi = j
    else
      debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
      nil
    end
  end

  # Set attribute +x+ locally and queue it for the next #save.
  def []=(x,y)
    @attributes_to_update[x] = y
    @pi[x] = y
  end

  # Read attribute +x+ from the local copy of the record.
  def [](x)
    @pi[x]
  end

  protected

  # +j+ is the record hash (symbol keys) as returned by the API.
  def initialize(j)
    @attributes_to_update = {}
    @pi = j
  end
end

# Class-level helpers for the jobs API. Records returned by .get and
# successful .create calls are stored in a hash keyed by job uuid.
class JobCache
  # Fetch the job record for +uuid+ from the API server, store it and
  # return it (parsed JSON with symbol keys).
  def self.get(uuid)
    @cache ||= {}
    result = $client.execute(:api_method => $arvados.jobs.get,
                             :parameters => {
                               :api_token => ENV['ARVADOS_API_TOKEN'],
                               :uuid => uuid
                             },
                             :authenticated => false)
    @cache[uuid] = JSON.parse result.body, :symbolize_names => true
  end

  # List jobs matching +conditions+ (sent as a JSON "where" clause,
  # limit 10000). Returns the :items array, or [] when the response
  # has no array of items.
  def self.where(conditions)
    result = $client.execute(:api_method => $arvados.jobs.list,
                             :parameters => {
                               :api_token => ENV['ARVADOS_API_TOKEN'],
                               :limit => 10000,
                               :where => conditions.to_json
                             },
                             :authenticated => false)
    list = JSON.parse result.body, :symbolize_names => true
    if list && list[:items].is_a?(Array)
      list[:items]
    else
      []
    end
  end

  # Submit a job with the given attributes. Returns the job record on
  # success; otherwise logs the error at verbosity 0 and returns nil.
  def self.create(attributes)
    @cache ||= {}
    result = $client.execute(:api_method => $arvados.jobs.create,
                             :parameters => {
                               :api_token => ENV['ARVADOS_API_TOKEN'],
                               :job => attributes.to_json
                             },
                             :authenticated => false)
    j = JSON.parse result.body, :symbolize_names => true
    if j.is_a?(Hash) && j[:uuid]
      @cache[j[:uuid]] = j
    else
      debuglog "create job: #{j[:errors] rescue nil} with attribute #{attributes}", 0
      nil
    end
  end
end

# Drives one pipeline instance: loads a template or an existing
# instance, applies command line parameters to its components, and
# then submits and polls one job per component.
class WhRunPipelineInstance
  attr_reader :instance

  # +_options+ is the parsed command line options hash.
  def initialize(_options)
    @options = _options
  end

  # Load the pipeline template. An argument containing any character
  # other than lowercase letters, digits and "-" is treated as a local
  # JSON file name; anything else is looked up as a template uuid.
  # Aborts when the file has no "components" key or the lookup fails.
  # Returns self.
  def fetch_template(template)
    if template.match(/[^-0-9a-z]/)
      # Doesn't look like a uuid -- use it as a filename.
      @template = JSON.parse File.read(template), :symbolize_names => true
      if !@template[:components]
        abort("#{$0}: Template loaded from #{template} " +
              "does not have a \"components\" key")
      end
    else
      result = $client.execute(:api_method => $arvados.pipeline_templates.get,
                               :parameters => {
                                 :api_token => ENV['ARVADOS_API_TOKEN'],
                                 :uuid => template
                               },
                               :authenticated => false)
      @template = JSON.parse result.body, :symbolize_names => true
      if !@template[:uuid]
        abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
      end
    end
    self
  end

  # Load an existing pipeline instance and use it in place of a
  # template. Aborts with an explicit message when the instance cannot
  # be retrieved, rather than failing later on a nil template.
  # Returns self.
  def fetch_instance(instance_uuid)
    @instance = PipelineInstance.find(instance_uuid)
    unless @instance
      abort "#{$0}: fatal: failed to retrieve pipeline instance #{instance_uuid}"
    end
    @template = @instance
    self
  end

  # Parse the leftover command line arguments ("name=value" or
  # "name value", each with an optional leading "--") and resolve the
  # value of every script parameter of every component.
  #
  # Precedence for a parameter: "component::name" from the command
  # line, then the template's :value, then (only when the parameter is
  # not the output of another component) "name" from the command line
  # or the template's :default.
  #
  # A parameter without a value is an error unless :required is one of
  # false, 'false', 0, '0', or the parameter has :output_of (in which
  # case it is left untouched to be filled in by #run). Aborts with a
  # list of all such errors. Returns self.
  def apply_parameters(params_args)
    params_args.shift if params_args[0] == '--'
    params = {}
    while !params_args.empty?
      if (re = params_args[0].match(/^(--)?([^-].*?)=(.+)/))
        params[re[2]] = re[3]
        params_args.shift
      elsif params_args.size > 1
        param = params_args.shift.sub(/^--/, '')
        params[param] = params_args.shift
      else
        abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
      end
    end

    @components = @template[:components].dup

    errors = []
    @components.each do |componentname, component|
      component[:script_parameters].each do |parametername, parameter|
        # A bare value in the template is shorthand for { :value => ... }.
        parameter = { :value => parameter } unless parameter.is_a? Hash
        value =
          (params["#{componentname}::#{parametername}"] ||
           parameter[:value] ||
           (parameter[:output_of].nil? &&
            (params[parametername.to_s] || parameter[:default])) ||
           nil)
        if value.nil? && ![false, 'false', 0, '0'].index(parameter[:required])
          # Will be resolved once the producing component succeeds.
          next if parameter[:output_of]
          errors << [componentname, parametername, "required parameter is missing"]
        end
        debuglog "parameter #{componentname}::#{parametername} == #{value}"
        component[:script_parameters][parametername] = value
      end
    end
    if !errors.empty?
      abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
    end
    debuglog "options=" + @options.pretty_inspect
    self
  end

  # Create the pipeline instance from the resolved components unless
  # one was already loaded by #fetch_instance. Returns self.
  def setup_instance
    @instance ||= PipelineInstance.
      create(:components => @components,
             :pipeline_template_uuid => @template[:uuid],
             :active => true)
    self
  end

  # Main loop: submit a job for every component whose inputs are fully
  # resolved, refresh unfinished jobs, feed the output of successful
  # jobs into the components waiting for it, and report status. Repeat
  # every 10 seconds until nothing more can be done (or once only, with
  # --no-wait). Finally record the overall outcome on the instance.
  def run
    moretodo = true
    while moretodo
      moretodo = false
      @components.each do |cname, c|
        c_already_finished = (c[:job] &&
                              c[:job][:uuid] &&
                              !c[:job][:success].nil?)
        if !c[:job] &&
            c[:script_parameters].select { |pname, p| p.is_a?(Hash) && p[:output_of] }.empty?
          # No job is associated with this component yet and its
          # inputs are fully specified (any output_of
          # script_parameters have been resolved to real values).
          job = JobCache.create({:script => c[:script],
                                  :script_parameters => c[:script_parameters],
                                  :script_version => c[:script_version],
                                  :repository => c[:repository],
                                  :minimum_script_version => c[:minimum_script_version],
                                  :exclude_script_versions => c[:exclude_minimum_script_versions],
                                  :nondeterministic => c[:nondeterministic],
                                  :no_reuse => @options[:no_reuse],
                                  :output_is_persistent => c[:output_is_persistent] || false})
          if job
            debuglog "component #{cname} new job #{job[:uuid]}"
            c[:job] = job
          else
            debuglog "component #{cname} new job failed"
          end
        end

        if c[:job] && c[:job][:uuid]
          if c[:job][:running] ||
              !(c[:job][:finished_at] || c[:job][:cancelled_at])
            # Job is running so update copy of job record
            c[:job] = JobCache.get(c[:job][:uuid])
          end

          if c[:job][:success]
            # Populate script_parameters of other components waiting for
            # this job
            @components.each do |c2name, c2|
              c2[:script_parameters].each do |pname, p|
                if p.is_a?(Hash) && p[:output_of] == cname.to_s
                  debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
                  c2[:script_parameters][pname] = c[:job][:output]
                  moretodo = true
                end
              end
            end
            if !c_already_finished && c[:output_is_persistent]
              # This is my first time discovering that the job
              # succeeded. I need to make sure a resources/wants
              # link is in place to protect the output from garbage
              # collection. (Normally Crunch does this for me, but
              # here I might be reusing the output of someone else's
              # job and I need to make sure it's understood that the
              # output is valuable to me, too.)
              ensure_persistence_link(c[:job][:output])
            end
          elsif c[:job][:running] ||
              (!c[:job][:started_at] && !c[:job][:cancelled_at])
            # Job is still running
            moretodo = true
          elsif c[:job][:cancelled_at]
            debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
          end
        end
      end
      @instance[:components] = @components
      @instance[:active] = moretodo
      report_status

      if @options[:no_wait]
        moretodo = false
      end

      if moretodo
        begin
          sleep 10
        rescue Interrupt
          debuglog "interrupt", 0
          abort
        end
      end
    end

    # Tally the outcome. Only jobs with :finished_at count as ended.
    ended = 0
    succeeded = 0
    failed = 0
    @components.each do |cname, c|
      if c[:job]
        if c[:job][:finished_at]
          ended += 1
          if c[:job][:success] == true
            succeeded += 1
          elsif c[:job][:success] == false
            failed += 1
          end
        end
      end
    end

    if ended == @components.length || failed > 0
      @instance[:active] = false
      @instance[:success] = (succeeded == @components.length)
    end

    @instance.save
  end

  # Mark the instance (if there is one) inactive and save it.
  def cleanup
    if @instance
      @instance[:active] = false
      @instance.save
    end
  end

  # UUID of the current pipeline instance.
  def uuid
    @instance[:uuid]
  end

  protected

  # Make sure a resources/wants link from the current user to the
  # collection +wanted+ exists, creating it when no such link is found.
  def ensure_persistence_link(wanted)
    debuglog "checking for existing persistence link for #{wanted}"
    @my_user_uuid ||= $arv.user.current[:uuid]
    links = $arv.link.list(limit: 1,
                           filters:
                           [%w(link_class = resources),
                            %w(name = wants),
                            %w(tail_uuid =) + [@my_user_uuid],
                            %w(head_uuid =) + [wanted]
                           ])[:items]
    if links.any?
      debuglog "link already exists, uuid #{links.first[:uuid]}"
    else
      newlink = $arv.link.create(link: {
                                   link_class: 'resources',
                                   name: 'wants',
                                   tail_kind: 'arvados#user',
                                   tail_uuid: @my_user_uuid,
                                   head_kind: 'arvados#collection',
                                   head_uuid: wanted
                                 })
      debuglog "added link, uuid #{newlink[:uuid]}"
    end
  end

  # Save the instance, then write the status reports selected by
  # --status-json and --status-text ('/dev/null' disables a report).
  def report_status
    @instance.save

    if @options[:status_json] != '/dev/null'
      File.open(@options[:status_json], 'w') do |f|
        # Emit real JSON, as the option name and help text promise.
        f.puts JSON.pretty_generate(@components)
      end
    end

    if @options[:status_text] != '/dev/null'
      File.open(@options[:status_text], 'w') do |f|
        f.puts ""
        f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
        namewidth = @components.collect { |cname, c| cname.size }.max
        @components.each do |cname, c|
          jstatus = if !c[:job]
                      "-"
                    elsif c[:job][:running]
                      "#{c[:job][:tasks_summary].inspect}"
                    elsif c[:job][:success]
                      c[:job][:output]
                    elsif c[:job][:cancelled_at]
                      "cancelled #{c[:job][:cancelled_at]}"
                    elsif c[:job][:finished_at]
                      "failed #{c[:job][:finished_at]}"
                    elsif c[:job][:started_at]
                      "started #{c[:job][:started_at]}"
                    else
                      "queued #{c[:job][:created_at]}"
                    end
          f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
        end
      end
    end
  end
end

runner = WhRunPipelineInstance.new($options)
begin
  if $options[:template]
    runner.fetch_template($options[:template])
  else
    runner.fetch_instance($options[:instance])
  end
  runner.apply_parameters(parser.leftovers)
  runner.setup_instance
  if $options[:submit]
    runner.instance.save
    puts runner.instance[:uuid]
  else
    runner.run
  end
rescue Exception => e
  # Deliberately broad: abort (SystemExit) and Interrupt must also mark
  # the instance inactive. The exception is always re-raised.
  runner.cleanup
  raise e
end