5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 # pipeline components. Always submit a new job
33 # or use an existing job which has not yet
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 # components. Submit a new job for every component.
39 # [--debug] Print extra debugging information on stderr.
41 # [--debug-level N] Increase amount of debugging information. Default
42 # 1, possible range 0..3.
44 # [--status-text path] Print plain text status report to a file or
45 # fifo. Default: /dev/stdout
47 # [--status-json path] Print JSON status report to a file or
48 # fifo. Default: /dev/null
52 # [param_name=param_value]
54 # [param_name param_value] Set (or override) the default value for
55 # every parameter with the given name.
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 # parameter for a single
64 class WhRunPipelineInstance
# Global configuration for the script.
# - $application_version is reported to the API client (as a string).
# - The interpreter must be at least Ruby 1.9.3.
#   NOTE(review): RUBY_VERSION is compared as a String, i.e.
#   lexicographically ("1.10" would sort below "1.9.3"); comparing
#   Gem::Version objects would be more robust -- confirm before changing.
# - ARVADOS_API_VERSION is optional and falls back to 'v1'.
# - ARVADOS_API_HOST and ARVADOS_API_TOKEN are mandatory: the low-precedence
#   `or` lets the assignment happen first and aborts only when the
#   environment variable is unset (nil).
67 $application_version = 1.0
69 if RUBY_VERSION < '1.9.3' then
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
87 require 'google/api_client'
91 #{$0}: fatal: #{l.message}
92 Some runtime dependencies may be missing.
93 Try: gem install arvados pp google-api-client json trollop
# Write a diagnostic line to stderr, prefixed with the script's base name
# and the current process id.
#
# message   - text to print.
# verbosity - minimum $debuglevel at which the message is shown (default 1).
#             Callers pass 0 for messages that must appear even when
#             debugging is off.
97 def debuglog(message, verbosity=1)
98 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
# Helper that remembers the interpreter's $VERBOSE setting and puts it back
# afterwards. Callers use it with a block that reassigns a constant.
# The lines between the save and the restore are not visible in this
# excerpt -- presumably they silence warnings and yield; TODO confirm.
# NOTE(review): a second definition of suppress_warnings appears later in
# this file; one of the two is redundant.
102 def suppress_warnings
103 original_verbosity = $VERBOSE
106 $VERBOSE = original_verbosity
# When the API host name contains "local", replace the VERIFY_PEER constant
# with VERIFY_NONE so certificate checks are skipped.
# NOTE(review): /local/ is unanchored, so it matches "local" anywhere in the
# host string, not only "localhost". The constant is reassigned globally,
# which affects every OpenSSL user in this process.
111 if $arvados_api_host.match /local/
112 # You probably don't care about SSL certificate checks if you're
113 # testing with a dev server.
114 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopens Google::APIClient to override discovery_document(api, version).
# The override memoizes the document in @discovery_documents under the key
# "<api>:<version>". On a cache miss it issues an unauthenticated GET to
# discovery_uri(api, version) via execute! and uses the response body,
# JSON-parsing it only when the body is a String.
# (The begin/end wrapper around the fetch is not visible in this excerpt.)
117 class Google::APIClient
118 def discovery_document(api, version)
120 return @discovery_documents["#{api}:#{version}"] ||=
122 response = self.execute!(
123 :http_method => :get,
124 :uri => self.discovery_uri(api, version),
125 :authenticated => false
127 response.body.class == String ? JSON.parse(response.body) : response.body
133 # Parse command line options (the kind that control the behavior of
134 # this program, that is, not the pipeline component parameters).
# The parser is kept in the local variable `p` because the arguments it
# does not consume (p.leftovers) are later treated as pipeline parameters.
# Only the help strings of most opt(...) calls are visible in this excerpt.
136 p = Trollop::Parser.new do
139 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
143 "Store plain text status in given file.",
146 :default => '/dev/stdout')
148 "Store json-formatted pipeline in given file.",
151 :default => '/dev/null')
153 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
156 opt(:no_reuse_finished,
157 "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
161 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
165 "Print extra debugging information on stderr.",
168 "Set debug verbosity level.",
172 "UUID of pipeline template, or path to local pipeline template file.",
176 "UUID of pipeline instance.",
180 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
184 "Manage the pipeline in process.",
# Run the parser with Trollop's standard error/help handling; the parsed
# options hash is stored globally.
189 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: an explicit --debug-level wins, otherwise --debug
# means level 1, otherwise 0 (debugging off).
192 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# Exactly one source is required: --instance (which cannot be combined with
# --template or --submit) or --template.
194 if $options[:instance]
195 if $options[:template] or $options[:submit]
196 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
198 elsif not $options[:template]
199 abort "#{$0}: syntax error: you must supply a --template or --instance."
# --run-here and --submit are mutually exclusive AND one is mandatory: the
# equality test is true both when neither and when both are given.
202 if $options[:run_here] == $options[:submit]
203 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
206 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
# NOTE(review): this redefines suppress_warnings, which is already defined
# earlier in the file with the same visible body; the duplicate can
# probably be removed -- confirm against the full file.
209 def suppress_warnings
210 original_verbosity = $VERBOSE
213 $VERBOSE = original_verbosity
# NOTE(review): the test is on presence only. ENV values are Strings, and
# every String is truthy in Ruby, so ARVADOS_API_HOST_INSECURE=0, =false or
# even an empty value also disables certificate verification.
218 if ENV['ARVADOS_API_HOST_INSECURE']
219 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
222 # Set up the API client.
# $client is created only if it is not already set (||=). It targets
# $arvados_api_host and identifies itself with the script's base name and
# $application_version. $arvados is the discovered 'arvados' API at the
# configured $arvados_api_version.
224 $client ||= Google::APIClient.
225 new(:host => $arvados_api_host,
226 :application_name => File.split($0).last,
227 :application_version => $application_version.to_s)
228 $arvados = $client.discovered_api('arvados', $arvados_api_version)
# NOTE(review): this second client hard-codes api_version 'v1' instead of
# using $arvados_api_version, so the two clients can disagree when
# ARVADOS_API_VERSION is set -- confirm whether that is intended.
229 $arv = Arvados.new api_version: 'v1'
# Thin wrapper around the pipeline_instances API resource.
# Several method headers (e.g. the lookup used as PipelineInstance.find,
# the save method and the attribute accessors) are not visible in this
# excerpt; the comments below describe only the visible statements.
# All calls pass the API token as a request parameter and set
# :authenticated => false on the client call.
232 class PipelineInstance
# Lookup: fetch one pipeline instance and parse the JSON response with
# symbol keys.
234 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
239 :api_token => ENV['ARVADOS_API_TOKEN']
241 :authenticated => false)
242 j = JSON.parse result.body, :symbolize_names => true
# A usable response is a Hash with a :uuid. Otherwise the failure is
# logged at level 0 (always shown). `rescue nil` keeps the message from
# raising when j does not support [:errors].
243 unless j.is_a? Hash and j[:uuid]
244 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
247 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a new pipeline instance from the given attributes.
# Unlike lookup and save, a failed create aborts the whole program.
# NOTE(review): attributes are passed as-is here, whereas the save call
# below sends @attributes_to_update.to_json -- confirm which form the API
# expects.
251 def self.create(attributes)
252 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
254 :api_token => ENV['ARVADOS_API_TOKEN'],
255 :pipeline_instance => attributes
257 :authenticated => false)
258 j = JSON.parse result.body, :symbolize_names => true
259 unless j.is_a? Hash and j[:uuid]
260 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
262 debuglog "Created pipeline instance: #{j[:uuid]}"
# Save: send only the attributes recorded in @attributes_to_update.
266 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
271 :api_token => ENV['ARVADOS_API_TOKEN'],
272 :pipeline_instance => @attributes_to_update.to_json
274 :authenticated => false)
275 j = JSON.parse result.body, :symbolize_names => true
# A failed save is only logged; the pending-changes hash is reset below
# (the branch structure between these lines is not visible here).
276 unless j.is_a? Hash and j[:uuid]
277 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
280 @attributes_to_update = {}
# Attribute writes are recorded so that a later save sends them.
285 @attributes_to_update[x] = y
# Start with no pending changes.
293 @attributes_to_update = {}
# Job lookup/creation helpers backed by the jobs API resource. The class
# header and the first method's def line are not visible in this excerpt;
# other code calls these as JobCache.get, JobCache.where and
# JobCache.create.
# Fetch one job and store the parsed response (symbol keys) in @cache
# under its uuid.
301 result = $client.execute(:api_method => $arvados.jobs.get,
303 :api_token => ENV['ARVADOS_API_TOKEN'],
306 :authenticated => false)
307 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# List jobs matching `conditions`, which is sent JSON-encoded as the
# :where parameter. The visible code proceeds only when the response has
# an Array under :items.
309 def self.where(conditions)
310 result = $client.execute(:api_method => $arvados.jobs.list,
312 :api_token => ENV['ARVADOS_API_TOKEN'],
314 :where => conditions.to_json
316 :authenticated => false)
317 list = JSON.parse result.body, :symbolize_names => true
318 if list and list[:items].is_a? Array
# Submit a new job; `attributes` is sent JSON-encoded as :job.
# Success is a Hash response containing :uuid; otherwise the API errors
# are logged at level 0. The return values of both branches are not
# visible in this excerpt.
324 def self.create(attributes)
326 result = $client.execute(:api_method => $arvados.jobs.create,
328 :api_token => ENV['ARVADOS_API_TOKEN'],
329 :job => attributes.to_json
331 :authenticated => false)
332 j = JSON.parse result.body, :symbolize_names => true
333 if j.is_a? Hash and j[:uuid]
336 debuglog "create job: #{j[:errors] rescue nil}", 0
# Driver object for one pipeline run. It exposes the pipeline instance it
# works on through the read-only `instance` attribute.
# The constructor body is not visible in this excerpt; other methods read
# @options, so presumably _options is stored there -- TODO confirm.
342 class WhRunPipelineInstance
343 attr_reader :instance
345 def initialize(_options)
# Load the pipeline template into @template.
#
# template - either a local file path or a pipeline template uuid.
#
# The argument is treated as a file path when it contains any character
# other than lowercase letters, digits and '-'; otherwise it is fetched
# from the API as a uuid. The program aborts when a file has no
# :components key or when the API lookup fails.
# NOTE(review): a file whose name consists only of [-0-9a-z] (no dot,
# slash or uppercase) is misread as a uuid -- confirm this is acceptable.
349 def fetch_template(template)
350 if template.match /[^-0-9a-z]/
351 # Doesn't look like a uuid -- use it as a filename.
352 @template = JSON.parse File.read(template), :symbolize_names => true
353 if !@template[:components]
354 abort ("#{$0}: Template loaded from #{template} " +
355 "does not have a \"components\" key")
358 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
360 :api_token => ENV['ARVADOS_API_TOKEN'],
363 :authenticated => false)
364 @template = JSON.parse result.body, :symbolize_names => true
366 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Resume an existing pipeline instance.
#
# instance_uuid - uuid of the pipeline instance to load.
#
# The loaded instance also serves as the template, so later code that
# reads @template[:components] sees the instance's components.
372 def fetch_instance(instance_uuid)
373 @instance = PipelineInstance.find(instance_uuid)
374 @template = @instance
# Resolve every component's script_parameters from the command line and
# the template, storing the result in @components. Aborts with a list of
# "<component>::<parameter> - <error>" lines when problems are collected.
#
# params_args - leftover command-line words (mutated: consumed by shift).
378 def apply_parameters(params_args)
# A leading "--" separator is dropped.
379 params_args.shift if params_args[0] == '--'
# Accepted forms: "name=value" / "--name=value" (one word), or
# "name value" / "--name value" (two words). The one-word form needs a
# non-empty value because of (.+), so "name=" falls through to the
# two-word branch.
381 while !params_args.empty?
382 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
383 params[re[2]] = re[3]
385 elsif params_args.size > 1
386 param = params_args.shift.sub /^--/, ''
387 params[param] = params_args.shift
389 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
# NOTE(review): dup is shallow, so the per-component hashes are shared
# with @template and the assignment to component[:script_parameters] below
# also changes the template -- confirm this is intended.
393 @components = @template[:components].dup
396 @components.each do |componentname, component|
397 component[:script_parameters].each do |parametername, parameter|
# Bare values are normalized to { :value => ... }. In the visible part of
# the lookup, a "component::parameter" command-line value comes first;
# a plain "parameter" value or the template :default is considered only
# when the parameter is not the output of another component. Some
# alternatives of this expression are not visible in this excerpt.
# A parameter counts as required unless :required is false, 'false', 0
# or '0'.
398 parameter = { :value => parameter } unless parameter.is_a? Hash
400 (params["#{componentname}::#{parametername}"] ||
402 (parameter[:output_of].nil? &&
403 (params[parametername.to_s] ||
404 parameter[:default])) ||
407 ![false,'false',0,'0'].index parameter[:required]
408 if parameter[:output_of]
411 errors << [componentname, parametername, "required parameter is missing"]
413 debuglog "parameter #{componentname}::#{parametername} == #{value}"
414 component[:script_parameters][parametername] = value
418 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
420 debuglog "options=" + @options.pretty_inspect
# Create the pipeline instance unless one is already set (for example when
# an existing instance was fetched). The new instance is created from the
# resolved @components and the template's uuid; further attributes of the
# create call are not visible in this excerpt.
# (The enclosing def line is not visible either; the script calls
# runner.setup_instance.)
425 @instance ||= PipelineInstance.
426 create(:components => @components,
427 :pipeline_template_uuid => @template[:uuid],
# Main satisfaction loop (the enclosing def and outer loop lines are not
# visible in this excerpt). For each component it tries to reuse a
# matching job, otherwise submits one, then tracks job state, propagates
# outputs to dependent components and updates the pipeline instance.
436 @components.each do |cname, c|
# Remember whether this component's job already had a definite :success
# value before this pass (part of the condition is not visible).
438 c_already_finished = (c[:job] &&
440 !c[:job][:success].nil?)
# A component whose script_parameters contain no Hash values has all its
# inputs resolved.
442 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
443 # Job is fully specified (all parameter values are present) but
444 # no particular job has been found.
446 debuglog "component #{cname} ready to satisfy."
449 second_place_job = nil # satisfies component, but not finished yet
# Candidate search is skipped entirely with --no-reuse. Otherwise jobs are
# queried by script, parameters and script_version ancestry.
451 (@options[:no_reuse] ? [] : JobCache.
452 where(script: c[:script],
453 script_parameters: c[:script_parameters],
454 script_version_descends_from: c[:script_version])
455 ).each do |candidate_job|
# Parameter names are compared case-insensitively (keys downcased on both
# sides); values must match exactly.
456 candidate_params_downcase = Hash[candidate_job[:script_parameters].
457 map { |k,v| [k.downcase,v] }]
458 c_params_downcase = Hash[c[:script_parameters].
459 map { |k,v| [k.downcase,v] }]
461 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
463 unless candidate_params_downcase == c_params_downcase
# The requested script_version must be a prefix of the candidate's.
# NOTE(review): job creation further down falls back to 'master' when
# c[:script_version] is missing, which suggests it can be nil; nil.length
# would raise here -- confirm.
467 if c[:script_version] !=
468 candidate_job[:script_version][0,c[:script_version].length]
469 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
# Acceptable candidates are successful, running, or still queued (neither
# started nor cancelled).
473 unless candidate_job[:success] || candidate_job[:running] ||
474 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
475 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
# A finished, successful job is used unless --no-reuse-finished is set.
# The first other acceptable candidate is remembered as a fallback
# (exact branch layout not visible here).
479 if candidate_job[:success]
480 unless @options[:no_reuse_finished]
482 $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
486 second_place_job ||= candidate_job
490 if not c[:job] and second_place_job
491 job = second_place_job
492 $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
# Nothing reusable: submit a new job, except in dry-run mode.
496 debuglog "component #{cname} not satisfied by any existing job."
497 if !@options[:dry_run]
498 debuglog "component #{cname} new job."
499 job = JobCache.create(:script => c[:script],
500 :script_parameters => c[:script_parameters],
501 :runtime_constraints => c[:runtime_constraints] || {},
502 :script_version => c[:script_version] || 'master',
503 :output_is_persistent => c[:output_is_persistent] || false)
505 debuglog "component #{cname} new job #{job[:uuid]}"
508 debuglog "component #{cname} new job failed"
# Refresh the job record while it is running or has neither finished nor
# been cancelled.
515 if c[:job] and c[:job][:uuid]
516 if (c[:job][:running] or
517 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
518 c[:job] = JobCache.get(c[:job][:uuid])
521 # Populate script_parameters of other components waiting for
# Every parameter declared as output_of this component receives the
# job's output.
523 @components.each do |c2name, c2|
524 c2[:script_parameters].each do |pname, p|
525 if p.is_a? Hash and p[:output_of] == cname.to_s
526 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
527 c2[:script_parameters][pname] = c[:job][:output]
532 unless c_already_finished
533 if c[:output_is_persistent]
534 # This is my first time discovering that the job
535 # succeeded. I need to make sure a resources/wants
536 # link is in place to protect the output from garbage
537 # collection. (Normally Crunch does this for me, but
538 # here I might be reusing the output of someone else's
539 # job and I need to make sure it's understood that the
540 # output is valuable to me, too.)
541 wanted = c[:job][:output]
542 debuglog "checking for existing persistence link for #{wanted}"
# The current user's uuid is looked up once and cached.
543 @my_user_uuid ||= $arv.user.current[:uuid]
544 links = $arv.link.list(limit: 1,
546 [%w(link_class = resources),
548 %w(tail_uuid =) + [@my_user_uuid],
549 %w(head_uuid =) + [wanted]
552 debuglog "link already exists, uuid #{links.first[:uuid]}"
554 newlink = $arv.link.create link: \
556 link_class: 'resources',
558 tail_kind: 'arvados#user',
559 tail_uuid: @my_user_uuid,
560 head_kind: 'arvados#collection',
563 debuglog "added link, uuid #{newlink[:uuid]}"
# Remaining states: running or queued, or cancelled.
567 elsif c[:job][:running] ||
568 (!c[:job][:started_at] && !c[:job][:cancelled_at])
570 elsif c[:job][:cancelled_at]
571 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
# Record the updated components on the instance; it stays active while
# `moretodo` is set (its computation is not visible in this excerpt).
575 @instance[:components] = @components
576 @instance[:active] = moretodo
579 if @options[:no_wait]
587 debuglog "interrupt", 0
# Final tally: the instance becomes inactive once every component has
# ended or any has failed; it is successful only if all succeeded.
596 @components.each do |cname, c|
598 if c[:job][:finished_at]
600 if c[:job][:success] == true
602 elsif c[:job][:success] == false
609 if ended == @components.length or failed > 0
610 @instance[:active] = false
611 @instance[:success] = (succeeded == @components.length)
619 @instance[:active] = false
# Write the status reports (the enclosing def line is not visible in this
# excerpt). A report is skipped when its target path is literally
# '/dev/null'; each target is reopened for writing on every call.
# NOTE(review): the --status-json option is described as JSON output, but
# the components are written with pretty_inspect (Ruby inspect syntax),
# which is not valid JSON -- confirm and consider emitting real JSON.
633 if @options[:status_json] != '/dev/null'
634 File.open(@options[:status_json], 'w') do |f|
635 f.puts @components.pretty_inspect
# Plain-text report: a timestamped header, then one line per component
# with its name (padded to the longest name), job uuid and status.
639 if @options[:status_text] != '/dev/null'
640 File.open(@options[:status_text], 'w') do |f|
642 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
643 namewidth = @components.collect { |cname, c| cname.size }.max
644 @components.each do |cname, c|
# Status precedence: no job, running (task summary), success, cancelled,
# finished without success ("failed"), started, otherwise queued.
645 jstatus = if !c[:job]
647 elsif c[:job][:running]
648 "#{c[:job][:tasks_summary].inspect}"
649 elsif c[:job][:success]
651 elsif c[:job][:cancelled_at]
652 "cancelled #{c[:job][:cancelled_at]}"
653 elsif c[:job][:finished_at]
654 "failed #{c[:job][:finished_at]}"
655 elsif c[:job][:started_at]
656 "started #{c[:job][:started_at]}"
658 "queued #{c[:job][:created_at]}"
# Components without a job show '-' padded to 27 characters in the uuid
# column.
660 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Script entry point: build the runner from the parsed options, load
# either the template or the existing instance, apply the leftover
# command-line words as pipeline parameters, and make sure a pipeline
# instance exists. The instance uuid is printed to stdout (the condition
# guarding that print is not visible in this excerpt).
# NOTE(review): `rescue Exception` also catches SystemExit and Interrupt;
# rescuing StandardError is usually safer -- confirm what the handler is
# meant to cover before narrowing it.
667 runner = WhRunPipelineInstance.new($options)
669 if $options[:template]
670 runner.fetch_template($options[:template])
672 runner.fetch_instance($options[:instance])
674 runner.apply_parameters(p.leftovers)
675 runner.setup_instance
678 puts runner.instance[:uuid]
682 rescue Exception => e