#!/usr/bin/env ruby

# == Synopsis
#
#  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
#  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
#
# Satisfy a pipeline template by finding or submitting a mapreduce job
# for each pipeline component.
#
# == Options
#
# [--template uuid] Use the specified pipeline template.
#
# [--template path] Load the pipeline template from the specified
#                   local file.
#
# [--instance uuid] Use the specified pipeline instance.
#
# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
#                 to finish. Just find out whether jobs are finished,
#                 queued, or running for each component
#
# [--submit] Do not try to satisfy any components. Just
#            create an instance, print its UUID to
#            stdout, and exit.
#
# [--run-pipeline-here] Manage the pipeline instance in-process. Submit
#                       jobs to Crunch as needed. Do not exit until the
#                       pipeline finishes (or fails).
#
# [--run-jobs-here] Run jobs in the local terminal session instead of
#                   submitting them to Crunch. Implies
#                   --run-pipeline-here.
#
# [--run-here] Synonym for --run-jobs-here.
#
# [--no-wait] Make only as much progress as possible without entering
#             a sleep/poll loop.
#
# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
#              components. Submit a new job for every component.
#
# [--debug] Print extra debugging information on stderr.
#
# [--debug-level N] Increase amount of debugging information. Default
#                   1, possible range 0..3.
#
# [--status-text path] Print plain text status report to a file or
#                      fifo. Default: /dev/stdout
#
# [--status-json path] Print JSON status report to a file or
#                      fifo. Default: /dev/null
#
# [--description] Description for the pipeline instance.
#
# == Parameters
#
# [param_name=param_value]
#
# [param_name param_value] Set (or override) the default value for
#                          every parameter with the given name.
#
# [component_name::param_name=param_value]
# [component_name::param_name param_value]
# [--component_name::param_name=param_value]
# [--component_name::param_name param_value] Set the value of a
#                                            parameter for a single
#                                            component.
#
class WhRunPipelineInstance
end

if RUBY_VERSION < '1.9.3' then
  abort <<-EOS
#{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
  EOS
end

$arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
$arvados_api_host = ENV['ARVADOS_API_HOST'] or
  abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
$arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
  abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."

begin
  require 'arvados'
  require 'rubygems'
  require 'json'
  require 'pp'
  require 'trollop'
  require 'google/api_client'
rescue LoadError => l
  puts $:
  abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
Try: gem install arvados pp google-api-client json trollop
  EOS
end

# Standard library; used to run arv-crunch-job in the local session.
require 'open3'

# Write +message+ to stderr, prefixed with the program name and pid,
# when the global debug level is at least +verbosity+.
def debuglog(message, verbosity=1)
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
end

module Kernel
  # Run the given block with $VERBOSE set to nil and return the block's
  # value. The previous $VERBOSE is restored even if the block raises.
  def suppress_warnings
    original_verbosity = $VERBOSE
    $VERBOSE = nil
    yield
  ensure
    $VERBOSE = original_verbosity
  end
end

# Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE is set,
# or if the API host name contains "local" (you probably don't care
# about SSL certificate checks if you're testing with a dev server).
if $arvados_api_host.match(/local/) || ENV['ARVADOS_API_HOST_INSECURE']
  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
end

# Parse command line options (the kind that control the behavior of
# this program, that is, not the pipeline component parameters).
#
# NOTE(review): --dry-run is accepted here but nothing below reads
# options[:dry_run] -- confirm whether that is intended.

