Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / python / arvados / commands / ws.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import json
7 import logging
8 import signal
9 import sys
10
11 import arvados
12 from arvados.events import subscribe
13 from arvados._version import __version__
14 from . import _util as arv_cmd
15
16 def main(arguments=None):
17     logger = logging.getLogger('arvados.arv-ws')
18
19     parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
20     parser.add_argument('--version', action='version',
21                         version="%s %s" % (sys.argv[0], __version__),
22                         help='Print version and exit.')
23     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
24     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
25     parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
26     parser.add_argument('-i', '--id', type=int, default=None, help="Start from given log id.")
27
28     group = parser.add_mutually_exclusive_group()
29     group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
30     group.add_argument('--no-poll', action='store_false', dest='poll_interval', help="Do not poll if websockets are not available, just fail")
31
32     group = parser.add_mutually_exclusive_group()
33     group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs")
34     group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs")
35
36     args = parser.parse_args(arguments)
37
38     global filters
39     global known_component_jobs
40     global ws
41
42     filters = []
43     known_component_jobs = set()
44     ws = None
45
46     def update_subscribed_components(components):
47         global known_component_jobs
48         global filters
49         pipeline_jobs = set()
50         for c in components:
51             if "job" in components[c]:
52                 pipeline_jobs.add(components[c]["job"]["uuid"])
53         if known_component_jobs != pipeline_jobs:
54             new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
55             ws.subscribe(new_filters)
56             ws.unsubscribe(filters)
57             filters = new_filters
58             known_component_jobs = pipeline_jobs
59
60     api = arvados.api('v1', num_retries=args.retries)
61
62     if args.uuid:
63         filters += [ ['object_uuid', '=', args.uuid] ]
64
65     if args.filters:
66         filters += json.loads(args.filters)
67
68     if args.job:
69         filters += [ ['object_uuid', '=', args.job] ]
70
71     if args.pipeline:
72         filters += [ ['object_uuid', '=', args.pipeline] ]
73
74     if args.start_time:
75         last_log_id = 1
76         filters += [ ['created_at', '>=', args.start_time] ]
77     else:
78         last_log_id = None
79
80     if args.id:
81         last_log_id = args.id-1
82
83     def on_message(ev):
84         global filters
85         global ws
86
87         logger.debug(ev)
88         if 'event_type' in ev and (args.pipeline or args.job):
89             if ev['event_type'] in ('stderr', 'stdout'):
90                 sys.stdout.write(ev["properties"]["text"])
91             elif ev["event_type"] in ("create", "update"):
92                 if ev["object_kind"] == "arvados#pipelineInstance":
93                     c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute()
94                     update_subscribed_components(c["components"])
95
96                 if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline:
97                     if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"):
98                         ws.close()
99
100                 if ev["object_kind"] == "arvados#job" and args.job:
101                     if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
102                         ws.close()
103         elif 'status' in ev and ev['status'] == 200:
104             pass
105         else:
106             print(json.dumps(ev))
107
108     try:
109         ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval, last_log_id=last_log_id)
110         if ws:
111             if args.pipeline:
112                 c = api.pipeline_instances().get(uuid=args.pipeline).execute()
113                 update_subscribed_components(c["components"])
114                 if c["state"] in ("Complete", "Failed", "Paused"):
115                     ws.close()
116             ws.run_forever()
117     except KeyboardInterrupt:
118         pass
119     except Exception as e:
120         logger.error(e)
121     finally:
122         if ws:
123             ws.close()