#!/usr/bin/env ruby # Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: Apache-2.0 class WhRunPipelineInstance end if RUBY_VERSION < '1.9.3' then abort <<-EOS #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher. EOS end begin require 'arvados' require 'rubygems' require 'json' require 'pp' require 'trollop' require 'google/api_client' rescue LoadError => l $stderr.puts $: abort <<-EOS #{$0}: fatal: #{l.message} Some runtime dependencies may be missing. Try: gem install arvados pp google-api-client json trollop EOS end def debuglog(message, verbosity=1) $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity end # Parse command line options (the kind that control the behavior of # this program, that is, not the pipeline component parameters). p = Trollop::Parser.new do version __FILE__ banner(< :boolean, :short => :n) opt(:status_text, "Store plain text status in given file.", :short => :none, :type => :string, :default => '/dev/stdout') opt(:status_json, "Store json-formatted pipeline in given file.", :short => :none, :type => :string, :default => '/dev/null') opt(:no_wait, "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.", :short => :none, :type => :boolean) opt(:no_reuse, "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.", :short => :none, :type => :boolean) opt(:debug, "Print extra debugging information on stderr.", :type => :boolean) opt(:debug_level, "Set debug verbosity level.", :short => :none, :type => :integer) opt(:template, "UUID of pipeline template, or path to local pipeline template file.", :short => :none, :type => :string) opt(:instance, "UUID of pipeline instance.", :short => :none, :type => :string) opt(:submit, "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.", :short => :none, :type => :boolean) opt(:run_pipeline_here, "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).", :short => :none, :type => :boolean) opt(:run_jobs_here, "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.", :short => :none, :type => :boolean) opt(:run_here, "Synonym for --run-jobs-here.", :short => :none, :type => :boolean) opt(:description, "Description for the pipeline instance.", :short => :none, :type => :string) opt(:project_uuid, "UUID of the project for the pipeline instance.", short: :none, type: :string) stop_on [:'--'] end $options = Trollop::with_standard_exception_handling p do p.parse ARGV end $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0 $options[:run_jobs_here] ||= $options[:run_here] # old flag name $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A if $options[:instance] if $options[:template] or $options[:submit] abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit." end elsif not $options[:template] $stderr.puts "error: you must supply a --template or --instance." p.educate abort end if $options[:run_pipeline_here] == $options[:submit] abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit." end # Set up the API client. $arv = Arvados.new api_version: 'v1' $client = $arv.client $arvados = $arv.arvados_api class PipelineInstance def self.find(uuid) result = $client.execute(:api_method => $arvados.pipeline_instances.get, :parameters => { :uuid => uuid }, :authenticated => false, :headers => { authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0 nil else debuglog "Retrieved pipeline_instance #{j[:uuid]}" self.new(j) end end def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, :body_object => { :pipeline_instance => attributes }, :authenticated => false, :headers => { authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" end debuglog "Created pipeline instance: #{j[:uuid]}" self.new(j) end def save result = $client.execute(:api_method => $arvados.pipeline_instances.update, :parameters => { :uuid => @pi[:uuid] }, :body_object => { :pipeline_instance => @attributes_to_update }, :authenticated => false, :headers => { authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0 nil else @attributes_to_update = {} @pi = j end end def []=(x,y) @attributes_to_update[x] = y @pi[x] = y end def [](x) @pi[x] end def log_stderr(msg) $arv.log.create log: { event_type: 'stderr', object_uuid: self[:uuid], owner_uuid: self[:owner_uuid], properties: {"text" => msg}, } end protected def initialize(j) @attributes_to_update = {} @pi = j end end class JobCache def self.get(uuid) @cache ||= {} result = $client.execute(:api_method => $arvados.jobs.get, :parameters => { :uuid => uuid }, :authenticated => false, :headers => { authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] }) @cache[uuid] = JSON.parse result.body, :symbolize_names => true end def self.where(conditions) result = $client.execute(:api_method => $arvados.jobs.list, :parameters => { :limit => 10000, :where => conditions.to_json }, :authenticated => false, :headers => { authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] }) list = JSON.parse result.body, :symbolize_names => true if list and list[:items].is_a? Array list[:items] else [] end end # create() returns [job, exception]. If both job and exception are # nil, there was a non-retryable error and the call should not be # attempted again. def self.create(pipeline, component, job, create_params) @cache ||= {} body = {job: no_nil_values(job)}.merge(no_nil_values(create_params)) result = nil begin result = $client.execute( :api_method => $arvados.jobs.create, :body_object => body, :authenticated => false, :headers => { authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] }) if result.status == 429 || result.status >= 500 raise Exception.new("HTTP status #{result.status}") end rescue Exception => e return nil, e end j = JSON.parse(result.body, :symbolize_names => true) rescue nil if result.status == 200 && j.is_a?(Hash) && j[:uuid] @cache[j[:uuid]] = j return j, nil else errors = j[:errors] rescue [] debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0 msg = "" errors.each do |err| msg += "Error creating job for component #{component}: #{err}\n" end msg += "Job submission was: #{body.to_json}" pipeline.log_stderr(msg) return nil, nil end end protected def self.no_nil_values(hash) hash.reject { |key, value| value.nil? } end end class WhRunPipelineInstance attr_reader :instance def initialize(_options) @options = _options end def fetch_template(template) if template.match /[^-0-9a-z]/ # Doesn't look like a uuid -- use it as a filename. @template = JSON.parse File.read(template), :symbolize_names => true else result = $client.execute(:api_method => $arvados.pipeline_templates.get, :parameters => { :uuid => template }, :authenticated => false, :headers => { authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] }) @template = JSON.parse result.body, :symbolize_names => true if !@template[:uuid] abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}" end end self end def fetch_instance(instance_uuid) @instance = PipelineInstance.find(instance_uuid) @template = @instance self end def apply_parameters(params_args) params_args.shift if params_args[0] == '--' params = {} while !params_args.empty? if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/) params[re[2]] = re[3] params_args.shift elsif params_args.size > 1 param = params_args.shift.sub /^--/, '' params[param] = params_args.shift else abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\"" end end if not @template[:components].is_a?(Hash) abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash" end @components = @template[:components].dup bad_components = @components.each_pair.select do |cname, cspec| not cspec.is_a?(Hash) end if bad_components.any? abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}" end bad_components = @components.each_pair.select do |cname, cspec| not cspec[:script_parameters].is_a?(Hash) end if bad_components.any? abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}" end errors = [] @components.each do |componentname, component| component[:script_parameters].each do |parametername, parameter| parameter = { :value => parameter } unless parameter.is_a? Hash if params.has_key?("#{componentname}::#{parametername}") value = params["#{componentname}::#{parametername}"] elsif parameter.has_key?(:value) value = parameter[:value] elsif parameter.has_key?(:output_of) if !@components[parameter[:output_of].intern] errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"] else # value will be filled in later when the upstream # component's output becomes known end next elsif params.has_key?(parametername.to_s) value = params[parametername.to_s] elsif parameter.has_key?(:default) value = parameter[:default] elsif [false, 'false', 0, '0'].index(parameter[:required]) value = nil else errors << [componentname, parametername, "required parameter is missing"] next end debuglog "parameter #{componentname}::#{parametername} == #{value}" component[:script_parameters][parametername] = parameter.dup.merge(value: value) end end if !errors.empty? all_errors = errors.collect do |c,p,e| "#{c}::#{p} - #{e}\n" end.join("") abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}" end debuglog "options=" + @options.pretty_inspect self end def setup_instance if @instance @instance[:properties][:run_options] ||= {} if @options[:no_reuse] # override properties of existing instance @instance[:properties][:run_options][:enable_job_reuse] = false else # Default to "enable reuse" if not specified. (This code path # can go away when old clients go away.) if @instance[:properties][:run_options][:enable_job_reuse].nil? @instance[:properties][:run_options][:enable_job_reuse] = true end end else description = $options[:description] || ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) instance_body = { components: @components, properties: { run_options: { enable_job_reuse: !@options[:no_reuse] } }, pipeline_template_uuid: @template[:uuid], description: description, state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient') } if @options[:project_uuid] instance_body[:owner_uuid] = @options[:project_uuid] end @instance = PipelineInstance.create(instance_body) end self end def run moretodo = true interrupted = false if @instance[:started_at].nil? @instance[:started_at] = Time.now end job_creation_failed = 0 while moretodo moretodo = false @components.each do |cname, c| job = nil owner_uuid = @instance[:owner_uuid] # Is the job satisfying this component already known to be # finished? (Already meaning "before we query API server about # the job's current state") c_already_finished = (c[:job] && c[:job][:uuid] && ["Complete", "Failed", "Cancelled"].include?(c[:job][:state])) if !c[:job] and c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty? # No job yet associated with this component and is component inputs # are fully specified (any output_of script_parameters are resolved # to real value) my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}" job, err = JobCache.create(@instance, cname, { :script => c[:script], :script_parameters => Hash[c[:script_parameters].map do |key, spec| [key, spec[:value]] end], :script_version => c[:script_version], :repository => c[:repository], :nondeterministic => c[:nondeterministic], :runtime_constraints => c[:runtime_constraints], :owner_uuid => owner_uuid, :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil), :submit_id => my_submit_id, :state => (if @options[:run_jobs_here] then "Running" else "Queued" end) }, { # This is the right place to put these attributes when # dealing with new API servers. :minimum_script_version => c[:minimum_script_version], :exclude_script_versions => c[:exclude_minimum_script_versions], :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] && !c[:nondeterministic]), :filters => c[:filters] }) if job debuglog "component #{cname} new job #{job[:uuid]}" c[:job] = job c[:run_in_process] = (@options[:run_jobs_here] and job[:submit_id] == my_submit_id) elsif err.nil? debuglog "component #{cname} new job failed", 0 job_creation_failed += 1 else debuglog "component #{cname} new job failed, err=#{err}", 0 end end if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state] report_status begin require 'open3' Open3.popen3("arv-crunch-job", "--force-unlock", "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr| debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0 stdin.close while true rready, wready, = IO.select([stdout, stderr], []) break if !rready[0] begin buf = rready[0].read_nonblock(2**20) rescue EOFError break end (rready[0] == stdout ? $stdout : $stderr).write(buf) end stdout.close stderr.close debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0 end if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at] raise Exception.new("arv-crunch-job did not set finished_at.") end rescue Exception => e debuglog "Interrupted (#{e}). Failing job.", 0 $arv.job.update(uuid: c[:job][:uuid], job: { state: "Failed" }) end end if c[:job] and c[:job][:uuid] if ["Running", "Queued"].include?(c[:job][:state]) # Job is running (or may be soon) so update copy of job record c[:job] = JobCache.get(c[:job][:uuid]) end if c[:job][:state] == "Complete" # Populate script_parameters of other components waiting for # this job @components.each do |c2name, c2| c2[:script_parameters].each do |pname, p| if p.is_a? Hash and p[:output_of] == cname.to_s debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}" c2[:script_parameters][pname] = {value: c[:job][:output]} moretodo = true end end end unless c_already_finished # This is my first time discovering that the job # succeeded. (At the top of this loop, I was still # waiting for it to finish.) if @instance[:name].andand.length.andand > 0 pipeline_name = @instance[:name] elsif @template.andand[:name].andand.length.andand > 0 pipeline_name = @template[:name] else pipeline_name = @instance[:uuid] end if c[:output_name] != false # Create a collection located in the same project as the pipeline with the contents of the output. portable_data_hash = c[:job][:output] collections = $arv.collection.list(limit: 1, filters: [['portable_data_hash', '=', portable_data_hash]], select: ["portable_data_hash", "manifest_text"] )[:items] if collections.any? name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}" # check if there is a name collision. name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid], ["name", "=", name]])[:items] newcollection_actual = nil if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash # There is already a collection with the same name and the # same contents, so just point to that. newcollection_actual = name_collisions.first end if newcollection_actual.nil? # Did not find a collection with the same name (or the # collection has a different portable data hash) so create # a new collection with ensure_unique_name: true. newcollection = { owner_uuid: owner_uuid, name: name, portable_data_hash: collections.first[:portable_data_hash], manifest_text: collections.first[:manifest_text] } debuglog "Creating collection #{newcollection}", 0 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true end c[:output_uuid] = newcollection_actual[:uuid] else debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0 end end end elsif ["Queued", "Running"].include? c[:job][:state] # Job is running or queued to run, so indicate that pipeline # should continue to run moretodo = true elsif c[:job][:state] == "Cancelled" debuglog "component #{cname} job #{c[:job][:uuid]} cancelled." moretodo = false elsif c[:job][:state] == "Failed" moretodo = false end end end @instance[:components] = @components report_status if @options[:no_wait] moretodo = false end # If job creation fails, just give up on this pipeline instance. if job_creation_failed > 0 moretodo = false end if moretodo begin sleep 10 rescue Interrupt debuglog "interrupt", 0 interrupted = true break end end end c_in_state = @components.values.group_by { |c| c[:job] and c[:job][:state] } succeeded = c_in_state["Complete"].andand.count || 0 failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0) ended = succeeded + failed success = (succeeded == @components.length) # A job create call failed. Just give up. if job_creation_failed > 0 debuglog "job creation failed - giving up on this pipeline instance", 0 success = false failed += 1 end if interrupted if success @instance[:state] = 'Complete' else @instance[:state] = 'Paused' end else if ended == @components.length or failed > 0 @instance[:state] = success ? 'Complete' : 'Failed' end end if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state] @instance[:finished_at] = Time.now end debuglog "pipeline instance state is #{@instance[:state]}" # set components_summary components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed} @instance[:components_summary] = components_summary @instance.save end def cleanup if @instance and @instance[:state] == 'RunningOnClient' @instance[:state] = 'Paused' @instance.save end end def uuid @instance[:uuid] end protected def report_status @instance.save if @options[:status_json] != '/dev/null' File.open(@options[:status_json], 'w') do |f| f.puts @components.pretty_inspect end end if @options[:status_text] != '/dev/null' File.open(@options[:status_text], 'w') do |f| f.puts "" f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}" namewidth = @components.collect { |cname, c| cname.size }.max @components.each do |cname, c| jstatus = if !c[:job] "-" else case c[:job][:state] when "Running" "#{c[:job][:tasks_summary].inspect}" when "Complete" c[:job][:output] when "Cancelled" "cancelled #{c[:job][:cancelled_at]}" when "Failed" "failed #{c[:job][:finished_at]}" when "Queued" "queued #{c[:job][:created_at]}" end end f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}" end end end end def abort(msg) if @instance if ["New", "Ready", "RunningOnClient", "RunningOnServer"].include?(@instance[:state]) @instance[:state] = "Failed" @instance[:finished_at] = Time.now @instance.save end @instance.log_stderr(msg) end Kernel::abort(msg) end end runner = WhRunPipelineInstance.new($options) begin if $options[:template] runner.fetch_template($options[:template]) else runner.fetch_instance($options[:instance]) end runner.apply_parameters(p.leftovers) runner.setup_instance if $options[:submit] runner.instance.save puts runner.instance[:uuid] else runner.run end rescue Exception => e runner.cleanup raise e end