5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 if RUBY_VERSION < '1.9.3' then
64 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
68 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
69 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
70 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
71 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
72 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
80 require 'google/api_client'
84 #{$0}: fatal: #{l.message}
85 Some runtime dependencies may be missing.
86 Try: gem install arvados pp google-api-client json trollop
90 def debuglog(message, verbosity=1)
91 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
96 original_verbosity = $VERBOSE
99 $VERBOSE = original_verbosity
104 if $arvados_api_host.match /local/
105 # You probably don't care about SSL certificate checks if you're
106 # testing with a dev server.
107 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
111 # Parse command line options (the kind that control the behavior of
112 # this program, that is, not the pipeline component parameters).
114 p = Trollop::Parser.new do
117 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
121 "Store plain text status in given file.",
124 :default => '/dev/stdout')
126 "Store json-formatted pipeline in given file.",
129 :default => '/dev/null')
131 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
135 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
139 "Print extra debugging information on stderr.",
142 "Set debug verbosity level.",
146 "UUID of pipeline template, or path to local pipeline template file.",
150 "UUID of pipeline instance.",
154 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service to satisfy the components by finding/running jobs.",
157 opt(:run_pipeline_here,
158 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
162 "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
166 "Synonym for --run-jobs-here.",
171 $options = Trollop::with_standard_exception_handling p do
174 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
176 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
177 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
179 if $options[:instance]
180 if $options[:template] or $options[:submit]
181 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
183 elsif not $options[:template]
184 puts "error: you must supply a --template or --instance."
189 if $options[:run_pipeline_here] == $options[:submit]
190 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
193 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
196 def suppress_warnings
197 original_verbosity = $VERBOSE
200 $VERBOSE = original_verbosity
205 if ENV['ARVADOS_API_HOST_INSECURE']
206 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
209 # Set up the API client.
211 $arv = Arvados.new api_version: 'v1'
212 $client = $arv.client
213 $arvados = $arv.arvados_api
215 class PipelineInstance
217 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
221 :authenticated => false,
223 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
225 j = JSON.parse result.body, :symbolize_names => true
226 unless j.is_a? Hash and j[:uuid]
227 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
230 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
234 def self.create(attributes)
235 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
237 :pipeline_instance => attributes
239 :authenticated => false,
241 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
243 j = JSON.parse result.body, :symbolize_names => true
244 unless j.is_a? Hash and j[:uuid]
245 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
247 debuglog "Created pipeline instance: #{j[:uuid]}"
251 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
256 :pipeline_instance => @attributes_to_update
258 :authenticated => false,
260 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
262 j = JSON.parse result.body, :symbolize_names => true
263 unless j.is_a? Hash and j[:uuid]
264 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
267 @attributes_to_update = {}
272 @attributes_to_update[x] = y
280 $arv.log.create log: {
281 event_type: 'stderr',
282 object_uuid: self[:uuid],
283 owner_uuid: self[:owner_uuid],
284 properties: {"text" => msg},
290 @attributes_to_update = {}
298 result = $client.execute(:api_method => $arvados.jobs.get,
302 :authenticated => false,
304 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
306 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
308 def self.where(conditions)
309 result = $client.execute(:api_method => $arvados.jobs.list,
312 :where => conditions.to_json
314 :authenticated => false,
316 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
318 list = JSON.parse result.body, :symbolize_names => true
319 if list and list[:items].is_a? Array
325 def self.create(pipeline, component, job, create_params)
328 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
330 result = $client.execute(:api_method => $arvados.jobs.create,
331 :body_object => body,
332 :authenticated => false,
334 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
336 j = JSON.parse result.body, :symbolize_names => true
337 if j.is_a? Hash and j[:uuid]
340 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
343 j[:errors].each do |err|
344 msg += "Error creating job for component #{component}: #{err}\n"
346 msg += "Job submission was: #{body.to_json}"
348 pipeline.log_stderr(msg)
355 def self.no_nil_values(hash)
356 hash.reject { |key, value| value.nil? }
360 class WhRunPipelineInstance
361 attr_reader :instance
363 def initialize(_options)
367 def fetch_template(template)
368 if template.match /[^-0-9a-z]/
369 # Doesn't look like a uuid -- use it as a filename.
370 @template = JSON.parse File.read(template), :symbolize_names => true
372 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
376 :authenticated => false,
378 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
380 @template = JSON.parse result.body, :symbolize_names => true
382 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
388 def fetch_instance(instance_uuid)
389 @instance = PipelineInstance.find(instance_uuid)
390 @template = @instance
394 def apply_parameters(params_args)
395 params_args.shift if params_args[0] == '--'
397 while !params_args.empty?
398 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
399 params[re[2]] = re[3]
401 elsif params_args.size > 1
402 param = params_args.shift.sub /^--/, ''
403 params[param] = params_args.shift
405 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
409 if not @template[:components].is_a?(Hash)
410 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
412 @components = @template[:components].dup
414 bad_components = @components.each_pair.select do |cname, cspec|
415 not cspec.is_a?(Hash)
417 if bad_components.any?
418 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
421 bad_components = @components.each_pair.select do |cname, cspec|
422 not cspec[:script_parameters].is_a?(Hash)
424 if bad_components.any?
425 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
429 @components.each do |componentname, component|
430 component[:script_parameters].each do |parametername, parameter|
431 parameter = { :value => parameter } unless parameter.is_a? Hash
433 (params["#{componentname}::#{parametername}"] ||
435 (parameter[:output_of].nil? &&
436 (params[parametername.to_s] ||
437 parameter[:default])) ||
440 ![false,'false',0,'0'].index parameter[:required]
441 if parameter[:output_of]
444 errors << [componentname, parametername, "required parameter is missing"]
446 debuglog "parameter #{componentname}::#{parametername} == #{value}"
447 component[:script_parameters][parametername] = value
451 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
453 debuglog "options=" + @options.pretty_inspect
459 @instance[:properties][:run_options] ||= {}
460 if @options[:no_reuse]
461 # override properties of existing instance
462 @instance[:properties][:run_options][:enable_job_reuse] = false
464 # Default to "enable reuse" if not specified. (This code path
465 # can go away when old clients go away.)
466 if @instance[:properties][:run_options][:enable_job_reuse].nil?
467 @instance[:properties][:run_options][:enable_job_reuse] = true
471 @instance = PipelineInstance.
472 create(components: @components,
475 enable_job_reuse: !@options[:no_reuse]
478 pipeline_template_uuid: @template[:uuid],
479 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
488 if @instance[:started_at].nil?
489 @instance[:started_at] = Time.now
492 job_creation_failed = 0
495 @components.each do |cname, c|
497 owner_uuid = @instance[:owner_uuid]
498 # Is the job satisfying this component already known to be
499 # finished? (Already meaning "before we query API server about
500 # the job's current state")
501 c_already_finished = (c[:job] &&
503 !c[:job][:success].nil?)
505 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
506 # No job yet associated with this component and its component inputs
507 # are fully specified (any output_of script_parameters are resolved
509 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
510 job = JobCache.create(@instance, cname, {
511 :script => c[:script],
512 :script_parameters => c[:script_parameters],
513 :script_version => c[:script_version],
514 :repository => c[:repository],
515 :nondeterministic => c[:nondeterministic],
516 :runtime_constraints => c[:runtime_constraints],
517 :owner_uuid => owner_uuid,
518 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
519 :submit_id => my_submit_id,
521 # This is the right place to put these attributes when
522 # dealing with new API servers.
523 :minimum_script_version => c[:minimum_script_version],
524 :exclude_script_versions => c[:exclude_minimum_script_versions],
525 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
526 !c[:nondeterministic]),
527 :filters => c[:filters]
530 debuglog "component #{cname} new job #{job[:uuid]}"
532 c[:run_in_process] = (@options[:run_jobs_here] and
533 job[:submit_id] == my_submit_id)
535 debuglog "component #{cname} new job failed", 0
536 job_creation_failed += 1
540 if c[:job] and c[:run_in_process]
544 Open3.popen3("arv-crunch-job", "--force-unlock",
545 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
546 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
549 rready, wready, = IO.select([stdout, stderr], [])
552 buf = rready[0].read_nonblock(2**20)
556 (rready[0] == stdout ? $stdout : $stderr).write(buf)
560 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
562 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
563 raise Exception.new("arv-crunch-job did not set finished_at.")
565 rescue Exception => e
566 debuglog "Interrupted (#{e}). Failing job.", 0
567 $arv.job.update(uuid: c[:job][:uuid],
569 finished_at: Time.now,
576 if c[:job] and c[:job][:uuid]
577 if (c[:job][:running] or
578 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
579 # Job is running (or not yet finished or cancelled), so update copy of job record
580 c[:job] = JobCache.get(c[:job][:uuid])
584 # Populate script_parameters of other components waiting for
586 @components.each do |c2name, c2|
587 c2[:script_parameters].each do |pname, p|
588 if p.is_a? Hash and p[:output_of] == cname.to_s
589 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
590 c2[:script_parameters][pname] = c[:job][:output]
595 unless c_already_finished
596 # This is my first time discovering that the job
597 # succeeded. (At the top of this loop, I was still
598 # waiting for it to finish.)
600 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
601 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
602 pipeline_name = @instance[:name]
604 fetch_template(@instance[:pipeline_template_uuid])
605 pipeline_name = @template[:name]
607 if c[:output_name] != false
608 # Create a collection located in the same project as the pipeline with the contents of the output.
609 portable_data_hash = c[:job][:output]
610 collections = $arv.collection.list(limit: 1,
611 filters: [['portable_data_hash', '=', portable_data_hash]],
612 select: ["portable_data_hash", "manifest_text"]
615 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
617 # check if there is a name collision.
618 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
619 ["name", "=", name]])[:items]
621 newcollection_actual = nil
622 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
623 # There is already a collection with the same name and the
624 # same contents, so just point to that.
625 newcollection_actual = name_collisions.first
628 if newcollection_actual.nil?
629 # Did not find a collection with the same name (or the
630 # collection has a different portable data hash) so create
631 # a new collection with ensure_unique_name: true.
633 owner_uuid: owner_uuid,
635 portable_data_hash: collections.first[:portable_data_hash],
636 manifest_text: collections.first[:manifest_text]
638 debuglog "Creating collection #{newcollection}", 0
639 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
642 c[:output_uuid] = newcollection_actual[:uuid]
644 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
648 elsif c[:job][:running] ||
649 (!c[:job][:started_at] && !c[:job][:cancelled_at])
650 # Job is still running, or has not started yet and was not cancelled
652 elsif c[:job][:cancelled_at]
653 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
657 @instance[:components] = @components
660 if @options[:no_wait]
664 # If job creation fails, just give up on this pipeline instance.
665 if job_creation_failed > 0
673 debuglog "interrupt", 0
683 @components.each do |cname, c|
685 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
687 if c[:job][:success] == true
689 elsif c[:job][:success] == false or c[:job][:cancelled_at]
696 success = (succeeded == @components.length)
698 # A job create call failed. Just give up.
699 if job_creation_failed > 0
700 debuglog "job creation failed - giving up on this pipeline instance", 0
707 @instance[:state] = 'Complete'
709 @instance[:state] = 'Paused'
712 if ended == @components.length or failed > 0
713 @instance[:state] = success ? 'Complete' : 'Failed'
717 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
718 @instance[:finished_at] = Time.now
721 debuglog "pipeline instance state is #{@instance[:state]}"
723 # set components_summary
724 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
725 @instance[:components_summary] = components_summary
731 if @instance and @instance[:state] == 'RunningOnClient'
732 @instance[:state] = 'Paused'
746 if @options[:status_json] != '/dev/null'
747 File.open(@options[:status_json], 'w') do |f|
748 f.puts @components.pretty_inspect
752 if @options[:status_text] != '/dev/null'
753 File.open(@options[:status_text], 'w') do |f|
755 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
756 namewidth = @components.collect { |cname, c| cname.size }.max
757 @components.each do |cname, c|
758 jstatus = if !c[:job]
760 elsif c[:job][:running]
761 "#{c[:job][:tasks_summary].inspect}"
762 elsif c[:job][:success]
764 elsif c[:job][:cancelled_at]
765 "cancelled #{c[:job][:cancelled_at]}"
766 elsif c[:job][:finished_at]
767 "failed #{c[:job][:finished_at]}"
768 elsif c[:job][:started_at]
769 "started #{c[:job][:started_at]}"
770 elsif c[:job][:is_locked_by_uuid]
771 "starting #{c[:job][:started_at]}"
773 "queued #{c[:job][:created_at]}"
775 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
783 if ["New", "Ready", "RunningOnClient",
784 "RunningOnServer"].include?(@instance[:state])
785 @instance[:state] = "Failed"
786 @instance[:finished_at] = Time.now
789 @instance.log_stderr(msg)
795 runner = WhRunPipelineInstance.new($options)
797 if $options[:template]
798 runner.fetch_template($options[:template])
800 runner.fetch_instance($options[:instance])
802 runner.apply_parameters(p.leftovers)
803 runner.setup_instance
806 puts runner.instance[:uuid]
810 rescue Exception => e