5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 $application_version = 1.0
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
# Write a debugging message to stderr, prefixed with this script's
# basename and process ID. The message is printed only when the global
# $debuglevel is at least +verbosity+ (default 1); callers pass 0 for
# messages that must be shown even when debugging is off.
92 def debuglog(message, verbosity=1)
93 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopen Google::APIClient to redefine discovery_document(api, version).
# The result is memoized in @discovery_documents under the key
# "api:version", so the discovery document is fetched at most once per
# API/version pair.
112 class Google::APIClient
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
# Fetch the document with an unauthenticated GET on the discovery URI.
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
# A String body is parsed as JSON; any other body is returned unchanged.
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
# Parse the command line with the Trollop parser built above, then
# validate the combination of options.
180 $options = Trollop::with_standard_exception_handling p do
# An explicit --debug-level wins; otherwise --debug alone means level 1,
# and with neither option the level is 0.
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# --instance is mutually exclusive with --template and --submit, and at
# least one of --template / --instance must be given.
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
190 puts "error: you must supply a --template or --instance."
# Exactly one of --run-here / --submit is required: the test below is
# true both when neither is given and when both are given.
195 if $options[:run_here] == $options[:submit]
196 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
# Remember the current $VERBOSE setting on entry and restore it before
# returning.
# NOTE(review): the lines between the save and the restore are not
# visible here; presumably they silence warnings and yield to the
# caller's block (the call sites pass a block) -- confirm.
202 def suppress_warnings
203 original_verbosity = $VERBOSE
206 $VERBOSE = original_verbosity
211 if ENV['ARVADOS_API_HOST_INSECURE']
212 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
215 # Set up the API client.
217 $client ||= Google::APIClient.
218 new(:host => $arvados_api_host,
219 :application_name => File.split($0).last,
220 :application_version => $application_version.to_s)
221 $arvados = $client.discovered_api('arvados', $arvados_api_version)
222 $arv = Arvados.new api_version: 'v1'
225 class PipelineInstance
227 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
231 :authenticated => false,
233 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
235 j = JSON.parse result.body, :symbolize_names => true
236 unless j.is_a? Hash and j[:uuid]
237 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
240 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance on the API server from +attributes+ via the
# pipeline_instances.create method. The request is marked
# :authenticated => false and carries the token explicitly as
# 'OAuth2 ' + ARVADOS_API_TOKEN.
# Aborts the program if the response is not a Hash containing :uuid.
244 def self.create(attributes)
245 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
247 :pipeline_instance => attributes
249 :authenticated => false,
251 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
253 j = JSON.parse result.body, :symbolize_names => true
254 unless j.is_a? Hash and j[:uuid]
# "rescue nil" keeps the message printable when j is not a Hash and
# j[:errors] would itself raise.
255 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
257 debuglog "Created pipeline instance: #{j[:uuid]}"
261 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
266 :pipeline_instance => @attributes_to_update.to_json
268 :authenticated => false,
270 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
272 j = JSON.parse result.body, :symbolize_names => true
273 unless j.is_a? Hash and j[:uuid]
274 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
277 @attributes_to_update = {}
282 @attributes_to_update[x] = y
290 @attributes_to_update = {}
298 result = $client.execute(:api_method => $arvados.jobs.get,
302 :authenticated => false,
304 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
306 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# Query the jobs.list API method, passing +conditions+ JSON-encoded as
# the :where parameter, and parse the response body as JSON with
# symbolized keys.
# NOTE(review): the enclosing class header is not visible here; callers
# elsewhere in the file refer to JobCache -- confirm.
308 def self.where(conditions)
309 result = $client.execute(:api_method => $arvados.jobs.list,
312 :where => conditions.to_json
314 :authenticated => false,
316 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
318 list = JSON.parse result.body, :symbolize_names => true
# Only a response whose :items entry is an Array is treated as a
# usable listing.
319 if list and list[:items].is_a? Array
# Submit a job through the jobs.create API method.
# +job+ holds the job attributes; +create_params+ holds extra request
# parameters that are merged into the request alongside them.
# NOTE(review): the enclosing class header is not visible here; the
# caller invokes this as JobCache.create -- confirm.
325 def self.create(job, create_params)
# JSON-encode each extra parameter, dropping those whose value is nil so
# they are not sent at all.
328 jsonified_create_params = {}
329 create_params.each do |k, v|
330 jsonified_create_params[k] = v.to_json unless v.nil?
333 result = $client.execute(:api_method => $arvados.jobs.create,
336 }.merge(jsonified_create_params),
337 :authenticated => false,
339 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
341 j = JSON.parse result.body, :symbolize_names => true
# A Hash response containing :uuid means the job was created.
342 if j.is_a? Hash and j[:uuid]
# Failure is logged at verbosity 0, so it is printed even when debugging
# is off.
345 debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
351 class WhRunPipelineInstance
352 attr_reader :instance
354 def initialize(_options)
# Load the pipeline template into @template.
# +template+ is either a path to a local JSON file or a template UUID:
# any character outside [-0-9a-z] marks it as a path. A file is parsed
# as JSON with symbolized keys and must contain a :components key;
# anything else is fetched through the pipeline_templates.get API
# method. Aborts the program on either kind of failure.
358 def fetch_template(template)
359 if template.match /[^-0-9a-z]/
360 # Doesn't look like a uuid -- use it as a filename.
361 @template = JSON.parse File.read(template), :symbolize_names => true
362 if !@template[:components]
363 abort ("#{$0}: Template loaded from #{template} " +
364 "does not have a \"components\" key")
367 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
371 :authenticated => false,
373 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
375 @template = JSON.parse result.body, :symbolize_names => true
# "rescue nil" keeps the abort message printable even if reading
# @template[:errors] raises.
377 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Load an existing pipeline instance by UUID into @instance and use the
# same object as @template, so code that reads @template[:components]
# sees the instance's components.
383 def fetch_instance(instance_uuid)
384 @instance = PipelineInstance.find(instance_uuid)
385 @template = @instance
# Turn the leftover command-line arguments into parameter values and
# resolve every script parameter of every component in @template,
# storing the result in @components. Aborts with a list of errors if any
# are collected along the way.
#
# +params_args+ is consumed destructively (entries are shifted off).
389 def apply_parameters(params_args)
# A leading "--" separator is discarded.
390 params_args.shift if params_args[0] == '--'
# Accepted forms: "name=value" / "--name=value" as one token, or
# "name value" / "--name value" as two consecutive tokens.
392 while !params_args.empty?
# NOTE(review): the value part is matched with ".+", so "name=" with an
# empty value does not match this branch and falls through to the
# two-token form -- confirm that is intended.
393 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
394 params[re[2]] = re[3]
396 elsif params_args.size > 1
397 param = params_args.shift.sub /^--/, ''
398 params[param] = params_args.shift
400 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
# NOTE(review): dup is a shallow copy, so the per-component hashes are
# still shared with @template and the assignment to
# component[:script_parameters][...] below also changes @template.
404 @components = @template[:components].dup
407 @components.each do |componentname, component|
408 component[:script_parameters].each do |parametername, parameter|
# A bare value in the template is shorthand for { :value => value }.
409 parameter = { :value => parameter } unless parameter.is_a? Hash
# Value lookup, most specific first: a "component::param" command-line
# value takes precedence. A plain "param" command-line value or the
# template's :default is considered only when the parameter is not an
# output_of reference. (Other alternatives in this chain are not
# visible here.)
411 (params["#{componentname}::#{parametername}"] ||
413 (parameter[:output_of].nil? &&
414 (params[parametername.to_s] ||
415 parameter[:default])) ||
# A parameter counts as required unless :required is one of
# false, 'false', 0 or '0'.
418 ![false,'false',0,'0'].index parameter[:required]
419 if parameter[:output_of]
422 errors << [componentname, parametername, "required parameter is missing"]
424 debuglog "parameter #{componentname}::#{parametername} == #{value}"
425 component[:script_parameters][parametername] = value
# Report the collected errors, one "component::param - message" per
# line, and exit.
429 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
431 debuglog "options=" + @options.pretty_inspect
437 @instance ||= PipelineInstance.
438 create(:components => @components,
439 :pipeline_template_uuid => @template[:uuid],
442 @instance ||= PipelineInstance.
443 create(:components => @components,
444 :pipeline_template_uuid => @template[:uuid],
445 :state => 'RunningOnClient')
454 job_creation_failed = 0
457 @components.each do |cname, c|
459 owner_uuid = @instance[:owner_uuid]
460 # Is the job satisfying this component already known to be
461 # finished? (Already meaning "before we query API server about
462 # the job's current state")
463 c_already_finished = (c[:job] &&
465 !c[:job][:success].nil?)
467 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
468 # No job yet associated with this component, and the component's inputs
469 # are fully specified (any output_of script_parameters are resolved
471 job = JobCache.create({
472 :script => c[:script],
473 :script_parameters => c[:script_parameters],
474 :script_version => c[:script_version],
475 :repository => c[:repository],
476 :nondeterministic => c[:nondeterministic],
477 :output_is_persistent => c[:output_is_persistent] || false,
478 :runtime_constraints => c[:runtime_constraints],
479 :owner_uuid => owner_uuid,
481 # This is the right place to put these attributes when
482 # dealing with new API servers.
483 :minimum_script_version => c[:minimum_script_version],
484 :exclude_script_versions => c[:exclude_minimum_script_versions],
485 :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
486 :filters => c[:filters]
489 debuglog "component #{cname} new job #{job[:uuid]}"
492 debuglog "component #{cname} new job failed", 0
493 job_creation_failed += 1
497 if c[:job] and c[:job][:uuid]
498 if (c[:job][:running] or
499 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
500 # Job is running so update copy of job record
501 c[:job] = JobCache.get(c[:job][:uuid])
505 # Populate script_parameters of other components waiting for
507 @components.each do |c2name, c2|
508 c2[:script_parameters].each do |pname, p|
509 if p.is_a? Hash and p[:output_of] == cname.to_s
510 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
511 c2[:script_parameters][pname] = c[:job][:output]
516 unless c_already_finished
517 # This is my first time discovering that the job
518 # succeeded. (At the top of this loop, I was still
519 # waiting for it to finish.)
520 if c[:output_is_persistent]
521 # I need to make sure a resources/wants link is in
522 # place to protect the output from garbage
523 # collection. (Normally Crunch does this for me, but
524 # here I might be reusing the output of someone else's
525 # job and I need to make sure it's understood that the
526 # output is valuable to me, too.)
527 wanted = c[:job][:output]
528 debuglog "checking for existing persistence link for #{wanted}"
529 @my_user_uuid ||= $arv.user.current[:uuid]
530 links = $arv.link.list(limit: 1,
532 [%w(link_class = resources),
534 %w(tail_uuid =) + [@my_user_uuid],
535 %w(head_uuid =) + [wanted]
538 debuglog "link already exists, uuid #{links.first[:uuid]}"
540 newlink = $arv.link.create link: \
542 link_class: 'resources',
544 tail_kind: 'arvados#user',
545 tail_uuid: @my_user_uuid,
546 head_kind: 'arvados#collection',
548 owner_uuid: owner_uuid
550 debuglog "added link, uuid #{newlink[:uuid]}"
554 elsif c[:job][:running] ||
555 (!c[:job][:started_at] && !c[:job][:cancelled_at])
556 # Job is still running
558 elsif c[:job][:cancelled_at]
559 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
563 @instance[:components] = @components
566 if @options[:no_wait]
570 # If job creation fails, just give up on this pipeline instance.
571 if job_creation_failed > 0
579 debuglog "interrupt", 0
589 @components.each do |cname, c|
591 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
593 if c[:job][:success] == true
595 elsif c[:job][:success] == false or c[:job][:cancelled_at]
602 success = (succeeded == @components.length)
604 # A job create call failed. Just give up.
605 if job_creation_failed > 0
606 debuglog "job creation failed - giving up on this pipeline instance", 0
613 @instance[:state] = 'Complete'
615 @instance[:state] = 'Paused'
618 if ended == @components.length or failed > 0
619 @instance[:state] = success ? 'Complete' : 'Failed'
623 debuglog "pipeline instance state is #{@instance[:state]}"
625 # set components_summary
626 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
627 @instance[:components_summary] = components_summary
633 if @instance and @instance[:state] == 'RunningOnClient'
634 @instance[:state] = 'Paused'
648 if @options[:status_json] != '/dev/null'
649 File.open(@options[:status_json], 'w') do |f|
650 f.puts @components.pretty_inspect
654 if @options[:status_text] != '/dev/null'
655 File.open(@options[:status_text], 'w') do |f|
657 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
658 namewidth = @components.collect { |cname, c| cname.size }.max
659 @components.each do |cname, c|
660 jstatus = if !c[:job]
662 elsif c[:job][:running]
663 "#{c[:job][:tasks_summary].inspect}"
664 elsif c[:job][:success]
666 elsif c[:job][:cancelled_at]
667 "cancelled #{c[:job][:cancelled_at]}"
668 elsif c[:job][:finished_at]
669 "failed #{c[:job][:finished_at]}"
670 elsif c[:job][:started_at]
671 "started #{c[:job][:started_at]}"
673 "queued #{c[:job][:created_at]}"
675 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Script entry point: build the runner from the parsed options, load
# either the template or an existing instance, apply the leftover
# command-line arguments as parameters, and set up the instance.
682 runner = WhRunPipelineInstance.new($options)
684 if $options[:template]
685 runner.fetch_template($options[:template])
687 runner.fetch_instance($options[:instance])
# p.leftovers holds the arguments that the Trollop parser did not consume.
689 runner.apply_parameters(p.leftovers)
690 runner.setup_instance
# Print the pipeline instance UUID on stdout.
693 puts runner.instance[:uuid]
# NOTE(review): rescuing Exception also catches SystemExit (raised by
# abort) and Interrupt; the handler body is not visible here, so confirm
# that this is deliberate.
697 rescue Exception => e