p = Trollop::Parser.new do
  version __FILE__
  opt(:dry_run,
      "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
      :type => :boolean,
      :short => :n)
  opt(:status_text,
      "Store plain text status in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/stdout')
  opt(:status_json,
      "Store json-formatted pipeline in given file.",
      :short => :none,
      :type => :string,
      :default => '/dev/null')
  opt(:no_wait,
      "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
      :short => :none,
      :type => :boolean)
  opt(:no_reuse,
      "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
      :short => :none,
      :type => :boolean)
  opt(:debug,
      "Print extra debugging information on stderr.",
      :type => :boolean)
  opt(:debug_level,
      "Set debug verbosity level.",
      :short => :none,
      :type => :integer)
  opt(:template,
      "UUID of pipeline template, or path to local pipeline template file.",
      :short => :none,
      :type => :string)
  opt(:instance,
      "UUID of pipeline instance.",
      :short => :none,
      :type => :string)
  opt(:submit,
      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
      :short => :none,
      :type => :boolean)
  opt(:run_pipeline_here,
      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
      :short => :none,
      :type => :boolean)
  opt(:run_jobs_here,
      "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
      :short => :none,
      :type => :boolean)
  opt(:run_here,
      "Synonym for --run-jobs-here.",
      :short => :none,
      :type => :boolean)
  opt(:description,
      "Description for the pipeline instance.",
      :short => :none,
      :type => :string)
  stop_on [:'--']
end
$options = Trollop::with_standard_exception_handling p do
  p.parse ARGV
end
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0

$options[:run_jobs_here] ||= $options[:run_here] # old flag name
$options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A

if $options[:instance]
  if $options[:template] or $options[:submit]
    abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
  end
elsif not $options[:template]
  puts "error: you must supply a --template or --instance."
  p.educate
  abort
end

# Exactly one of "run the pipeline here" and "submit" must be chosen.
if $options[:run_pipeline_here] == $options[:submit]
  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
end

# Set up the API client.
#
# NOTE(review): $arvados_api_version is read from the environment above
# but 'v1' is hard-coded here -- confirm which one is intended.

$arv = Arvados.new api_version: 'v1'
$client = $arv.client
$arvados = $arv.arvados_api

# Thin wrapper around a pipeline_instance API record. Attribute writes
# made through []= are remembered and sent to the API server by #save.
class PipelineInstance
  # Fetch the pipeline instance with the given uuid.
  #
  # Returns a PipelineInstance, or nil (after logging the API errors)
  # if the response is not a hash with a :uuid.
  def self.find(uuid)
    result = $client.execute(:api_method => $arvados.pipeline_instances.get,
                             :parameters => {
                               :uuid => uuid
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    unless j.is_a? Hash and j[:uuid]
      debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
      nil
    else
      debuglog "Retrieved pipeline_instance #{j[:uuid]}"
      self.new(j)
    end
  end

  # Create a pipeline instance on the API server from the given
  # attributes hash. Aborts the program if the server does not return a
  # record with a :uuid.
  def self.create(attributes)
    result = $client.execute(:api_method => $arvados.pipeline_instances.create,
                             :body_object => {
                               :pipeline_instance => attributes
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    unless j.is_a? Hash and j[:uuid]
      # This is a class method, so there is no @template here; the
      # template uuid is taken from the attributes being submitted.
      abort "\n#{Time.now} -- pipeline_template #{attributes[:pipeline_template_uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
    end
    debuglog "Created pipeline instance: #{j[:uuid]}"
    self.new(j)
  end

  # Send the attributes changed since the last successful save to the
  # API server. On success the local copy is replaced with the server's
  # response; on failure the errors are logged and nil is returned.
  def save
    result = $client.execute(:api_method => $arvados.pipeline_instances.update,
                             :parameters => {
                               :uuid => @pi[:uuid]
                             },
                             :body_object => {
                               :pipeline_instance => @attributes_to_update
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    unless j.is_a? Hash and j[:uuid]
      debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
      nil
    else
      @attributes_to_update = {}
      @pi = j
    end
  end

  # Set attribute x to y locally and queue it for the next #save.
  def []=(x,y)
    @attributes_to_update[x] = y
    @pi[x] = y
  end

  # Read attribute x from the local copy of the record.
  def [](x)
    @pi[x]
  end

  # Create a log record of event type 'stderr' attached to this
  # pipeline instance, carrying msg as its "text" property.
  def log_stderr(msg)
    $arv.log.create log: {
      event_type: 'stderr',
      object_uuid: self[:uuid],
      owner_uuid: self[:owner_uuid],
      properties: {"text" => msg},
    }
  end

  protected

  # j is the parsed (symbol-keyed) API record.
  def initialize(j)
    @attributes_to_update = {}
    @pi = j
  end
end

# Helpers for fetching, listing and creating job records through the
# API. Records are remembered by uuid in a class-level hash.
class JobCache
  # Fetch the job with the given uuid from the API server (always a
  # fresh request), remember it, and return the parsed record.
  def self.get(uuid)
    @cache ||= {}
    result = $client.execute(:api_method => $arvados.jobs.get,
                             :parameters => {
                               :uuid => uuid
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
    @cache[uuid] = JSON.parse result.body, :symbolize_names => true
  end

  # List jobs matching the given conditions hash. Returns the :items
  # array of the response, or [] if the response has no such array.
  def self.where(conditions)
    result = $client.execute(:api_method => $arvados.jobs.list,
                             :parameters => {
                               :limit => 10000,
                               :where => conditions.to_json
                             },
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
    list = JSON.parse result.body, :symbolize_names => true
    if list and list[:items].is_a? Array
      list[:items]
    else
      []
    end
  end

  # Ask the API server to create a job for the named component.
  #
  # pipeline      - PipelineInstance that receives the error log on failure
  # component     - component name, used only in error messages
  # job           - job attributes (nil values are dropped)
  # create_params - extra top-level request parameters (nil values dropped)
  #
  # Returns the parsed job record, or nil after logging the failure.
  def self.create(pipeline, component, job, create_params)
    @cache ||= {}

    body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))

    result = $client.execute(:api_method => $arvados.jobs.create,
                             :body_object => body,
                             :authenticated => false,
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
    j = JSON.parse result.body, :symbolize_names => true
    if j.is_a? Hash and j[:uuid]
      @cache[j[:uuid]] = j
    else
      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0

      # The response may not be a hash, or may lack an :errors array;
      # still report the submission instead of raising here.
      errors = j.is_a?(Hash) ? j[:errors] : nil
      errors = [] if errors.nil?
      errors = [errors] unless errors.is_a?(Array)

      msg = ""
      errors.each do |err|
        msg += "Error creating job for component #{component}: #{err}\n"
      end
      msg += "Job submission was: #{body.to_json}"

      pipeline.log_stderr(msg)
      nil
    end
  end

  protected

  # Return a copy of hash without the entries whose value is nil.
  def self.no_nil_values(hash)
    hash.reject { |key, value| value.nil? }
  end
end

# Drives one pipeline instance: loads a template or existing instance,
# applies command-line parameters, creates the instance if needed, and
# runs its components as jobs until the pipeline finishes.
class WhRunPipelineInstance
  attr_reader :instance

  # _options is the parsed command-line options hash.
  def initialize(_options)
    @options = _options
  end

  # Load the pipeline template, either from a local JSON file (when the
  # argument contains characters that cannot appear in a uuid) or from
  # the API server. Aborts if the server returns no :uuid.
  # Returns self.
  def fetch_template(template)
    if template.match(/[^-0-9a-z]/)
      # Doesn't look like a uuid -- use it as a filename.
      @template = JSON.parse File.read(template), :symbolize_names => true
    else
      result = $client.execute(:api_method => $arvados.pipeline_templates.get,
                               :parameters => {
                                 :uuid => template
                               },
                               :authenticated => false,
                               :headers => {
                                 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                               })
      @template = JSON.parse result.body, :symbolize_names => true
      if !@template[:uuid]
        abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
      end
    end
    self
  end

  # Load an existing pipeline instance and use it as the template.
  # Aborts if the instance cannot be retrieved. Returns self.
  def fetch_instance(instance_uuid)
    @instance = PipelineInstance.find(instance_uuid)
    if @instance.nil?
      # Without this check the failure surfaces later as a
      # NoMethodError on nil in apply_parameters.
      abort "#{$0}: fatal: failed to retrieve pipeline instance #{instance_uuid}"
    end
    @template = @instance
    self
  end

  # Parse the leftover command-line arguments as parameter settings
  # ("name=value" or "name value", optionally prefixed with "--" and/or
  # "component::"), validate the template's components, and resolve a
  # value for every script parameter. Aborts with a list of problems if
  # any required parameter is unresolved. Returns self.
  def apply_parameters(params_args)
    params_args.shift if params_args[0] == '--'
    params = {}
    while !params_args.empty?
      if (re = params_args[0].match(/^(--)?([^-].*?)=(.+)/))
        params[re[2]] = re[3]
        params_args.shift
      elsif params_args.size > 1
        param = params_args.shift.sub(/^--/, '')
        params[param] = params_args.shift
      else
        abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
      end
    end

    if not @template[:components].is_a?(Hash)
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
    end
    @components = @template[:components].dup

    bad_components = @components.each_pair.select do |cname, cspec|
      not cspec.is_a?(Hash)
    end
    if bad_components.any?
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
    end

    bad_components = @components.each_pair.select do |cname, cspec|
      not cspec[:script_parameters].is_a?(Hash)
    end
    if bad_components.any?
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
    end

    errors = []
    @components.each do |componentname, component|
      component[:script_parameters].each do |parametername, parameter|
        parameter = { :value => parameter } unless parameter.is_a? Hash
        # Precedence: component-specific command-line value, then the
        # template's own value, then (only for parameters that are not
        # fed by another component) the global command-line value or
        # the template default.
        value =
          (params["#{componentname}::#{parametername}"] ||
           parameter[:value] ||
           (parameter[:output_of].nil? &&
            (params[parametername.to_s] || parameter[:default])) ||
           nil)
        if value.nil? and
            ![false,'false',0,'0'].index parameter[:required]
          if parameter[:output_of]
            # Component names are symbol keys (templates are parsed
            # with symbolize_names), while output_of is a plain string.
            if not @components[parameter[:output_of].to_s.to_sym]
              errors << [componentname, parametername, "output_of refers to nonexistent component"]
            end
            # Either way this parameter is left untouched: it is
            # resolved in #run once the producing job completes.
            next
          end
          errors << [componentname, parametername, "required parameter is missing"]
        end
        debuglog "parameter #{componentname}::#{parametername} == #{value}"
        component[:script_parameters][parametername] = parameter.dup.merge(value: value)
      end
    end
    if !errors.empty?
      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
    end
    debuglog "options=" + @options.pretty_inspect
    self
  end

  # Make sure there is a pipeline instance to work on: adjust the
  # job-reuse setting of an existing instance, or create a new instance
  # from the resolved components. Returns self.
  def setup_instance
    if @instance
      @instance[:properties][:run_options] ||= {}
      if @options[:no_reuse]
        # override properties of existing instance
        @instance[:properties][:run_options][:enable_job_reuse] = false
      else
        # Default to "enable reuse" if not specified. (This code path
        # can go away when old clients go away.)
        if @instance[:properties][:run_options][:enable_job_reuse].nil?
          @instance[:properties][:run_options][:enable_job_reuse] = true
        end
      end
    else
      description = $options[:description]
      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
      @instance = PipelineInstance.
        create(components: @components,
               properties: {
                 run_options: {
                   enable_job_reuse: !@options[:no_reuse]
                 }
               },
               pipeline_template_uuid: @template[:uuid],
               description: description,
               state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
    end
    self
  end

  # Main loop: create a job for every component whose inputs are
  # resolved, optionally run jobs in this session, poll job state every
  # 10 seconds, feed completed outputs into dependent components, and
  # finally record the pipeline's state and components summary.
  def run
    moretodo = true
    interrupted = false

    if @instance[:started_at].nil?
      @instance[:started_at] = Time.now
    end

    job_creation_failed = 0
    while moretodo
      moretodo = false
      @components.each do |cname, c|
        job = nil
        owner_uuid = @instance[:owner_uuid]
        # Is the job satisfying this component already known to be
        # finished? (Already meaning "before we query API server about
        # the job's current state")
        c_already_finished = (c[:job] &&
                              c[:job][:uuid] &&
                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
        if !c[:job] and
            c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
          # No job yet associated with this component and is component inputs
          # are fully specified (any output_of script_parameters are resolved
          # to real value)
          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
          job = JobCache.create(@instance, cname, {
            :script => c[:script],
            :script_parameters => Hash[c[:script_parameters].map do |key, spec|
                                         [key, spec[:value]]
                                       end],
            :script_version => c[:script_version],
            :repository => c[:repository],
            :nondeterministic => c[:nondeterministic],
            :runtime_constraints => c[:runtime_constraints],
            :owner_uuid => owner_uuid,
            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
            :submit_id => my_submit_id,
            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
          }, {
            # This is the right place to put these attributes when
            # dealing with new API servers.
            :minimum_script_version => c[:minimum_script_version],
            # NOTE(review): the component key read here is
            # :exclude_minimum_script_versions, not
            # :exclude_script_versions -- confirm this is intended.
            :exclude_script_versions => c[:exclude_minimum_script_versions],
            :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
                                !c[:nondeterministic]),
            :filters => c[:filters]
          })
          if job
            debuglog "component #{cname} new job #{job[:uuid]}"
            c[:job] = job
            # Only run the job here if the server handed back the job
            # we just submitted (same submit_id), not a reused one.
            c[:run_in_process] = (@options[:run_jobs_here] and
                                  job[:submit_id] == my_submit_id)
          else
            debuglog "component #{cname} new job failed", 0
            job_creation_failed += 1
          end
        end

        if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
          report_status
          begin
            Open3.popen3("arv-crunch-job", "--force-unlock",
                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
              stdin.close
              relay_streams({ stdout => $stdout, stderr => $stderr })
              stdout.close
              stderr.close
              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
            end
            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
              raise Exception.new("arv-crunch-job did not set finished_at.")
            end
          rescue Exception => e
            # Deliberately broad: an Interrupt (Ctrl-C) while the job
            # runs locally must also mark the job as failed.
            debuglog "Interrupted (#{e}). Failing job.", 0
            $arv.job.update(uuid: c[:job][:uuid],
                            job: {
                              state: "Failed"
                            })
          end
        end

        if c[:job] and c[:job][:uuid]
          if ["Running", "Queued"].include?(c[:job][:state])
            # Job is running (or may be soon) so update copy of job record
            c[:job] = JobCache.get(c[:job][:uuid])
          end

          if c[:job][:state] == "Complete"
            # Populate script_parameters of other components waiting for
            # this job
            @components.each do |c2name, c2|
              c2[:script_parameters].each do |pname, p|
                if p.is_a? Hash and p[:output_of] == cname.to_s
                  debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
                  c2[:script_parameters][pname] = {value: c[:job][:output]}
                  moretodo = true
                end
              end
            end
            unless c_already_finished
              # This is my first time discovering that the job
              # succeeded. (At the top of this loop, I was still
              # waiting for it to finish.)

              if @instance[:name].andand.length.andand > 0
                pipeline_name = @instance[:name]
              elsif @template.andand[:name].andand.length.andand > 0
                pipeline_name = @template[:name]
              else
                pipeline_name = @instance[:uuid]
              end
              if c[:output_name] != false
                # Create a collection located in the same project as the pipeline with the contents of the output.
                portable_data_hash = c[:job][:output]
                collections = $arv.collection.list(limit: 1,
                                                   filters: [['portable_data_hash', '=', portable_data_hash]],
                                                   select: ["portable_data_hash", "manifest_text"]
                                                   )[:items]
                if collections.any?
                  name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"

                  # check if there is a name collision.
                  name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
                                                                   ["name", "=", name]])[:items]

                  newcollection_actual = nil
                  if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
                    # There is already a collection with the same name and the
                    # same contents, so just point to that.
                    newcollection_actual = name_collisions.first
                  end

                  if newcollection_actual.nil?
                    # Did not find a collection with the same name (or the
                    # collection has a different portable data hash) so create
                    # a new collection with ensure_unique_name: true.
                    newcollection = {
                      owner_uuid: owner_uuid,
                      name: name,
                      portable_data_hash: collections.first[:portable_data_hash],
                      manifest_text: collections.first[:manifest_text]
                    }
                    debuglog "Creating collection #{newcollection}", 0
                    newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
                  end

                  c[:output_uuid] = newcollection_actual[:uuid]
                else
                  debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
                end
              end
            end
          elsif ["Queued", "Running"].include? c[:job][:state]
            # Job is running or queued to run, so indicate that pipeline
            # should continue to run
            moretodo = true
          elsif c[:job][:state] == "Cancelled"
            debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
            moretodo = false
          elsif c[:job][:state] == "Failed"
            moretodo = false
          end
        end
      end
      @instance[:components] = @components
      report_status

      if @options[:no_wait]
        moretodo = false
      end

      # If job creation fails, just give up on this pipeline instance.
      if job_creation_failed > 0
        moretodo = false
      end

      if moretodo
        begin
          sleep 10
        rescue Interrupt
          debuglog "interrupt", 0
          interrupted = true
          break
        end
      end
    end

    # Tally components by the state of their job (nil if no job yet).
    c_in_state = @components.values.group_by { |c|
      c[:job] and c[:job][:state]
    }
    succeeded = c_in_state["Complete"].andand.count || 0
    failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
    ended = succeeded + failed

    success = (succeeded == @components.length)

    # A job create call failed. Just give up.
    if job_creation_failed > 0
      debuglog "job creation failed - giving up on this pipeline instance", 0
      success = false
      failed += 1
    end

    if interrupted
      if success
        @instance[:state] = 'Complete'
      else
        @instance[:state] = 'Paused'
      end
    else
      if ended == @components.length or failed > 0
        @instance[:state] = success ? 'Complete' : 'Failed'
      end
    end

    if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
      @instance[:finished_at] = Time.now
    end

    debuglog "pipeline instance state is #{@instance[:state]}"

    # set components_summary
    components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
    @instance[:components_summary] = components_summary

    @instance.save
  end

  # If the instance is still marked as running on this client, mark it
  # 'Paused' and save it. Called by the driver when #run is cut short.
  def cleanup
    if @instance and @instance[:state] == 'RunningOnClient'
      @instance[:state] = 'Paused'
      @instance.save
    end
  end

  # UUID of the pipeline instance being run.
  def uuid
    @instance[:uuid]
  end

  protected

  # Copy everything readable from each source IO (the keys of routes)
  # to its destination IO (the values) until every source reaches EOF.
  # A source that hits EOF is dropped while the others keep draining,
  # so output still pending on the other pipe is not lost.
  def relay_streams(routes)
    open_routes = routes.dup
    until open_routes.empty?
      ready, = IO.select(open_routes.keys)
      ready.each do |src|
        begin
          open_routes[src].write(src.read_nonblock(2**20))
        rescue IO::WaitReadable
          # Nothing to read after all; go back to select.
        rescue EOFError
          open_routes.delete(src)
        end
      end
    end
  end

  # Save the instance, then write the JSON and plain-text status
  # reports to the files named by the status_json and status_text
  # options (each skipped when set to '/dev/null').
  def report_status
    @instance.save

    if @options[:status_json] != '/dev/null'
      File.open(@options[:status_json], 'w') do |f|
        # The option promises JSON, so emit real JSON rather than
        # Ruby inspect syntax.
        f.puts JSON.pretty_generate(@components)
      end
    end

    if @options[:status_text] != '/dev/null'
      File.open(@options[:status_text], 'w') do |f|
        f.puts ""
        f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
        namewidth = @components.collect { |cname, c| cname.size }.max
        @components.each do |cname, c|
          jstatus = if !c[:job]
                      "-"
                    else case c[:job][:state]
                         when "Running"
                           "#{c[:job][:tasks_summary].inspect}"
                         when "Complete"
                           c[:job][:output]
                         when "Cancelled"
                           "cancelled #{c[:job][:cancelled_at]}"
                         when "Failed"
                           "failed #{c[:job][:finished_at]}"
                         when "Queued"
                           "queued #{c[:job][:created_at]}"
                         end
                    end
          f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
        end
      end
    end
  end

  # Like Kernel#abort, but first marks an unfinished pipeline instance
  # as "Failed" (with finished_at set) and logs msg against it.
  def abort(msg)
    if @instance
      if ["New", "Ready", "RunningOnClient",
          "RunningOnServer"].include?(@instance[:state])
        @instance[:state] = "Failed"
        @instance[:finished_at] = Time.now
        @instance.save
      end
      @instance.log_stderr(msg)
    end
    Kernel::abort(msg)
  end
end

runner = WhRunPipelineInstance.new($options)
begin
  if $options[:template]
    runner.fetch_template($options[:template])
  else
    runner.fetch_instance($options[:instance])
  end
  runner.apply_parameters(p.leftovers)
  runner.setup_instance
  if $options[:submit]
    runner.instance.save
    puts runner.instance[:uuid]
  else
    runner.run
  end
rescue Exception => e
  # Deliberately broad: Interrupt and the SystemExit raised by abort
  # must also give the runner a chance to mark the instance Paused.
  runner.cleanup
  raise e
end