5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 if RUBY_VERSION < '1.9.3' then
64 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
68 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
69 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
70 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
71 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
72 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
80 require 'google/api_client'
84 #{$0}: fatal: #{l.message}
85 Some runtime dependencies may be missing.
86 Try: gem install arvados pp google-api-client json trollop
# Print a diagnostic message on stderr when the global $debuglevel is at
# least +verbosity+. Each message is prefixed with the script's base name
# and the current process id.
#
# message   - text to print.
# verbosity - minimum $debuglevel at which the message appears
#             (default 1). Callers in this file pass 0 for messages that
#             should be shown even when no debug option was given.
90 def debuglog(message, verbosity=1)
91 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
96 original_verbosity = $VERBOSE
99 $VERBOSE = original_verbosity
104 if $arvados_api_host.match /local/
105 # You probably don't care about SSL certificate checks if you're
106 # testing with a dev server.
107 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
111 # Parse command line options (the kind that control the behavior of
112 # this program, that is, not the pipeline component parameters).
114 p = Trollop::Parser.new do
117 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
121 "Store plain text status in given file.",
124 :default => '/dev/stdout')
126 "Store json-formatted pipeline in given file.",
129 :default => '/dev/null')
131 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
135 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
139 "Print extra debugging information on stderr.",
142 "Set debug verbosity level.",
146 "UUID of pipeline template, or path to local pipeline template file.",
150 "UUID of pipeline instance.",
154 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
158 "Manage the pipeline in process.",
163 $options = Trollop::with_standard_exception_handling p do
166 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
168 if $options[:instance]
169 if $options[:template] or $options[:submit]
170 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
172 elsif not $options[:template]
173 puts "error: you must supply a --template or --instance."
178 if $options[:run_here] == $options[:submit]
179 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
182 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
185 def suppress_warnings
186 original_verbosity = $VERBOSE
189 $VERBOSE = original_verbosity
194 if ENV['ARVADOS_API_HOST_INSECURE']
195 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
198 # Set up the API client.
200 $arv = Arvados.new api_version: 'v1'
201 $client = $arv.client
202 $arvados = $arv.arvados_api
204 class PipelineInstance
206 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
210 :authenticated => false,
212 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
214 j = JSON.parse result.body, :symbolize_names => true
215 unless j.is_a? Hash and j[:uuid]
216 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
219 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance on the API server from +attributes+.
#
# The request is sent with :authenticated => false and an explicit
# OAuth2 authorization header built from ENV['ARVADOS_API_TOKEN'].
# The response body is parsed as JSON with symbolized keys; if the
# result is not a Hash carrying :uuid the program aborts, otherwise the
# new uuid is reported through debuglog.
#
# NOTE(review): this is a class-level method (def self.create), so
# @template in the abort message is an instance variable of the class
# object itself, and no line visible here assigns it. If it is nil,
# @template[:uuid] raises NoMethodError while building the message, and
# the intended "Failed to create pipeline_instance" text is never
# printed -- confirm and consider removing it from the message.
223 def self.create(attributes)
224 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
226 :pipeline_instance => attributes
228 :authenticated => false,
230 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
232 j = JSON.parse result.body, :symbolize_names => true
233 unless j.is_a? Hash and j[:uuid]
234 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
236 debuglog "Created pipeline instance: #{j[:uuid]}"
240 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
245 :pipeline_instance => @attributes_to_update
247 :authenticated => false,
249 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
251 j = JSON.parse result.body, :symbolize_names => true
252 unless j.is_a? Hash and j[:uuid]
253 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
256 @attributes_to_update = {}
261 @attributes_to_update[x] = y
269 $arv.log.create log: {
270 event_type: 'stderr',
271 object_uuid: self[:uuid],
272 owner_uuid: self[:owner_uuid],
273 properties: {"text" => msg},
279 @attributes_to_update = {}
287 result = $client.execute(:api_method => $arvados.jobs.get,
291 :authenticated => false,
293 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
295 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# Query the API server's jobs.list method for jobs matching +conditions+.
#
# conditions - object serialized with to_json and sent as the :where
#              request parameter.
#
# The response body is parsed as JSON with symbolized keys, and the
# result is only used when it is non-nil and has an Array under :items.
# NOTE(review): the return value for either branch is on lines not
# visible here -- check the full method before relying on it.
297 def self.where(conditions)
298 result = $client.execute(:api_method => $arvados.jobs.list,
301 :where => conditions.to_json
303 :authenticated => false,
305 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
307 list = JSON.parse result.body, :symbolize_names => true
308 if list and list[:items].is_a? Array
# Submit a new job to the API server through jobs.create.
#
# pipeline      - object that receives the failure report via
#                 log_stderr(msg).
# component     - component name, used only in the error text.
# job           - Hash of job attributes; sent under the :job key with
#                 nil-valued entries removed.
# create_params - Hash of extra request parameters; nil-valued entries
#                 are removed and the rest are merged into the request
#                 body next to :job.
#
# The response is parsed as JSON with symbolized keys. A Hash carrying
# :uuid is treated as success. Otherwise the failure is logged at
# verbosity 0, and a message with one line per entry of j[:errors] plus
# the JSON form of the submitted body is passed to pipeline.log_stderr.
# NOTE(review): the success branch and the return values are on lines
# not visible here.
314 def self.create(pipeline, component, job, create_params)
317 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
319 result = $client.execute(:api_method => $arvados.jobs.create,
320 :body_object => body,
321 :authenticated => false,
323 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
325 j = JSON.parse result.body, :symbolize_names => true
326 if j.is_a? Hash and j[:uuid]
329 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
332 j[:errors].each do |err|
333 msg += "Error creating job for component #{component}: #{err}\n"
335 msg += "Job submission was: #{body.to_json}"
337 pipeline.log_stderr(msg)
# Return the entries of +hash+ whose value is not nil. Uses the
# non-destructive reject, so the argument itself is left unchanged.
344 def self.no_nil_values(hash)
345 hash.reject { |key, value| value.nil? }
349 class WhRunPipelineInstance
350 attr_reader :instance
352 def initialize(_options)
# Load a pipeline template into @template.
#
# template - either a template uuid or a path to a local file. A string
#            containing any character other than lowercase letters,
#            digits and hyphens is taken to be a file path; the file is
#            read and parsed as JSON with symbolized keys. Anything else
#            is fetched from the API server with pipeline_templates.get
#            and the response body is parsed the same way.
#
# Aborts with a fatal message (including any :errors from the response)
# when the template cannot be retrieved.
# NOTE(review): the condition guarding the abort is on a line not
# visible here.
356 def fetch_template(template)
357 if template.match /[^-0-9a-z]/
358 # Doesn't look like a uuid -- use it as a filename.
359 @template = JSON.parse File.read(template), :symbolize_names => true
361 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
365 :authenticated => false,
367 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
369 @template = JSON.parse result.body, :symbolize_names => true
371 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Look up an existing pipeline instance with PipelineInstance.find and
# store it in @instance. The same object is also stored in @template,
# so later code that reads @template[:components] or @template[:uuid]
# gets the instance's own values.
377 def fetch_instance(instance_uuid)
378 @instance = PipelineInstance.find(instance_uuid)
379 @template = @instance
# Parse command-line parameter overrides and apply them to the
# components of the loaded template, storing the result in @components.
#
# params_args - Array of leftover command-line strings; it is consumed
#               (shifted) while parsing. A leading "--" is dropped.
#               Each remaining argument is either "name=value"
#               (optionally prefixed with "--"), or a name followed by
#               its value as the next argument (a leading "--" on the
#               name is stripped). A lone trailing argument that fits
#               neither form aborts with a syntax error.
#
# Validation (each failure aborts): @template[:components] must be a
# Hash, every component must be a Hash, and every component must have a
# :script_parameters Hash.
#
# Value resolution, as far as the visible lines show: an override named
# "component::parameter" is tried first; when the parameter has no
# :output_of, an override with the bare parameter name and then the
# parameter's :default are tried. A parameter counts as required unless
# :required is one of false, 'false', 0 or '0'; a missing required value
# is recorded in errors, and the collected errors abort the program.
# NOTE(review): parts of the resolution expression and the :output_of
# branch are on lines not visible here -- other candidates may apply.
383 def apply_parameters(params_args)
384 params_args.shift if params_args[0] == '--'
386 while !params_args.empty?
387 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
388 params[re[2]] = re[3]
390 elsif params_args.size > 1
391 param = params_args.shift.sub /^--/, ''
392 params[param] = params_args.shift
394 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
398 if not @template[:components].is_a?(Hash)
399 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
# dup is a shallow copy: the per-component hashes are still shared with
# @template, so the script_parameters assignments below also change the
# template's nested hashes.
401 @components = @template[:components].dup
403 bad_components = @components.each_pair.select do |cname, cspec|
404 not cspec.is_a?(Hash)
406 if bad_components.any?
407 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
410 bad_components = @components.each_pair.select do |cname, cspec|
411 not cspec[:script_parameters].is_a?(Hash)
413 if bad_components.any?
414 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
418 @components.each do |componentname, component|
419 component[:script_parameters].each do |parametername, parameter|
420 parameter = { :value => parameter } unless parameter.is_a? Hash
422 (params["#{componentname}::#{parametername}"] ||
424 (parameter[:output_of].nil? &&
425 (params[parametername.to_s] ||
426 parameter[:default])) ||
429 ![false,'false',0,'0'].index parameter[:required]
430 if parameter[:output_of]
433 errors << [componentname, parametername, "required parameter is missing"]
435 debuglog "parameter #{componentname}::#{parametername} == #{value}"
436 component[:script_parameters][parametername] = value
440 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
442 debuglog "options=" + @options.pretty_inspect
448 @instance[:properties][:run_options] ||= {}
449 if @options[:no_reuse]
450 # override properties of existing instance
451 @instance[:properties][:run_options][:enable_job_reuse] = false
453 # Default to "enable reuse" if not specified. (This code path
454 # can go away when old clients go away.)
455 if @instance[:properties][:run_options][:enable_job_reuse].nil?
456 @instance[:properties][:run_options][:enable_job_reuse] = true
460 @instance = PipelineInstance.
461 create(components: @components,
464 enable_job_reuse: !@options[:no_reuse]
467 pipeline_template_uuid: @template[:uuid],
468 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
477 job_creation_failed = 0
480 @components.each do |cname, c|
482 owner_uuid = @instance[:owner_uuid]
483 # Is the job satisfying this component already known to be
484 # finished? (Already meaning "before we query API server about
485 # the job's current state")
486 c_already_finished = (c[:job] &&
488 !c[:job][:success].nil?)
490 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
491 # No job yet associated with this component and its component inputs
492 # are fully specified (any output_of script_parameters are resolved
494 job = JobCache.create(@instance, cname, {
495 :script => c[:script],
496 :script_parameters => c[:script_parameters],
497 :script_version => c[:script_version],
498 :repository => c[:repository],
499 :nondeterministic => c[:nondeterministic],
500 :runtime_constraints => c[:runtime_constraints],
501 :owner_uuid => owner_uuid,
503 # This is the right place to put these attributes when
504 # dealing with new API servers.
505 :minimum_script_version => c[:minimum_script_version],
506 :exclude_script_versions => c[:exclude_minimum_script_versions],
507 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
508 !c[:nondeterministic]),
509 :filters => c[:filters]
512 debuglog "component #{cname} new job #{job[:uuid]}"
515 debuglog "component #{cname} new job failed", 0
516 job_creation_failed += 1
520 if c[:job] and c[:job][:uuid]
521 if (c[:job][:running] or
522 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
523 # Job is running so update copy of job record
524 c[:job] = JobCache.get(c[:job][:uuid])
528 # Populate script_parameters of other components waiting for
530 @components.each do |c2name, c2|
531 c2[:script_parameters].each do |pname, p|
532 if p.is_a? Hash and p[:output_of] == cname.to_s
533 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
534 c2[:script_parameters][pname] = c[:job][:output]
539 unless c_already_finished
540 # This is my first time discovering that the job
541 # succeeded. (At the top of this loop, I was still
542 # waiting for it to finish.)
544 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
545 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
546 pipeline_name = @instance[:name]
548 fetch_template(@instance[:pipeline_template_uuid])
549 pipeline_name = @template[:name]
551 if c[:output_name] != false
552 output_name = c[:output_name] || "Output of #{cname} of #{pipeline_name}"
553 # Create a collection located in the same project as the pipeline with the contents of the output.
554 portable_data_hash = c[:job][:output]
555 collections = $arv.collection.list(limit: 1,
556 filters: [['portable_data_hash', '=', portable_data_hash]],
557 select: ["portable_data_hash", "manifest_text"]
561 owner_uuid: owner_uuid,
562 name: "#{output_name} at #{c[:job][:finished_at]}",
563 portable_data_hash: collections.first[:portable_data_hash],
564 manifest_text: collections.first[:manifest_text]
566 debuglog "Creating collection #{newcollection}", 0
567 newcollection_actual = $arv.collection.create collection: newcollection
568 c[:output_uuid] = newcollection_actual[:uuid]
570 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
574 elsif c[:job][:running] ||
575 (!c[:job][:started_at] && !c[:job][:cancelled_at])
576 # Job is still running
578 elsif c[:job][:cancelled_at]
579 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
583 @instance[:components] = @components
586 if @options[:no_wait]
590 # If job creation fails, just give up on this pipeline instance.
591 if job_creation_failed > 0
599 debuglog "interrupt", 0
609 @components.each do |cname, c|
611 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
613 if c[:job][:success] == true
615 elsif c[:job][:success] == false or c[:job][:cancelled_at]
622 success = (succeeded == @components.length)
624 # A job create call failed. Just give up.
625 if job_creation_failed > 0
626 debuglog "job creation failed - giving up on this pipeline instance", 0
633 @instance[:state] = 'Complete'
635 @instance[:state] = 'Paused'
638 if ended == @components.length or failed > 0
639 @instance[:state] = success ? 'Complete' : 'Failed'
643 debuglog "pipeline instance state is #{@instance[:state]}"
645 # set components_summary
646 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
647 @instance[:components_summary] = components_summary
653 if @instance and @instance[:state] == 'RunningOnClient'
654 @instance[:state] = 'Paused'
668 if @options[:status_json] != '/dev/null'
669 File.open(@options[:status_json], 'w') do |f|
670 f.puts @components.pretty_inspect
674 if @options[:status_text] != '/dev/null'
675 File.open(@options[:status_text], 'w') do |f|
677 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
678 namewidth = @components.collect { |cname, c| cname.size }.max
679 @components.each do |cname, c|
680 jstatus = if !c[:job]
682 elsif c[:job][:running]
683 "#{c[:job][:tasks_summary].inspect}"
684 elsif c[:job][:success]
686 elsif c[:job][:cancelled_at]
687 "cancelled #{c[:job][:cancelled_at]}"
688 elsif c[:job][:finished_at]
689 "failed #{c[:job][:finished_at]}"
690 elsif c[:job][:started_at]
691 "started #{c[:job][:started_at]}"
693 "queued #{c[:job][:created_at]}"
695 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
703 if ["New", "Ready", "RunningOnClient",
704 "RunningOnServer"].include?(@instance[:state])
705 @instance[:state] = "Failed"
708 @instance.log_stderr(msg)
714 runner = WhRunPipelineInstance.new($options)
716 if $options[:template]
717 runner.fetch_template($options[:template])
719 runner.fetch_instance($options[:instance])
721 runner.apply_parameters(p.leftovers)
722 runner.setup_instance
725 puts runner.instance[:uuid]
729 rescue Exception => e