5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
# Early declaration of the runner class; the class is opened again further
# down, where its methods are defined.
59 class WhRunPipelineInstance
# Version number handed to the API client as :application_version.
62 $application_version = 1.0
# Refuse to run on interpreters older than 1.9.3.
# NOTE(review): this is a lexical String comparison, so a version with a
# two-digit component (e.g. '1.10.0') would sort below '1.9.3' -- confirm
# that is acceptable.
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API version to request; falls back to 'v1' when ARVADOS_API_VERSION is
# not set.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
# Host and token are mandatory: the low-precedence `or` runs abort only when
# the corresponding environment variable is missing.
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
# Print a diagnostic message on stderr, prefixed with this program's file
# name and process id.
#
# message   - text to print.
# verbosity - minimum value of the global $debuglevel at which the message
#             is shown (default 1). Pass 0 for messages that should appear
#             even when debugging is off.
#
# Returns nil.
def debuglog(message, verbosity=1)
  # $debuglevel is only assigned once the command line has been parsed;
  # treat "not set yet" as level 0 instead of raising NoMethodError on nil.
  return unless ($debuglevel || 0) >= verbosity
  $stderr.puts "#{File.basename($0)} #{$$}: #{message}"
end
# Remember the current warning level so it can be put back afterwards.
# NOTE(review): these two lines mirror the body of suppress_warnings, which
# is defined again later in the file -- confirm whether this is a duplicate
# definition.
98 original_verbosity = $VERBOSE
# Restore the warning level saved above.
101 $VERBOSE = original_verbosity
# Turn off SSL peer verification when the API host name contains "local".
# NOTE(review): the regexp is unanchored, so any host with "local" anywhere
# in its name matches. The file also has an explicit
# ARVADOS_API_HOST_INSECURE switch that does the same thing -- confirm this
# heuristic is still wanted.
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
# Reassigning the constant means later references to VERIFY_PEER see the
# value of VERIFY_NONE; suppress_warnings presumably hides Ruby's
# "already initialized constant" warning.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopens the client class from the google/api_client library to replace its
# discovery_document method.
112 class Google::APIClient
# Return the discovery document for the given api/version pair.
# Results are memoized in @discovery_documents under the key "api:version",
# so the HTTP request below is issued only when no entry is cached yet.
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
# A String body is parsed as JSON; any other body is returned unchanged.
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
# The parser object is kept in `p` because its leftover (non-option)
# arguments are read later and passed to apply_parameters.
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
# Run the parser through Trollop's standard error handling; the parsed
# options end up in the global $options hash.
180 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: any non-nil --debug-level value wins (0 included,
# since 0 is truthy in Ruby); otherwise 1 when --debug is given; otherwise 0.
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# Exactly one source of work is required: --instance (which excludes
# --template and --submit) or --template.
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
190 abort "#{$0}: syntax error: you must supply a --template or --instance."
# The two flags are compared for equality, so "both given" and "neither
# given" are rejected alike.
193 if $options[:run_here] == $options[:submit]
194 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
197 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
# Helper used with a block by the caller below: it saves the global warning
# level ($VERBOSE) on entry and assigns the saved value back afterwards.
200 def suppress_warnings
201 original_verbosity = $VERBOSE
204 $VERBOSE = original_verbosity
# ENV values are Strings, so any value at all -- even "0", "false" or an
# empty string -- enables insecure mode; only an unset variable leaves
# verification on.
209 if ENV['ARVADOS_API_HOST_INSECURE']
# Reassigning the constant means later references to VERIFY_PEER see the
# value of VERIFY_NONE.
210 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
213 # Set up the API client.
# $client is created only if it is not already set (||=). The application
# name is the last path component of the running script.
215 $client ||= Google::APIClient.
216 new(:host => $arvados_api_host,
217 :application_name => File.split($0).last,
218 :application_version => $application_version.to_s)
# Handle on the discovered 'arvados' API, used below as
# $arvados.<resource>.<method> when building requests.
219 $arvados = $client.discovered_api('arvados', $arvados_api_version)
# Second client object, used later for the user and link calls.
# NOTE(review): api_version is hard-coded to 'v1' here instead of using
# $arvados_api_version -- confirm that is intentional.
220 $arv = Arvados.new api_version: 'v1'
# Wrapper around the pipeline_instances API calls (get, create, update) that
# also keeps pending attribute changes in @attributes_to_update.
223 class PipelineInstance
# Fetch a pipeline instance record from the API server.
225 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
230 :api_token => ENV['ARVADOS_API_TOKEN']
232 :authenticated => false)
# Parse the response with symbol keys. A usable record is a Hash that has a
# :uuid; anything else is treated as a failure.
233 j = JSON.parse result.body, :symbolize_names => true
234 unless j.is_a? Hash and j[:uuid]
# Verbosity 0: printed whenever $debuglevel is 0 or higher.
235 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
238 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a new pipeline instance from +attributes+. The program aborts when
# the response is not a Hash with a :uuid.
# NOTE(review): +attributes+ is passed as-is here, whereas the update call
# further down sends @attributes_to_update.to_json -- confirm both encodings
# are intended.
242 def self.create(attributes)
243 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
245 :api_token => ENV['ARVADOS_API_TOKEN'],
246 :pipeline_instance => attributes
248 :authenticated => false)
249 j = JSON.parse result.body, :symbolize_names => true
250 unless j.is_a? Hash and j[:uuid]
251 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
253 debuglog "Created pipeline instance: #{j[:uuid]}"
# Send the pending attribute changes to the API server (update call).
# Unlike create, a failure here is only logged, not fatal.
257 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
262 :api_token => ENV['ARVADOS_API_TOKEN'],
263 :pipeline_instance => @attributes_to_update.to_json
265 :authenticated => false)
266 j = JSON.parse result.body, :symbolize_names => true
267 unless j.is_a? Hash and j[:uuid]
268 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
# Reset the set of pending changes.
271 @attributes_to_update = {}
# Record an attribute assignment as a pending change.
276 @attributes_to_update[x] = y
# Start with no pending changes.
284 @attributes_to_update = {}
# Job lookup, listing and creation through the jobs API.
# NOTE(review): callers elsewhere in the file use JobCache.get and
# JobCache.create, so these are presumably JobCache's class methods --
# confirm against the class header.
#
# Fetch one job record and store the parsed response in @cache under its
# uuid.
292 result = $client.execute(:api_method => $arvados.jobs.get,
294 :api_token => ENV['ARVADOS_API_TOKEN'],
297 :authenticated => false)
298 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# List jobs matching +conditions+, which is sent JSON-encoded as the :where
# parameter.
300 def self.where(conditions)
301 result = $client.execute(:api_method => $arvados.jobs.list,
303 :api_token => ENV['ARVADOS_API_TOKEN'],
305 :where => conditions.to_json
307 :authenticated => false)
308 list = JSON.parse result.body, :symbolize_names => true
# Only a response whose :items entry is an Array is treated as a result list.
309 if list and list[:items].is_a? Array
# Submit a new job described by +attributes+ (sent JSON-encoded as :job).
315 def self.create(attributes)
317 result = $client.execute(:api_method => $arvados.jobs.create,
319 :api_token => ENV['ARVADOS_API_TOKEN'],
320 :job => attributes.to_json
322 :authenticated => false)
323 j = JSON.parse result.body, :symbolize_names => true
# Success means the response is a Hash that carries a :uuid.
324 if j.is_a? Hash and j[:uuid]
# Failure is logged at verbosity 0, together with the attributes that were
# sent.
327 debuglog "create job: #{j[:errors] rescue nil} with attribute #{attributes}", 0
# The runner: loads a template or an existing instance, applies
# command-line parameters, and tracks one job per pipeline component.
333 class WhRunPipelineInstance
# The current pipeline instance; the main script prints its :uuid.
334 attr_reader :instance
# NOTE(review): a leading underscore usually marks an unused argument, yet
# other methods read @options -- presumably it is assigned from _options
# here; confirm.
336 def initialize(_options)
# Load the pipeline template into @template.
#
# template - either a template UUID or a path to a local JSON file. Any
#            character other than lowercase letters, digits and "-" makes
#            it count as a path.
#
# Aborts the program when a local file has no :components key, or with a
# "failed to retrieve" message on the API path.
340 def fetch_template(template)
341 if template.match /[^-0-9a-z]/
342 # Doesn't look like a uuid -- use it as a filename.
343 @template = JSON.parse File.read(template), :symbolize_names => true
344 if !@template[:components]
345 abort ("#{$0}: Template loaded from #{template} " +
346 "does not have a \"components\" key")
# UUID case: fetch the template from the API server.
349 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
351 :api_token => ENV['ARVADOS_API_TOKEN'],
354 :authenticated => false)
355 @template = JSON.parse result.body, :symbolize_names => true
357 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Load an existing pipeline instance by UUID and use it in place of a
# template.
363 def fetch_instance(instance_uuid)
364 @instance = PipelineInstance.find(instance_uuid)
# The instance record doubles as the template: later steps read :components
# and :uuid from @template.
365 @template = @instance
# Merge command-line parameter settings into the components'
# script_parameters.
#
# params_args - Array of leftover command-line words; it is consumed
#               (shifted) as it is parsed. Accepted forms are "name=value",
#               "--name=value", and "name value" / "--name value" pairs.
#
# Aborts the program on an argument it cannot interpret, or with the
# collected list of errors (e.g. missing required parameters).
369 def apply_parameters(params_args)
# A leading "--" separator is dropped.
370 params_args.shift if params_args[0] == '--'
372 while !params_args.empty?
# "name=value" form: the name must not start with "-" (apart from an
# optional "--" prefix) and the value must be non-empty.
373 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
374 params[re[2]] = re[3]
# Two-word form: the name (without any "--" prefix) followed by its value.
376 elsif params_args.size > 1
377 param = params_args.shift.sub /^--/, ''
378 params[param] = params_args.shift
380 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
# NOTE(review): dup is a shallow copy, so the component hashes modified
# below are the same objects the template holds -- confirm that mutating
# the template is acceptable.
384 @components = @template[:components].dup
387 @components.each do |componentname, component|
388 component[:script_parameters].each do |parametername, parameter|
# Bare values are wrapped so every parameter can be handled as a Hash.
389 parameter = { :value => parameter } unless parameter.is_a? Hash
# Lookup order visible here: a "component::parameter" setting first; then,
# only for parameters without :output_of, a setting by bare parameter name
# or the parameter's :default.
391 (params["#{componentname}::#{parametername}"] ||
393 (parameter[:output_of].nil? &&
394 (params[parametername.to_s] ||
395 parameter[:default])) ||
# True unless :required is one of false, 'false', 0 or '0'. An absent
# :required therefore counts as required.
398 ![false,'false',0,'0'].index parameter[:required]
399 if parameter[:output_of]
402 errors << [componentname, parametername, "required parameter is missing"]
404 debuglog "parameter #{componentname}::#{parametername} == #{value}"
# The parameter definition is replaced by its resolved value.
405 component[:script_parameters][parametername] = value
# All collected errors are reported at once, one "component::parameter -
# message" line each.
409 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
411 debuglog "options=" + @options.pretty_inspect
# Create a pipeline instance on the API server unless one is already present
# (||= keeps an instance that fetch_instance loaded). The new instance
# carries the resolved components and the UUID of the originating template.
416 @instance ||= PipelineInstance.
417 create(:components => @components,
418 :pipeline_template_uuid => @template[:uuid],
# Visit every component (name => definition) and work out what to do with
# its job.
427 @components.each do |cname, c|
429 # Is the job satisfying this component already known to be
430 # finished? (Already meaning "before we query API server about
431 # the job's current state")
432 c_already_finished = (c[:job] &&
434 !c[:job][:success].nil?)
# True when no script parameter is still a Hash with an :output_of
# reference, i.e. nothing is waiting on another component's output.
436 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
437 # No job is associated with this component yet, and the component's inputs
438 # are fully specified (any output_of script_parameters are resolved
# Submit a job for this component. Each attribute is copied from the
# component definition under the same key; :no_reuse comes from the
# command-line options, and :output_is_persistent defaults to false.
job = JobCache.create({
  :script => c[:script],
  :script_parameters => c[:script_parameters],
  :script_version => c[:script_version],
  :repository => c[:repository],
  :minimum_script_version => c[:minimum_script_version],
  # Read the component's :exclude_script_versions key, which matches the
  # attribute name being sent. The :exclude_minimum_script_versions spelling
  # that was read here before is kept as a fallback, so templates written
  # against it continue to work.
  :exclude_script_versions => (c[:exclude_script_versions] ||
                               c[:exclude_minimum_script_versions]),
  :nondeterministic => c[:nondeterministic],
  :no_reuse => @options[:no_reuse],
  :output_is_persistent => c[:output_is_persistent] || false
})
# Log the outcome of the job submission for this component.
450 debuglog "component #{cname} new job #{job[:uuid]}"
453 debuglog "component #{cname} new job failed"
# For a component that has a job: refresh the local copy of the job record
# while the job is flagged running, or has neither finished nor been
# cancelled.
457 if c[:job] and c[:job][:uuid]
458 if (c[:job][:running] or
459 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
460 # Job is running so update copy of job record
461 c[:job] = JobCache.get(c[:job][:uuid])
465 # Populate script_parameters of other components waiting for
# Every parameter in any component that names this component in :output_of
# is replaced by this job's :output value.
467 @components.each do |c2name, c2|
468 c2[:script_parameters].each do |pname, p|
469 if p.is_a? Hash and p[:output_of] == cname.to_s
470 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
471 c2[:script_parameters][pname] = c[:job][:output]
476 unless c_already_finished
477 # This is my first time discovering that the job
478 # succeeded. (At the top of this loop, I was still
479 # waiting for it to finish.)
480 if c[:output_is_persistent]
481 # I need to make sure a resources/wants link is in
482 # place to protect the output from garbage
483 # collection. (Normally Crunch does this for me, but
484 # here I might be reusing the output of someone else's
485 # job and I need to make sure it's understood that the
486 # output is valuable to me, too.)
487 wanted = c[:job][:output]
488 debuglog "checking for existing persistence link for #{wanted}"
# The current user's UUID is looked up once and then reused.
489 @my_user_uuid ||= $arv.user.current[:uuid]
# Look for an existing link in class "resources" from the current user
# (tail) to the wanted output (head); at most one result is requested.
490 links = $arv.link.list(limit: 1,
492 [%w(link_class = resources),
494 %w(tail_uuid =) + [@my_user_uuid],
495 %w(head_uuid =) + [wanted]
498 debuglog "link already exists, uuid #{links.first[:uuid]}"
500 newlink = $arv.link.create link: \
502 link_class: 'resources',
504 tail_kind: 'arvados#user',
505 tail_uuid: @my_user_uuid,
506 head_kind: 'arvados#collection',
509 debuglog "added link, uuid #{newlink[:uuid]}"
# NOTE(review): besides jobs flagged running, this branch also matches jobs
# with neither :started_at nor :cancelled_at set, i.e. jobs that have not
# started yet -- the "still running" comment below covers both.
513 elsif c[:job][:running] ||
514 (!c[:job][:started_at] && !c[:job][:cancelled_at])
515 # Job is still running
517 elsif c[:job][:cancelled_at]
518 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
# Copy the updated component state and the "more work pending" flag onto the
# instance record.
522 @instance[:components] = @components
523 @instance[:active] = moretodo
526 if @options[:no_wait]
534 debuglog "interrupt", 0
# Tally the job outcomes across all components.
543 @components.each do |cname, c|
545 if c[:job][:finished_at]
547 if c[:job][:success] == true
549 elsif c[:job][:success] == false
# The pipeline stops being active once every component has ended or any one
# has failed; it counts as successful only if all components succeeded.
556 if ended == @components.length or failed > 0
557 @instance[:active] = false
558 @instance[:success] = (succeeded == @components.length)
566 @instance[:active] = false
# Write the current component state to the --status-json destination.
# Nothing is written when the destination is '/dev/null', the option's
# default.
if @options[:status_json] != '/dev/null'
  File.open(@options[:status_json], 'w') do |f|
    # Emit real JSON, as the option's help text ("Store json-formatted
    # pipeline in given file") promises. pretty_inspect produced Ruby
    # literal syntax (":key=>value"), which JSON parsers reject.
    f.puts JSON.pretty_generate(@components)
  end
end
# Write a plain-text status report to the --status-text destination, unless
# that destination is '/dev/null'.
586 if @options[:status_text] != '/dev/null'
587 File.open(@options[:status_text], 'w') do |f|
# Header line: timestamp and pipeline instance UUID.
589 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
# Width of the longest component name, used to align the name column.
590 namewidth = @components.collect { |cname, c| cname.size }.max
591 @components.each do |cname, c|
# Status text for the component's job. The branches are tested in this
# order: no job, running, success, cancelled, finished (reported as
# "failed"), started, and otherwise "queued".
592 jstatus = if !c[:job]
594 elsif c[:job][:running]
595 "#{c[:job][:tasks_summary].inspect}"
596 elsif c[:job][:success]
598 elsif c[:job][:cancelled_at]
599 "cancelled #{c[:job][:cancelled_at]}"
600 elsif c[:job][:finished_at]
601 "failed #{c[:job][:finished_at]}"
602 elsif c[:job][:started_at]
603 "started #{c[:job][:started_at]}"
605 "queued #{c[:job][:created_at]}"
# One line per component: padded name, job UUID (or "-" padded to 27
# columns, presumably the width of a job UUID -- confirm), then the status.
607 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Main driver: build the runner from the parsed options.
614 runner = WhRunPipelineInstance.new($options)
# Start from a template when --template was given, otherwise from an
# existing instance.
616 if $options[:template]
617 runner.fetch_template($options[:template])
619 runner.fetch_instance($options[:instance])
# The arguments the option parser did not consume are the component
# parameters.
621 runner.apply_parameters(p.leftovers)
622 runner.setup_instance
# Print the pipeline instance UUID on stdout.
625 puts runner.instance[:uuid]
# NOTE(review): rescuing Exception also traps SystemExit (raised by abort)
# and Interrupt -- confirm that is what the handler intends.
629 rescue Exception => e