5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
# Forward-declare the runner class; it is opened again with its methods later
# in this file. NOTE(review): the closing `end` is not visible in this excerpt.
59 class WhRunPipelineInstance
# Version number handed to the API client as :application_version.
62 $application_version = 1.0
# Guard against interpreters older than 1.9.3. Note that this compares
# RUBY_VERSION as a string, not numerically.
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API connection settings come from the environment. The API version falls
# back to 'v1'; a missing host or token aborts with a fatal message.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
# Runtime dependencies. The message lines below interpolate `l.message` and
# suggest a gem install command. NOTE(review): the construct that binds `l`
# is not visible here -- presumably a rescue for a failed require; confirm.
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
# Print `message` on stderr, prefixed with this script's basename and PID,
# but only when the global $debuglevel is at least `verbosity` (default 1).
# Callers pass verbosity 0 for messages that should appear even when
# $debuglevel is 0.
92 def debuglog(message, verbosity=1)
93 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
# Save the current $VERBOSE setting and put it back afterwards.
# NOTE(review): the enclosing definition and the lines between this pair are
# not visible here; the same save/restore pair appears inside
# suppress_warnings further down in this file.
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
# When the API host name contains "local", reassign the
# OpenSSL::SSL::VERIFY_PEER constant to VERIFY_NONE. The assignment is
# wrapped in suppress_warnings -- presumably to silence Ruby's
# constant-reassignment warning.
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopen Google::APIClient and define discovery_document.
112 class Google::APIClient
# Return the discovery document for `api`/`version`, memoized in
# @discovery_documents under the key "api:version".
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
# On a cache miss, GET the discovery URI without authentication.
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
# A String body is parsed as JSON; any other body is returned unchanged.
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
# The strings below are the help texts of the individual options.
# NOTE(review): the `opt` calls that own them, and their short names and
# types, are not visible in this excerpt.
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
# Run the parser `p` through Trollop::with_standard_exception_handling and
# keep the result in the global $options.
180 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: $options[:debug_level] when it is set, otherwise 1
# when $options[:debug] is set, otherwise 0.
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# Validate option combinations; every violation aborts with a syntax error.
# --instance excludes both --template and --submit, and one of --instance or
# --template must be given.
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
190 abort "#{$0}: syntax error: you must supply a --template or --instance."
# Abort when the two options compare equal, which covers both "both given"
# and "neither given".
193 if $options[:run_here] == $options[:submit]
194 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
197 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
# Helper that remembers $VERBOSE on entry and restores it at the end.
# NOTE(review): the lines in between are not visible; since callers pass a
# block, they presumably change $VERBOSE and yield -- confirm.
200 def suppress_warnings
201 original_verbosity = $VERBOSE
204 $VERBOSE = original_verbosity
# Any value of the variable enables this branch -- ENV returns a String or
# nil, and every String (even "" or "0") is truthy in Ruby.
209 if ENV['ARVADOS_API_HOST_INSECURE']
210 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
213 # Set up the API client.
# $client is created only if it is not already set. It identifies itself
# with this script's basename and $application_version.
215 $client ||= Google::APIClient.
216 new(:host => $arvados_api_host,
217 :application_name => File.split($0).last,
218 :application_version => $application_version.to_s)
# Discovered API handle used for the pipeline/job requests below.
219 $arvados = $client.discovered_api('arvados', $arvados_api_version)
# Second client object, used later for the user and link calls.
# NOTE(review): the version is hard-coded to 'v1' here rather than taken
# from $arvados_api_version -- confirm this is intended.
220 $arv = Arvados.new api_version: 'v1'
# Wrapper around pipeline_instance records on the API server.
# Every request below is sent with :authenticated => false and carries the
# token from ARVADOS_API_TOKEN in an explicit authorization value.
# NOTE(review): several method headers of this class are not visible in
# this excerpt.
223 class PipelineInstance
# Fetch one pipeline instance via pipeline_instances.get.
225 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
229 :authenticated => false,
231 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
233 j = JSON.parse result.body, :symbolize_names => true
# A usable response is a Hash with a :uuid. Otherwise log the server's
# errors at verbosity 0. The inline `rescue nil` keeps the message from
# raising when `j` cannot be indexed with :errors.
234 unless j.is_a? Hash and j[:uuid]
235 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
238 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance from `attributes`. Unlike the fetch above, a
# response without a :uuid aborts the program.
242 def self.create(attributes)
243 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
245 :pipeline_instance => attributes
247 :authenticated => false,
249 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
251 j = JSON.parse result.body, :symbolize_names => true
252 unless j.is_a? Hash and j[:uuid]
253 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
255 debuglog "Created pipeline instance: #{j[:uuid]}"
# Send the pending attribute changes (as JSON) via pipeline_instances.update.
259 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
264 :pipeline_instance => @attributes_to_update.to_json
266 :authenticated => false,
268 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
270 j = JSON.parse result.body, :symbolize_names => true
# A failed save is only logged (verbosity 0), not fatal.
271 unless j.is_a? Hash and j[:uuid]
272 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
# Clear the pending changes. NOTE(review): presumably reached only after a
# successful save -- the control flow around this line is not visible.
275 @attributes_to_update = {}
# Record a changed attribute so that it is part of the next update request.
280 @attributes_to_update[x] = y
# Start with no pending changes. NOTE(review): likely part of the
# initializer, whose header is not visible.
288 @attributes_to_update = {}
# Job lookup/creation helpers (called elsewhere in this file as JobCache.get
# and JobCache.create). NOTE(review): the class header and the header of the
# first method are not visible in this excerpt.
# Fetch a job via jobs.get and store the parsed response in @cache under
# its uuid.
296 result = $client.execute(:api_method => $arvados.jobs.get,
300 :authenticated => false,
302 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
304 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# List jobs via jobs.list, passing `conditions` as a JSON-encoded :where
# value. The response is only used when its :items entry is an Array.
306 def self.where(conditions)
307 result = $client.execute(:api_method => $arvados.jobs.list,
310 :where => conditions.to_json
312 :authenticated => false,
314 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
316 list = JSON.parse result.body, :symbolize_names => true
317 if list and list[:items].is_a? Array
# Create a job via jobs.create. `create_params` is merged into the request
# parameters. A response that is not a Hash with a :uuid is logged at
# verbosity 0 together with the attempted job attributes.
323 def self.create(job, create_params)
325 result = $client.execute(:api_method => $arvados.jobs.create,
328 }.merge(create_params),
329 :authenticated => false,
331 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
333 j = JSON.parse result.body, :symbolize_names => true
334 if j.is_a? Hash and j[:uuid]
337 debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
# Reopen the runner class and expose the current pipeline instance to
# callers through a reader.
343 class WhRunPipelineInstance
344 attr_reader :instance
# NOTE(review): the constructor body is not visible; other methods of this
# class read @options, so `_options` is presumably stored there -- confirm.
346 def initialize(_options)
# Load the pipeline template into @template, either from a local JSON file
# or from the API server.
#
# template - a file path or a pipeline template uuid. It is treated as a
#            path when it contains any character other than lowercase
#            letters, digits and hyphens.
#
# Aborts when a template file lacks a "components" key, or with a fatal
# message when retrieval from the server fails.
350 def fetch_template(template)
351 if template.match /[^-0-9a-z]/
352 # Doesn't look like a uuid -- use it as a filename.
353 @template = JSON.parse File.read(template), :symbolize_names => true
354 if !@template[:components]
355 abort ("#{$0}: Template loaded from #{template} " +
356 "does not have a \"components\" key")
# Otherwise fetch the template via pipeline_templates.get.
359 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
363 :authenticated => false,
365 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
367 @template = JSON.parse result.body, :symbolize_names => true
369 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Look up an existing pipeline instance by uuid and use it both as the
# current instance and as the template, so that later steps read
# @template[:components] from the instance itself.
375 def fetch_instance(instance_uuid)
376 @instance = PipelineInstance.find(instance_uuid)
377 @template = @instance
# Parse command-line parameter overrides and resolve every component's
# script_parameters to concrete values, storing the result in @components.
#
# params_args - leftover command-line arguments. The array is consumed
#               (shifted) while parsing.
#
# Aborts on an argument it cannot interpret, or when errors were collected
# (for example a missing required parameter).
381 def apply_parameters(params_args)
# A leading "--" separator is dropped.
382 params_args.shift if params_args[0] == '--'
384 while !params_args.empty?
# Form 1: "name=value" in a single argument, with an optional "--" prefix.
# The name may not start with "-" and the value must be non-empty.
385 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
386 params[re[2]] = re[3]
# Form 2: "name value" as two consecutive arguments; a leading "--" on the
# name is stripped.
388 elsif params_args.size > 1
389 param = params_args.shift.sub /^--/, ''
390 params[param] = params_args.shift
392 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
# Hash#dup is shallow: the per-component hashes are shared with @template,
# so the script_parameters assignments below also modify the template data.
396 @components = @template[:components].dup
399 @components.each do |componentname, component|
400 component[:script_parameters].each do |parametername, parameter|
# Normalize a bare value into the { :value => ... } form.
401 parameter = { :value => parameter } unless parameter.is_a? Hash
# Resolution order visible here: a component-qualified command-line value
# first; then, only for parameters without :output_of, an unqualified
# command-line value or the parameter's :default. NOTE(review): further
# alternatives on elided lines are not visible.
403 (params["#{componentname}::#{parametername}"] ||
405 (parameter[:output_of].nil? &&
406 (params[parametername.to_s] ||
407 parameter[:default])) ||
# A parameter counts as required unless :required is explicitly one of
# false, 'false', 0 or '0'.
410 ![false,'false',0,'0'].index parameter[:required]
411 if parameter[:output_of]
414 errors << [componentname, parametername, "required parameter is missing"]
416 debuglog "parameter #{componentname}::#{parametername} == #{value}"
417 component[:script_parameters][parametername] = value
# Report all collected errors at once, one "component::parameter - error"
# line each.
421 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
423 debuglog "options=" + @options.pretty_inspect
# Create the pipeline instance unless one is already set (`||=`), using the
# resolved components and the template's uuid.
# NOTE(review): the method header, the condition that selects between the
# two branches, and the state passed by the first branch are not visible in
# this excerpt. The script's entry point calls this as setup_instance.
429 @instance ||= PipelineInstance.
430 create(:components => @components,
431 :pipeline_template_uuid => @template[:uuid],
# Second variant: the new instance starts in state 'RunningOnClient'.
434 @instance ||= PipelineInstance.
435 create(:components => @components,
436 :pipeline_template_uuid => @template[:uuid],
437 :state => 'RunningOnClient')
# Main processing: submit jobs for components that are ready, refresh the
# state of known jobs, feed finished outputs into dependent components, and
# finally derive the pipeline instance state.
# NOTE(review): the method header(s), the outer loop and many control-flow
# lines are not visible in this excerpt.
# Counts failed job create calls; any failure makes the run give up below.
446 job_creation_failed = 0
449 @components.each do |cname, c|
451 owner_uuid = @instance[:owner_uuid]
452 # Is the job satisfying this component already known to be
453 # finished? (Already meaning "before we query API server about
454 # the job's current state")
455 c_already_finished = (c[:job] &&
457 !c[:job][:success].nil?)
# True when no script parameter is still an unresolved output_of reference.
459 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
460 # No job yet associated with this component and its component inputs
461 # are fully specified (any output_of script_parameters are resolved
463 job = JobCache.create({
464 :script => c[:script],
465 :script_parameters => c[:script_parameters],
466 :script_version => c[:script_version],
467 :repository => c[:repository],
468 :nondeterministic => c[:nondeterministic],
469 :output_is_persistent => c[:output_is_persistent] || false,
470 :owner_uuid => owner_uuid,
471 # TODO: Delete the following three attributes when
472 # supporting pre-20140418 API servers is no longer
473 # important. New API servers take these as flags that
474 # control behavior of create, rather than job attributes.
475 :minimum_script_version => c[:minimum_script_version],
# NOTE(review): the value is read from :exclude_minimum_script_versions,
# not :exclude_script_versions -- confirm the intended component key. The
# same lookup is repeated in the second hash below.
476 :exclude_script_versions => c[:exclude_minimum_script_versions],
477 :no_reuse => @options[:no_reuse] || c[:nondeterministic],
479 # This is the right place to put these attributes when
480 # dealing with new API servers.
481 :minimum_script_version => c[:minimum_script_version],
482 :exclude_script_versions => c[:exclude_minimum_script_versions],
# find_or_create is the exact negation of the :no_reuse value above.
483 :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
486 debuglog "component #{cname} new job #{job[:uuid]}"
489 debuglog "component #{cname} new job failed", 0
490 job_creation_failed += 1
# For a component that has a job with a uuid: re-fetch the job record while
# it is marked running, or has neither finished nor been cancelled.
494 if c[:job] and c[:job][:uuid]
495 if (c[:job][:running] or
496 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
497 # Job is running so update copy of job record
498 c[:job] = JobCache.get(c[:job][:uuid])
502 # Populate script_parameters of other components waiting for
# Every parameter whose :output_of names this component (compared with the
# string form of the component name) is replaced by this job's output.
504 @components.each do |c2name, c2|
505 c2[:script_parameters].each do |pname, p|
506 if p.is_a? Hash and p[:output_of] == cname.to_s
507 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
508 c2[:script_parameters][pname] = c[:job][:output]
513 unless c_already_finished
514 # This is my first time discovering that the job
515 # succeeded. (At the top of this loop, I was still
516 # waiting for it to finish.)
517 if c[:output_is_persistent]
518 # I need to make sure a resources/wants link is in
519 # place to protect the output from garbage
520 # collection. (Normally Crunch does this for me, but
521 # here I might be reusing the output of someone else's
522 # job and I need to make sure it's understood that the
523 # output is valuable to me, too.)
524 wanted = c[:job][:output]
525 debuglog "checking for existing persistence link for #{wanted}"
# The current user's uuid is looked up once and memoized.
526 @my_user_uuid ||= $arv.user.current[:uuid]
# Look for an existing 'resources' link from this user to the wanted output.
527 links = $arv.link.list(limit: 1,
529 [%w(link_class = resources),
531 %w(tail_uuid =) + [@my_user_uuid],
532 %w(head_uuid =) + [wanted]
535 debuglog "link already exists, uuid #{links.first[:uuid]}"
537 newlink = $arv.link.create link: \
539 link_class: 'resources',
541 tail_kind: 'arvados#user',
542 tail_uuid: @my_user_uuid,
543 head_kind: 'arvados#collection',
545 owner_uuid: owner_uuid
547 debuglog "added link, uuid #{newlink[:uuid]}"
# A job counts as still in progress when it is marked running, or when it
# has neither started nor been cancelled.
551 elsif c[:job][:running] ||
552 (!c[:job][:started_at] && !c[:job][:cancelled_at])
553 # Job is still running
555 elsif c[:job][:cancelled_at]
556 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
# Write the updated components back onto the pipeline instance.
560 @instance[:components] = @components
563 if @options[:no_wait]
567 # If job creation fails, just give up on this pipeline instance.
568 if job_creation_failed > 0
576 debuglog "interrupt", 0
# Tally component outcomes. A job is treated as ended when it has finished,
# was cancelled, or is both not running and unsuccessful.
586 @components.each do |cname, c|
588 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
590 if c[:job][:success] == true
592 elsif c[:job][:success] == false
# Overall success requires every component to have succeeded.
599 success = (succeeded == @components.length)
601 # A job create call failed. Just give up.
602 if job_creation_failed > 0
603 debuglog "job creation failed - giving up on this pipeline instance", 0
610 @instance[:state] = 'Complete'
612 @instance[:state] = 'Paused'
# Once every component has ended, or any has failed, the instance becomes
# 'Complete' or 'Failed' depending on `success`.
615 if ended == @components.length or failed > 0
616 @instance[:state] = success ? 'Complete' : 'Failed'
620 debuglog "pipeline instance state is #{@instance[:state]}"
622 # set components_summary
623 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
624 @instance[:components_summary] = components_summary
# An instance still marked 'RunningOnClient' is switched to 'Paused'.
# NOTE(review): the enclosing definition is not visible -- presumably a
# cleanup step run when this client stops managing the pipeline; confirm.
630 if @instance and @instance[:state] == 'RunningOnClient'
631 @instance[:state] = 'Paused'
# Write status reports to the files named by the :status_json and
# :status_text options. Either report is skipped when its path is
# '/dev/null'. NOTE(review): the method header is not visible in this excerpt.
645 if @options[:status_json] != '/dev/null'
646 File.open(@options[:status_json], 'w') do |f|
# NOTE(review): this writes Ruby's pretty_inspect output, not JSON, although
# the option is described as a JSON report -- confirm the intended format.
647 f.puts @components.pretty_inspect
651 if @options[:status_text] != '/dev/null'
652 File.open(@options[:status_text], 'w') do |f|
654 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
# Width of the longest component name, used to align the name column.
655 namewidth = @components.collect { |cname, c| cname.size }.max
656 @components.each do |cname, c|
# One-word/one-line job status. Branch order matters: running is checked
# before success, and cancelled before finished, so a finished job that
# reaches the :finished_at branch is reported as failed.
657 jstatus = if !c[:job]
659 elsif c[:job][:running]
660 "#{c[:job][:tasks_summary].inspect}"
661 elsif c[:job][:success]
663 elsif c[:job][:cancelled_at]
664 "cancelled #{c[:job][:cancelled_at]}"
665 elsif c[:job][:finished_at]
666 "failed #{c[:job][:finished_at]}"
667 elsif c[:job][:started_at]
668 "started #{c[:job][:started_at]}"
670 "queued #{c[:job][:created_at]}"
# The ljust(27) applies only to the '-' placeholder, padding it to the
# column width used when no job uuid is available.
672 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Script entry point: build the runner, load the template or an existing
# instance, apply the leftover command-line parameters, and set up the
# pipeline instance.
679 runner = WhRunPipelineInstance.new($options)
681 if $options[:template]
682 runner.fetch_template($options[:template])
684 runner.fetch_instance($options[:instance])
# p.leftovers are the arguments the option parser did not consume.
686 runner.apply_parameters(p.leftovers)
687 runner.setup_instance
# Print the instance uuid on stdout.
690 puts runner.instance[:uuid]
# NOTE(review): rescuing Exception also catches SystemExit (raised by
# `abort`) and Interrupt. The handler body is not visible here -- confirm it
# re-raises or exits appropriately.
694 rescue Exception => e