5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
# Application version; it is converted with to_s where it is handed to the
# Google API client as :application_version.
62 $application_version = 1.0
# Refuse to run on interpreters older than 1.9.3.
# NOTE(review): this is a lexicographic String comparison, so a version
# string such as '1.10.0' would sort below '1.9.3' -- confirm that is acceptable.
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API connection settings are taken from the environment. The API version
# falls back to 'v1'. For host and token, `or` binds more loosely than `=`,
# so the assignment happens first and abort only runs when the variable is
# not set.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
# Write a diagnostic line to stderr, prefixed with the program name and PID.
#
# message   - text to print.
# verbosity - minimum value of $debuglevel at which the message is shown
#             (default 1).
#
# The message is dropped when $debuglevel is lower than verbosity. An unset
# level ($debuglevel == nil) is treated as 0, so calling this helper before
# the command-line options have been processed cannot raise NoMethodError.
#
# Returns nil.
def debuglog(message, verbosity=1)
  return unless ($debuglevel || 0) >= verbosity
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}"
end
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopen Google::APIClient to replace its discovery_document method.
112 class Google::APIClient
# Return the discovery document for the given api and version.
#
# Results are memoized in @discovery_documents under the key "api:version".
# On a cache miss the document is requested with an unauthenticated GET of
# discovery_uri(api, version).
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
# A String body is parsed as JSON; any other body is used as it is.
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
# Run the parser p under Trollop's standard exception handling and keep the
# result in $options.
180 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: an explicit --debug-level wins, --debug on its own
# means 1, and the default is 0.
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# --instance refers to an existing instance, so it cannot be combined with
# --template or --submit; without --instance a --template is mandatory.
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
# NOTE(review): this message is written with puts (stdout) while the other
# usage errors go through abort -- confirm that is intended.
190 puts "error: you must supply a --template or --instance."
# Aborts when the two flags have the same value, i.e. when both or neither
# of --run-here / --submit were given.
195 if $options[:run_here] == $options[:submit]
196 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
# Run the given block with Ruby warnings silenced.
#
# $VERBOSE is set to nil for the duration of the block and put back to its
# previous value afterwards. The restore lives in an ensure clause so that a
# block which raises cannot leave warnings switched off for the rest of the
# process.
#
# Returns the value of the block.
def suppress_warnings
  original_verbosity = $VERBOSE
  $VERBOSE = nil
  yield
ensure
  $VERBOSE = original_verbosity
