5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 if RUBY_VERSION < '1.9.3' then
64 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
68 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
69 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
70 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
71 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
72 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
80 require 'google/api_client'
84 #{$0}: fatal: #{l.message}
85 Some runtime dependencies may be missing.
86 Try: gem install arvados pp google-api-client json trollop
90 def debuglog(message, verbosity=1)
91 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
96 original_verbosity = $VERBOSE
99 $VERBOSE = original_verbosity
104 if $arvados_api_host.match /local/
105 # You probably don't care about SSL certificate checks if you're
106 # testing with a dev server.
107 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
111 # Parse command line options (the kind that control the behavior of
112 # this program, that is, not the pipeline component parameters).
114 p = Trollop::Parser.new do
117 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
121 "Store plain text status in given file.",
124 :default => '/dev/stdout')
126 "Store json-formatted pipeline in given file.",
129 :default => '/dev/null')
131 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
135 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
139 "Print extra debugging information on stderr.",
142 "Set debug verbosity level.",
146 "UUID of pipeline template, or path to local pipeline template file.",
150 "UUID of pipeline instance.",
154 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service to satisfy the components by finding/running jobs.",
157 opt(:run_pipeline_here,
158 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
162 "Manage the pipeline instance in-process. Find/run/watch jobs until the pipeline finishes (or fails). Implies --run-pipeline-here.",
166 "Synonym for --run-jobs-here.",
171 $options = Trollop::with_standard_exception_handling p do
174 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
176 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
177 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
179 if $options[:instance]
180 if $options[:template] or $options[:submit]
181 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
183 elsif not $options[:template]
184 puts "error: you must supply a --template or --instance."
189 if $options[:run_pipeline_here] == $options[:submit]
190 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
193 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
196 def suppress_warnings
197 original_verbosity = $VERBOSE
200 $VERBOSE = original_verbosity
205 if ENV['ARVADOS_API_HOST_INSECURE']
206 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
209 # Set up the API client.
211 $arv = Arvados.new api_version: 'v1'
212 $client = $arv.client
213 $arvados = $arv.arvados_api
215 class PipelineInstance
217 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
221 :authenticated => false,
223 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
225 j = JSON.parse result.body, :symbolize_names => true
226 unless j.is_a? Hash and j[:uuid]
227 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
230 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
234 def self.create(attributes)
235 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
237 :pipeline_instance => attributes
239 :authenticated => false,
241 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
243 j = JSON.parse result.body, :symbolize_names => true
244 unless j.is_a? Hash and j[:uuid]
245 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
247 debuglog "Created pipeline instance: #{j[:uuid]}"
251 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
256 :pipeline_instance => @attributes_to_update
258 :authenticated => false,
260 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
262 j = JSON.parse result.body, :symbolize_names => true
263 unless j.is_a? Hash and j[:uuid]
264 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
267 @attributes_to_update = {}
272 @attributes_to_update[x] = y
280 $arv.log.create log: {
281 event_type: 'stderr',
282 object_uuid: self[:uuid],
283 owner_uuid: self[:owner_uuid],
284 properties: {"text" => msg},
290 @attributes_to_update = {}
298 result = $client.execute(:api_method => $arvados.jobs.get,
302 :authenticated => false,
304 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
306 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
308 def self.where(conditions)
309 result = $client.execute(:api_method => $arvados.jobs.list,
312 :where => conditions.to_json
314 :authenticated => false,
316 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
318 list = JSON.parse result.body, :symbolize_names => true
319 if list and list[:items].is_a? Array
325 def self.create(pipeline, component, job, create_params)
328 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
330 result = $client.execute(:api_method => $arvados.jobs.create,
331 :body_object => body,
332 :authenticated => false,
334 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
336 j = JSON.parse result.body, :symbolize_names => true
337 if j.is_a? Hash and j[:uuid]
340 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
343 j[:errors].each do |err|
344 msg += "Error creating job for component #{component}: #{err}\n"
346 msg += "Job submission was: #{body.to_json}"
348 pipeline.log_stderr(msg)
355 def self.no_nil_values(hash)
356 hash.reject { |key, value| value.nil? }
360 class WhRunPipelineInstance
361 attr_reader :instance
363 def initialize(_options)
367 def fetch_template(template)
368 if template.match /[^-0-9a-z]/
369 # Doesn't look like a uuid -- use it as a filename.
370 @template = JSON.parse File.read(template), :symbolize_names => true
372 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
376 :authenticated => false,
378 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
380 @template = JSON.parse result.body, :symbolize_names => true
382 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
388 def fetch_instance(instance_uuid)
389 @instance = PipelineInstance.find(instance_uuid)
390 @template = @instance
394 def apply_parameters(params_args)
395 params_args.shift if params_args[0] == '--'
397 while !params_args.empty?
398 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
399 params[re[2]] = re[3]
401 elsif params_args.size > 1
402 param = params_args.shift.sub /^--/, ''
403 params[param] = params_args.shift
405 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
409 if not @template[:components].is_a?(Hash)
410 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
412 @components = @template[:components].dup
414 bad_components = @components.each_pair.select do |cname, cspec|
415 not cspec.is_a?(Hash)
417 if bad_components.any?
418 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
421 bad_components = @components.each_pair.select do |cname, cspec|
422 not cspec[:script_parameters].is_a?(Hash)
424 if bad_components.any?
425 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
429 @components.each do |componentname, component|
430 component[:script_parameters].each do |parametername, parameter|
431 parameter = { :value => parameter } unless parameter.is_a? Hash
433 (params["#{componentname}::#{parametername}"] ||
435 (parameter[:output_of].nil? &&
436 (params[parametername.to_s] ||
437 parameter[:default])) ||
440 ![false,'false',0,'0'].index parameter[:required]
441 if parameter[:output_of]
444 errors << [componentname, parametername, "required parameter is missing"]
446 debuglog "parameter #{componentname}::#{parametername} == #{value}"
447 component[:script_parameters][parametername] = value
451 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
453 debuglog "options=" + @options.pretty_inspect
459 @instance[:properties][:run_options] ||= {}
460 if @options[:no_reuse]
461 # override properties of existing instance
462 @instance[:properties][:run_options][:enable_job_reuse] = false
464 # Default to "enable reuse" if not specified. (This code path
465 # can go away when old clients go away.)
466 if @instance[:properties][:run_options][:enable_job_reuse].nil?
467 @instance[:properties][:run_options][:enable_job_reuse] = true
471 @instance = PipelineInstance.
472 create(components: @components,
475 enable_job_reuse: !@options[:no_reuse]
478 pipeline_template_uuid: @template[:uuid],
479 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
488 job_creation_failed = 0
491 @components.each do |cname, c|
493 owner_uuid = @instance[:owner_uuid]
494 # Is the job satisfying this component already known to be
495 # finished? (Already meaning "before we query API server about
496 # the job's current state")
497 c_already_finished = (c[:job] &&
499 !c[:job][:success].nil?)
501 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
502 # No job yet associated with this component and is component inputs
503 # are fully specified (any output_of script_parameters are resolved
505 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
506 job = JobCache.create(@instance, cname, {
507 :script => c[:script],
508 :script_parameters => c[:script_parameters],
509 :script_version => c[:script_version],
510 :repository => c[:repository],
511 :nondeterministic => c[:nondeterministic],
512 :runtime_constraints => c[:runtime_constraints],
513 :owner_uuid => owner_uuid,
514 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
515 :submit_id => my_submit_id,
517 # This is the right place to put these attributes when
518 # dealing with new API servers.
519 :minimum_script_version => c[:minimum_script_version],
520 :exclude_script_versions => c[:exclude_minimum_script_versions],
521 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
522 !c[:nondeterministic]),
523 :filters => c[:filters]
526 debuglog "component #{cname} new job #{job[:uuid]}"
528 c[:run_in_process] = (@options[:run_jobs_here] and
529 job[:submit_id] == my_submit_id)
531 debuglog "component #{cname} new job failed", 0
532 job_creation_failed += 1
536 if c[:job] and c[:run_in_process]
540 Open3.popen3("arv-crunch-job", "--force-unlock",
541 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
542 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
545 rready, wready, = IO.select([stdout, stderr], [])
548 buf = rready[0].read_nonblock(2**20)
552 (rready[0] == stdout ? $stdout : $stderr).write(buf)
556 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
558 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
559 raise Exception.new("arv-crunch-job did not set finished_at.")
561 rescue Exception => e
562 debuglog "Interrupted (#{e}). Failing job.", 0
563 $arv.job.update(uuid: c[:job][:uuid],
565 finished_at: Time.now,
572 if c[:job] and c[:job][:uuid]
573 if (c[:job][:running] or
574 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
575 # Job is running so update copy of job record
576 c[:job] = JobCache.get(c[:job][:uuid])
580 # Populate script_parameters of other components waiting for
582 @components.each do |c2name, c2|
583 c2[:script_parameters].each do |pname, p|
584 if p.is_a? Hash and p[:output_of] == cname.to_s
585 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
586 c2[:script_parameters][pname] = c[:job][:output]
591 unless c_already_finished
592 # This is my first time discovering that the job
593 # succeeded. (At the top of this loop, I was still
594 # waiting for it to finish.)
596 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
597 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
598 pipeline_name = @instance[:name]
600 fetch_template(@instance[:pipeline_template_uuid])
601 pipeline_name = @template[:name]
603 if c[:output_name] != false
604 output_name = c[:output_name] || "Output of #{cname} of #{pipeline_name}"
605 # Create a collection located in the same project as the pipeline with the contents of the output.
606 portable_data_hash = c[:job][:output]
607 collections = $arv.collection.list(limit: 1,
608 filters: [['portable_data_hash', '=', portable_data_hash]],
609 select: ["portable_data_hash", "manifest_text"]
613 owner_uuid: owner_uuid,
614 name: "#{output_name} at #{c[:job][:finished_at]}",
615 portable_data_hash: collections.first[:portable_data_hash],
616 manifest_text: collections.first[:manifest_text]
618 debuglog "Creating collection #{newcollection}", 0
619 newcollection_actual = $arv.collection.create collection: newcollection
620 c[:output_uuid] = newcollection_actual[:uuid]
622 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
626 elsif c[:job][:running] ||
627 (!c[:job][:started_at] && !c[:job][:cancelled_at])
628 # Job is still running
630 elsif c[:job][:cancelled_at]
631 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
635 @instance[:components] = @components
638 if @options[:no_wait]
642 # If job creation fails, just give up on this pipeline instance.
643 if job_creation_failed > 0
651 debuglog "interrupt", 0
661 @components.each do |cname, c|
663 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
665 if c[:job][:success] == true
667 elsif c[:job][:success] == false or c[:job][:cancelled_at]
674 success = (succeeded == @components.length)
676 # A job create call failed. Just give up.
677 if job_creation_failed > 0
678 debuglog "job creation failed - giving up on this pipeline instance", 0
685 @instance[:state] = 'Complete'
687 @instance[:state] = 'Paused'
690 if ended == @components.length or failed > 0
691 @instance[:state] = success ? 'Complete' : 'Failed'
695 debuglog "pipeline instance state is #{@instance[:state]}"
697 # set components_summary
698 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
699 @instance[:components_summary] = components_summary
705 if @instance and @instance[:state] == 'RunningOnClient'
706 @instance[:state] = 'Paused'
720 if @options[:status_json] != '/dev/null'
721 File.open(@options[:status_json], 'w') do |f|
722 f.puts @components.pretty_inspect
726 if @options[:status_text] != '/dev/null'
727 File.open(@options[:status_text], 'w') do |f|
729 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
730 namewidth = @components.collect { |cname, c| cname.size }.max
731 @components.each do |cname, c|
732 jstatus = if !c[:job]
734 elsif c[:job][:running]
735 "#{c[:job][:tasks_summary].inspect}"
736 elsif c[:job][:success]
738 elsif c[:job][:cancelled_at]
739 "cancelled #{c[:job][:cancelled_at]}"
740 elsif c[:job][:finished_at]
741 "failed #{c[:job][:finished_at]}"
742 elsif c[:job][:started_at]
743 "started #{c[:job][:started_at]}"
744 elsif c[:job][:is_locked_by_uuid]
745 "starting #{c[:job][:started_at]}"
747 "queued #{c[:job][:created_at]}"
749 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
757 if ["New", "Ready", "RunningOnClient",
758 "RunningOnServer"].include?(@instance[:state])
759 @instance[:state] = "Failed"
762 @instance.log_stderr(msg)
768 runner = WhRunPipelineInstance.new($options)
770 if $options[:template]
771 runner.fetch_template($options[:template])
773 runner.fetch_instance($options[:instance])
775 runner.apply_parameters(p.leftovers)
776 runner.setup_instance
779 puts runner.instance[:uuid]
783 rescue Exception => e