X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/37c7faef1b066a2dfe19030e0ae2b7cdd7d0be93..6d59f29ba37c608e4b01b27a2fa78bc065fdc2a6:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index 0842225cb8..d2b1109e16 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -12,18 +12,30 @@ # # [--template uuid] Use the specified pipeline template. # +# [--template path] Load the pipeline template from the specified +# local file. +# # [--instance uuid] Use the specified pipeline instance. # # [-n, --dry-run] Do not start any new jobs or wait for existing jobs # to finish. Just find out whether jobs are finished, # queued, or running for each component # -# [--create-only] Do not try to satisfy any components. Just create an -# instance, print its UUID to stdout, and exit. +# [--create-instance-only] Do not try to satisfy any components. Just +# create an instance, print its UUID to +# stdout, and exit. # # [--no-wait] Make only as much progress as possible without entering # a sleep/poll loop. # +# [--no-reuse-finished] Do not reuse existing outputs to satisfy +# pipeline components. Always submit a new job +# or use an existing job which has not yet +# finished. +# +# [--no-reuse] Do not reuse existing jobs to satisfy pipeline +# components. Submit a new job for every component. +# # [--debug] Print extra debugging information on stderr. # # [--debug-level N] Increase amount of debugging information. Default @@ -119,6 +131,7 @@ end # this program, that is, not the pipeline component parameters). p = Trollop::Parser.new do + version __FILE__ opt(:dry_run, "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.", :type => :boolean, @@ -137,6 +150,14 @@ p = Trollop::Parser.new do "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.", :short => :none, :type => :boolean) + opt(:no_reuse_finished, + "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.", + :short => :none, + :type => :boolean) + opt(:no_reuse, + "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.", + :short => :none, + :type => :boolean) opt(:debug, "Print extra debugging information on stderr.", :type => :boolean) @@ -145,14 +166,14 @@ p = Trollop::Parser.new do :short => :none, :type => :integer) opt(:template, - "UUID of pipeline template.", + "UUID of pipeline template, or path to local pipeline template file.", :short => :none, :type => :string) opt(:instance, "UUID of pipeline instance.", :short => :none, :type => :string) - opt(:create_only, + opt(:create_instance_only, "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.", :short => :none, :type => :boolean) @@ -164,8 +185,8 @@ end $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0 if $options[:instance] - if $options[:template] or $options[:create_only] - abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only." + if $options[:template] or $options[:create_instance_only] + abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only." end elsif not $options[:template] abort "#{$0}: syntax error: you must supply a --template or --instance." @@ -184,9 +205,11 @@ class PipelineInstance def self.find(uuid) result = $client.execute(:api_method => $arvados.pipeline_instances.get, :parameters => { - :api_token => ENV['ARVADOS_API_TOKEN'], :uuid => uuid }, + :body => { + :api_token => ENV['ARVADOS_API_TOKEN'] + }, :authenticated => false) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] @@ -199,9 +222,9 @@ class PipelineInstance end def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, - :parameters => { + :body => { :api_token => ENV['ARVADOS_API_TOKEN'], - :pipeline_instance => attributes.to_json + :pipeline_instance => attributes }, :authenticated => false) j = JSON.parse result.body, :symbolize_names => true @@ -214,8 +237,10 @@ class PipelineInstance def save result = $client.execute(:api_method => $arvados.pipeline_instances.update, :parameters => { + :uuid => @pi[:uuid] + }, + :body => { :api_token => ENV['ARVADOS_API_TOKEN'], - :uuid => @pi[:uuid], :pipeline_instance => @attributes_to_update.to_json }, :authenticated => false) @@ -280,7 +305,7 @@ class JobCache if j.is_a? Hash and j[:uuid] @cache[j[:uuid]] = j else - debuglog "create job: #{j[:errors] rescue nil}" + debuglog "create job: #{j[:errors] rescue nil}", 0 nil end end @@ -293,16 +318,25 @@ class WhRunPipelineInstance @options = _options end - def fetch_template(template_uuid) - result = $client.execute(:api_method => $arvados.pipeline_templates.get, - :parameters => { - :api_token => ENV['ARVADOS_API_TOKEN'], - :uuid => template_uuid - }, - :authenticated => false) - @template = JSON.parse result.body, :symbolize_names => true - if !@template[:uuid] - abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}" + def fetch_template(template) + if template.match /[^-0-9a-z]/ + # Doesn't look like a uuid -- use it as a filename. + @template = JSON.parse File.read(template), :symbolize_names => true + if !@template[:components] + abort ("#{$0}: Template loaded from #{template} " + + "does not have a \"components\" key") + end + else + result = $client.execute(:api_method => $arvados.pipeline_templates.get, + :parameters => { + :api_token => ENV['ARVADOS_API_TOKEN'], + :uuid => template + }, + :authenticated => false) + @template = JSON.parse result.body, :symbolize_names => true + if !@template[:uuid] + abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}" + end end self end @@ -382,10 +416,12 @@ class WhRunPipelineInstance c.delete :wait second_place_job = nil # satisfies component, but not finished yet - JobCache.where(:script => c[:script], - :script_parameters => c[:script_parameters], - :script_version_descends_from => c[:script_version_descends_from]). - each do |candidate_job| + + (@options[:no_reuse] ? [] : JobCache. + where(script: c[:script], + script_parameters: c[:script_parameters], + script_version_descends_from: c[:script_version]) + ).each do |candidate_job| candidate_params_downcase = Hash[candidate_job[:script_parameters]. map { |k,v| [k.downcase,v] }] c_params_downcase = Hash[c[:script_parameters]. @@ -397,6 +433,12 @@ class WhRunPipelineInstance next end + if c[:script_version] != + candidate_job[:script_version][0,c[:script_version].length] + debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2 + next + end + unless candidate_job[:success] || candidate_job[:running] || (!candidate_job[:started_at] && !candidate_job[:cancelled_at]) debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2 @@ -404,9 +446,11 @@ class WhRunPipelineInstance end if candidate_job[:success] - job = candidate_job - debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}" - c[:job] = job + unless @options[:no_reuse_finished] + job = candidate_job + $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}" + c[:job] = job + end else second_place_job ||= candidate_job end @@ -414,7 +458,7 @@ class WhRunPipelineInstance end if not c[:job] and second_place_job job = second_place_job - debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}" + $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}" c[:job] = job end if not c[:job] @@ -423,13 +467,13 @@ class WhRunPipelineInstance debuglog "component #{cname} new job." job = JobCache.create(:script => c[:script], :script_parameters => c[:script_parameters], - :resource_limits => c[:resource_limits] || {}, + :runtime_constraints => c[:runtime_constraints] || {}, :script_version => c[:script_version] || 'master') if job debuglog "component #{cname} new job #{job[:uuid]}" c[:job] = job else - debuglog "component #{cname} new job failed: #{job[:errors]}" + debuglog "component #{cname} new job failed" end end end @@ -437,7 +481,8 @@ class WhRunPipelineInstance c[:wait] = true end if c[:job] and c[:job][:uuid] - if not c[:job][:finished_at] and not c[:job][:cancelled_at] + if (c[:job][:running] or + not (c[:job][:finished_at] or c[:job][:cancelled_at])) c[:job] = JobCache.get(c[:job][:uuid]) end if c[:job][:success] @@ -462,7 +507,14 @@ class WhRunPipelineInstance @instance[:components] = @components @instance[:active] = moretodo report_status - sleep 10 if moretodo + if moretodo + begin + sleep 10 + rescue Interrupt + debuglog "interrupt", 0 + abort + end + end end @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty? @instance.save @@ -492,6 +544,7 @@ class WhRunPipelineInstance if @options[:status_text] != '/dev/null' File.open(@options[:status_text], 'w') do |f| + f.puts "" f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}" namewidth = @components.collect { |cname, c| cname.size }.max @components.each do |cname, c| @@ -526,7 +579,7 @@ begin end runner.apply_parameters(p.leftovers) runner.setup_instance - if $options[:create_only] + if $options[:create_instance_only] runner.instance.save puts runner.instance[:uuid] else