end
# When ARVADOS_API_HOST_INSECURE is set to any value (ENV values are Strings,
# so even "false" or "0" is truthy), redefine OpenSSL::SSL::VERIFY_PEER as
# VERIFY_NONE. The reassignment is wrapped in suppress_warnings, presumably
# to hide Ruby's "already initialized constant" warning.
# NOTE(review): reassigning the constant affects the whole process -- every
# caller that asks for VERIFY_PEER afterwards gets no certificate
# verification. Confirm this scope is intended.
211 if ENV['ARVADOS_API_HOST_INSECURE']
212 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
215 # Set up the API client.
# $client: Google API client pointed at $arvados_api_host. It identifies
# itself with the script's file name and $application_version, and is only
# created when $client is not already set.
217 $client ||= Google::APIClient.
218 new(:host => $arvados_api_host,
219 :application_name => File.split($0).last,
220 :application_version => $application_version.to_s)
# $arvados: the discovered 'arvados' API at $arvados_api_version; it supplies
# the :api_method values used in the $client.execute calls.
221 $arvados = $client.discovered_api('arvados', $arvados_api_version)
# $arv: a separate Arvados client object.
# NOTE(review): api_version is hard-coded to 'v1' here instead of using
# $arvados_api_version -- confirm the two clients are allowed to differ.
222 $arv = Arvados.new api_version: 'v1'
225 class PipelineInstance
227 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
231 :authenticated => false,
233 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
235 j = JSON.parse result.body, :symbolize_names => true
236 unless j.is_a? Hash and j[:uuid]
237 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
240 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance on the API server.
#
# attributes - sent as the :pipeline_instance value of the
#              pipeline_instances.create call.
#
# The request passes :authenticated => false and supplies the token from
# ARVADOS_API_TOKEN in an explicit "OAuth2 ..." authorization entry.
# Aborts the program when the parsed response is not a Hash with a :uuid.
244 def self.create(attributes)
245 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
247 :pipeline_instance => attributes
249 :authenticated => false,
251 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
# Parse the response body as JSON with symbol keys.
253 j = JSON.parse result.body, :symbolize_names => true
# A usable response is a Hash carrying :uuid; anything else is fatal.
254 unless j.is_a? Hash and j[:uuid]
# NOTE(review): @template is read inside a class method, where it is a
# class-level instance variable of this class. Nothing visible assigns it, so
# it is probably nil and `@template[:uuid]` would raise NoMethodError while
# this message is being built, hiding the real error. Confirm, and consider
# attributes[:pipeline_template_uuid] instead.
255 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
257 debuglog "Created pipeline instance: #{j[:uuid]}"
261 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
266 :pipeline_instance => @attributes_to_update
268 :authenticated => false,
270 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
272 j = JSON.parse result.body, :symbolize_names => true
273 unless j.is_a? Hash and j[:uuid]
274 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
277 @attributes_to_update = {}
282 @attributes_to_update[x] = y
290 $arv.log.create log: {
291 event_type: 'stderr',
292 object_uuid: self[:uuid],
293 owner_uuid: self[:owner_uuid],
294 properties: {"text" => msg},
300 @attributes_to_update = {}
308 result = $client.execute(:api_method => $arvados.jobs.get,
312 :authenticated => false,
314 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
316 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# Query the API server for jobs matching the given conditions.
#
# conditions - serialized with to_json and sent as the :where value of a
#              jobs.list call.
#
# The request passes :authenticated => false and supplies the token from
# ARVADOS_API_TOKEN in an explicit "OAuth2 ..." authorization entry.
318 def self.where(conditions)
319 result = $client.execute(:api_method => $arvados.jobs.list,
322 :where => conditions.to_json
324 :authenticated => false,
326 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
# Parse the response body as JSON with symbol keys.
328 list = JSON.parse result.body, :symbolize_names => true
# The response only counts as a job list when :items is an Array.
329 if list and list[:items].is_a? Array
# Submit a new job to the API server.
#
# pipeline      - object that receives the failure report through log_stderr.
# component     - component name; it appears in the error messages.
# job           - Hash of job attributes; entries with nil values are dropped
#                 and the rest is sent under the :job key.
# create_params - Hash of extra request parameters; entries with nil values
#                 are dropped and the rest is merged in beside :job.
335 def self.create(pipeline, component, job, create_params)
# Request body: the job attributes under :job plus the top-level create
# parameters, both with nil values removed.
338 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
340 result = $client.execute(:api_method => $arvados.jobs.create,
341 :body_object => body,
342 :authenticated => false,
344 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
# Parse the response body as JSON with symbol keys.
346 j = JSON.parse result.body, :symbolize_names => true
# Success is a Hash response carrying :uuid.
347 if j.is_a? Hash and j[:uuid]
# Failure: report at verbosity 0, then build a message for the pipeline log.
350 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
# NOTE(review): this assumes j[:errors] responds to each, whereas the
# debuglog line above tolerates a missing or invalid value with `rescue nil`
# -- verify that the response shape is guaranteed here.
353 j[:errors].each do |err|
354 msg += "Error creating job for component #{component}: #{err}\n"
356 msg += "Job submission was: #{body.to_json}"
358 pipeline.log_stderr(msg)
# Return a copy of hash without the entries whose value is nil.
#
# Only nil is filtered out; false values are kept. The argument itself is
# not modified (reject, not reject!).
365 def self.no_nil_values(hash)
366 hash.reject { |key, value| value.nil? }
370 class WhRunPipelineInstance
371 attr_reader :instance
373 def initialize(_options)
# Load a pipeline template into @template.
#
# template - either a pipeline template UUID or the path of a local JSON file.
#
# The argument is treated as a file name when it contains any character other
# than lowercase letters, digits and '-'; otherwise it is fetched with
# pipeline_templates.get. Both sources are parsed as JSON with symbol keys.
# NOTE(review): a local file whose name consists only of [-0-9a-z] (no dot,
# slash or uppercase letter) would be taken for a UUID -- confirm that is
# acceptable.
377 def fetch_template(template)
378 if template.match /[^-0-9a-z]/
379 # Doesn't look like a uuid -- use it as a filename.
380 @template = JSON.parse File.read(template), :symbolize_names => true
# Looks like a UUID: ask the API server, sending the token from
# ARVADOS_API_TOKEN in an explicit "OAuth2 ..." authorization entry.
382 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
386 :authenticated => false,
388 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
390 @template = JSON.parse result.body, :symbolize_names => true
# Fatal: the message includes the server's :errors when they can be read.
392 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Load an existing pipeline instance into @instance.
#
# instance_uuid - passed to PipelineInstance.find.
#
# The instance is also stored as @template, so code that reads
# @template[:components] or @template[:uuid] works on the instance's own
# values when no separate template was loaded.
398 def fetch_instance(instance_uuid)
399 @instance = PipelineInstance.find(instance_uuid)
400 @template = @instance
# Merge parameter values given on the command line into the components of
# @template and store the result in @components.
#
# params_args - Array of leftover command-line words. It is consumed in
#               place: words are shifted off as they are interpreted.
#
# Aborts with a "Syntax error" or "Errors" message when an argument cannot be
# interpreted, when the template's components are malformed, or when
# parameter errors were collected.
404 def apply_parameters(params_args)
# A leading '--' separator is discarded.
405 params_args.shift if params_args[0] == '--'
# Two spellings are accepted: a single word "name=value" (optionally
# prefixed with "--", name not starting with '-', value non-empty), or two
# words "name value" where a leading "--" on the name is stripped.
407 while !params_args.empty?
408 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
409 params[re[2]] = re[3]
411 elsif params_args.size > 1
412 param = params_args.shift.sub /^--/, ''
413 params[param] = params_args.shift
415 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
# The template must provide a :components Hash.
419 if not @template[:components].is_a?(Hash)
420 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
# NOTE(review): dup is a shallow copy, so the per-component hashes are shared
# with @template and the script_parameters assignment further down writes
# through to them -- confirm that is intended.
422 @components = @template[:components].dup
# Every component spec must itself be a Hash ...
424 bad_components = @components.each_pair.select do |cname, cspec|
425 not cspec.is_a?(Hash)
427 if bad_components.any?
428 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
# ... and must carry a :script_parameters Hash.
431 bad_components = @components.each_pair.select do |cname, cspec|
432 not cspec[:script_parameters].is_a?(Hash)
434 if bad_components.any?
435 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
# Resolve a value for every script parameter of every component.
439 @components.each do |componentname, component|
440 component[:script_parameters].each do |parametername, parameter|
# A scalar spec is wrapped as { :value => ... } so that every spec can be
# read as a Hash below.
441 parameter = { :value => parameter } unless parameter.is_a? Hash
# Among the sources consulted: the component-qualified command-line name
# ("component::parameter") comes first; for a parameter without :output_of,
# the bare parameter name and then the spec's :default are also tried.
443 (params["#{componentname}::#{parametername}"] ||
445 (parameter[:output_of].nil? &&
446 (params[parametername.to_s] ||
447 parameter[:default])) ||
450 ![false,'false',0,'0'].index parameter[:required]
451 if parameter[:output_of]
# :required counts as set unless it is one of false, 'false', 0 or '0'
# (the index test above), so a spec without :required is treated as required.
454 errors << [componentname, parametername, "required parameter is missing"]
456 debuglog "parameter #{componentname}::#{parametername} == #{value}"
457 component[:script_parameters][parametername] = value
# Report every collected error as "component::parameter - message" and stop.
461 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
463 debuglog "options=" + @options.pretty_inspect
469 @instance[:properties][:run_options] ||= {}
470 if @options[:no_reuse]
471 # override properties of existing instance
472 @instance[:properties][:run_options][:enable_job_reuse] = false
474 # Default to "enable reuse" if not specified. (This code path
475 # can go away when old clients go away.)
476 if @instance[:properties][:run_options][:enable_job_reuse].nil?
477 @instance[:properties][:run_options][:enable_job_reuse] = true
481 @instance = PipelineInstance.
482 create(components: @components,
485 enable_job_reuse: !@options[:no_reuse]
488 pipeline_template_uuid: @template[:uuid],
489 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
498 job_creation_failed = 0
501 @components.each do |cname, c|
503 owner_uuid = @instance[:owner_uuid]
504 # Is the job satisfying this component already known to be
505 # finished? (Already meaning "before we query API server about
506 # the job's current state")
507 c_already_finished = (c[:job] &&
509 !c[:job][:success].nil?)
511 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
512 # No job is associated with this component yet, and the component's inputs
513 # are fully specified (any output_of script_parameters are resolved
515 job = JobCache.create(@instance, cname, {
516 :script => c[:script],
517 :script_parameters => c[:script_parameters],
518 :script_version => c[:script_version],
519 :repository => c[:repository],
520 :nondeterministic => c[:nondeterministic],
521 :runtime_constraints => c[:runtime_constraints],
522 :owner_uuid => owner_uuid,
524 # This is the right place to put these attributes when
525 # dealing with new API servers.
526 :minimum_script_version => c[:minimum_script_version],
527 :exclude_script_versions => c[:exclude_minimum_script_versions],
528 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
529 !c[:nondeterministic]),
530 :filters => c[:filters]
533 debuglog "component #{cname} new job #{job[:uuid]}"
536 debuglog "component #{cname} new job failed", 0
537 job_creation_failed += 1
541 if c[:job] and c[:job][:uuid]
542 if (c[:job][:running] or
543 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
544 # Job is running so update copy of job record
545 c[:job] = JobCache.get(c[:job][:uuid])
549 # Populate script_parameters of other components waiting for
551 @components.each do |c2name, c2|
552 c2[:script_parameters].each do |pname, p|
553 if p.is_a? Hash and p[:output_of] == cname.to_s
554 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
555 c2[:script_parameters][pname] = c[:job][:output]
560 unless c_already_finished
561 # This is my first time discovering that the job
562 # succeeded. (At the top of this loop, I was still
563 # waiting for it to finish.)
565 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
566 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
567 pipeline_name = @instance[:name]
569 fetch_template(@instance[:pipeline_template_uuid])
570 pipeline_name = @template[:name]
572 output_name = c[:output_name] || "Output of #{cname} of #{pipeline_name}"
573 # Create a collection located in the same project as the pipeline with the contents of the output.
574 portable_data_hash = c[:job][:output]
575 collections = $arv.collection.list(limit: 1,
576 filters: [['portable_data_hash', '=', portable_data_hash]],
577 select: ["portable_data_hash", "manifest_text"]
581 owner_uuid: owner_uuid,
582 name: "#{output_name} at #{c[:job][:finished_at]}",
583 portable_data_hash: collections.first[:portable_data_hash],
584 manifest_text: collections.first[:manifest_text]
586 debuglog "Creating collection #{newcollection}", 0
587 newcollection_actual = $arv.collection.create collection: newcollection
588 c[:output_uuid] = newcollection_actual[:uuid]
590 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
593 elsif c[:job][:running] ||
594 (!c[:job][:started_at] && !c[:job][:cancelled_at])
595 # Job is still running
597 elsif c[:job][:cancelled_at]
598 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
602 @instance[:components] = @components
605 if @options[:no_wait]
609 # If job creation fails, just give up on this pipeline instance.
610 if job_creation_failed > 0
618 debuglog "interrupt", 0
628 @components.each do |cname, c|
630 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
632 if c[:job][:success] == true
634 elsif c[:job][:success] == false or c[:job][:cancelled_at]
641 success = (succeeded == @components.length)
643 # A job create call failed. Just give up.
644 if job_creation_failed > 0
645 debuglog "job creation failed - giving up on this pipeline instance", 0
652 @instance[:state] = 'Complete'
654 @instance[:state] = 'Paused'
657 if ended == @components.length or failed > 0
658 @instance[:state] = success ? 'Complete' : 'Failed'
662 debuglog "pipeline instance state is #{@instance[:state]}"
664 # set components_summary
665 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
666 @instance[:components_summary] = components_summary
672 if @instance and @instance[:state] == 'RunningOnClient'
673 @instance[:state] = 'Paused'
687 if @options[:status_json] != '/dev/null'
688 File.open(@options[:status_json], 'w') do |f|
689 f.puts @components.pretty_inspect
693 if @options[:status_text] != '/dev/null'
694 File.open(@options[:status_text], 'w') do |f|
696 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
697 namewidth = @components.collect { |cname, c| cname.size }.max
698 @components.each do |cname, c|
699 jstatus = if !c[:job]
701 elsif c[:job][:running]
702 "#{c[:job][:tasks_summary].inspect}"
703 elsif c[:job][:success]
705 elsif c[:job][:cancelled_at]
706 "cancelled #{c[:job][:cancelled_at]}"
707 elsif c[:job][:finished_at]
708 "failed #{c[:job][:finished_at]}"
709 elsif c[:job][:started_at]
710 "started #{c[:job][:started_at]}"
712 "queued #{c[:job][:created_at]}"
714 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
722 if ["New", "Ready", "RunningOnClient",
723 "RunningOnServer"].include?(@instance[:state])
724 @instance[:state] = "Failed"
727 @instance.log_stderr(msg)
733 runner = WhRunPipelineInstance.new($options)
735 if $options[:template]
736 runner.fetch_template($options[:template])
738 runner.fetch_instance($options[:instance])
740 runner.apply_parameters(p.leftovers)
741 runner.setup_instance
744 puts runner.instance[:uuid]
748 rescue Exception => e