STDSTREAM_PATH = pathlib.Path('-')
TITLECASE = operator.methodcaller('title')
+_ALIASED_METHODS = frozenset([
+ 'destroy',
+ 'index',
+ 'show',
+])
+_DEPRECATED_NOTICE = '''
+
+!!! deprecated
+ This resource is deprecated in the Arvados API.
+'''
+_DEPRECATED_RESOURCES = frozenset([
+ 'Humans',
+ 'JobTasks',
+ 'Jobs',
+ 'KeepDisks',
+ 'Nodes',
+ 'PipelineInstances',
+ 'PipelineTemplates',
+ 'Specimens'
+ 'Traits',
+])
+_DEPRECATED_SCHEMAS = frozenset([
+ *(name[:-1] for name in _DEPRECATED_RESOURCES),
+ *(f'{name[:-1]}List' for name in _DEPRECATED_RESOURCES),
+])
+
+_LIST_PYDOC = '''
+
+This is the dictionary object returned when you call `{cls_name}s.list`.
+If you just want to iterate all objects that match your search criteria,
+consider using `arvados.util.keyset_list_all`.
+If you work with this raw object, the keys of the dictionary are documented
+below, along with their types. The `items` key maps to a list of matching
+`{cls_name}` objects.
+'''
+_MODULE_PYDOC = '''Arvados API client documentation skeleton
+
+This module documents the methods and return types provided by the Arvados API
+client. Start with `ArvadosAPIClient`, which documents the methods available
+from the API client objects constructed by `arvados.api`. The implementation is
+generated dynamically at runtime when the client object is built.
+'''
+_SCHEMA_PYDOC = '''
+
+This is the dictionary object that represents a single {cls_name} in Arvados
+and is returned by most `{cls_name}s` methods.
+The keys of the dictionary are documented below, along with their types.
+Not every key may appear in every dictionary returned by an API call.
+When a method doesn't return all the data, you can use its `select` parameter
+to list the specific keys you need. Refer to the API documentation for details.
+'''
+
+_MODULE_PRELUDE = '''
+import sys
+if sys.version_info < (3, 8):
+ from typing import Any
+ from typing_extensions import TypedDict
+else:
+ from typing import Any, TypedDict
+'''
+
+_TYPE_MAP = {
+ # Map the API's JavaScript-based type names to Python annotations.
+ # Some of these may disappear after Arvados issue #19795 is fixed.
+ 'Array': 'list',
+ 'array': 'list',
+ 'boolean': 'bool',
+ # datetime fields are strings in ISO 8601 format.
+ 'datetime': 'str',
+ 'Hash': 'dict[str, Any]',
+ 'integer': 'int',
+ 'object': 'dict[str, Any]',
+ 'string': 'str',
+ 'text': 'str',
+}
+
+def get_type_annotation(name: str) -> str:
+ return _TYPE_MAP.get(name, name)
+
+def to_docstring(s: str, indent: int) -> str:
+ prefix = ' ' * indent
+ s = s.replace('"""', '""\"')
+ s = re.sub(r'(\n+)', r'\1' + prefix, s)
+ s = s.strip()
+ if '\n' in s:
+ return f'{prefix}"""{s}\n{prefix}"""'
+ else:
+ return f'{prefix}"""{s}"""'
+
def transform_name(s: str, sep: str, fix_part: Callable[[str], str]) -> str:
return sep.join(fix_part(part) for part in s.split('_'))
return transform_name(s, ' ', LOWERCASE)
class Parameter(inspect.Parameter):
- _TYPE_MAP = {
- # Map the API's JavaScript-based type names to Python annotations
- 'array': 'list',
- 'boolean': 'bool',
- 'integer': 'int',
- 'object': 'dict[str, Any]',
- 'string': 'str',
- }
-
def __init__(self, name: str, spec: Mapping[str, Any]) -> None:
self.api_name = name
self._spec = spec
super().__init__(
name,
inspect.Parameter.KEYWORD_ONLY,
- annotation=self.annotation_from_type(),
+ annotation=get_type_annotation(self._spec['type']),
# In normal Python the presence of a default tells you whether or
# not an argument is required. In the API the `required` flag tells
# us that, and defaults are specified inconsistently. Don't show
default=inspect.Parameter.empty,
)
- def annotation_from_type(self) -> str:
- src_type = self._spec['type']
- return self._TYPE_MAP.get(src_type, src_type)
-
def default_value(self) -> object:
try:
src_value: str = self._spec['default']
# parsers retain the definition list structure.
description = self._spec['description'] or '\u200b'
return f'''
- {self.api_name}: {self.annotation}
- : {description}{default_doc}
+{self.api_name}: {self.annotation}
+: {description}{default_doc}
'''
def signature(self) -> inspect.Signature:
parameters = [
- inspect.Parameter('self', inspect.Parameter.POSITIONAL_ONLY),
+ inspect.Parameter('self', inspect.Parameter.POSITIONAL_OR_KEYWORD),
*self._required_params,
*self._optional_params,
]
- return inspect.Signature(parameters, return_annotation='dict[str, Any]')
+ try:
+ returns = get_type_annotation(self._spec['response']['$ref'])
+ except KeyError:
+ returns = 'dict[str, Any]'
+ return inspect.Signature(parameters, return_annotation=returns)
- def doc(self) -> str:
- return re.sub(r'\n{3,}', '\n\n', f'''
+ def doc(self, doc_slice: slice=slice(None)) -> str:
+ doc_lines = self._spec['description'].splitlines(keepends=True)[doc_slice]
+ if not doc_lines[-1].endswith('\n'):
+ doc_lines.append('\n')
+ if self._required_params:
+ doc_lines.append("\nRequired parameters:\n")
+ doc_lines.extend(param.doc() for param in self._required_params)
+ if self._optional_params:
+ doc_lines.append("\nOptional parameters:\n")
+ doc_lines.extend(param.doc() for param in self._optional_params)
+ return f'''
def {self.name}{self.signature()}:
- """{self._spec['description'].splitlines()[0]}
-
-{" Required parameters:" if self._required_params else ""}
+{to_docstring(''.join(doc_lines), 8)}
+'''
-{''.join(param.doc() for param in self._required_params)}
-{" Optional parameters:" if self._optional_params else ""}
+def document_schema(name: str, spec: Mapping[str, Any]) -> str:
+ description = spec['description']
+ if name in _DEPRECATED_SCHEMAS:
+ description += _DEPRECATED_NOTICE
+ if name.endswith('List'):
+ desc_fmt = _LIST_PYDOC
+ cls_name = name[:-4]
+ else:
+ desc_fmt = _SCHEMA_PYDOC
+ cls_name = name
+ description += desc_fmt.format(cls_name=cls_name)
+ lines = [
+ f"class {name}(TypedDict, total=False):",
+ to_docstring(description, 4),
+ ]
+ for field_name, field_spec in spec['properties'].items():
+ field_type = get_type_annotation(field_spec['type'])
+ try:
+ subtype = field_spec['items']['$ref']
+ except KeyError:
+ pass
+ else:
+ field_type += f"[{get_type_annotation(subtype)}]"
-{''.join(param.doc() for param in self._optional_params)}
- """
-''')
+ field_line = f" {field_name}: {field_type!r}"
+ try:
+ field_line += f" = {field_spec['default']!r}"
+ except KeyError:
+ pass
+ lines.append(field_line)
+ field_doc: str = field_spec.get('description', '')
+ if field_spec['type'] == 'datetime':
+ field_doc += "\n\nString in ISO 8601 datetime format. Pass it to `ciso8601.parse_datetime` to build a `datetime.datetime`."
+ if field_doc:
+ lines.append(to_docstring(field_doc, 4))
+ lines.append('\n')
+ return '\n'.join(lines)
def document_resource(name: str, spec: Mapping[str, Any]) -> str:
- methods = [Method(key, meth_spec) for key, meth_spec in spec['methods'].items()]
- return f'''class {classify_name(name)}:
- """Methods to query and manipulate Arvados {humanize_name(name)}"""
-{''.join(method.doc() for method in sorted(methods, key=NAME_KEY))}
+ class_name = classify_name(name)
+ docstring = f"Methods to query and manipulate Arvados {humanize_name(name)}"
+ if class_name in _DEPRECATED_RESOURCES:
+ docstring += _DEPRECATED_NOTICE
+ methods = [
+ Method(key, meth_spec)
+ for key, meth_spec in spec['methods'].items()
+ if key not in _ALIASED_METHODS
+ ]
+ return f'''class {class_name}:
+{to_docstring(docstring, 4)}
+{''.join(method.doc(slice(1)) for method in sorted(methods, key=NAME_KEY))}
'''
def parse_arguments(arglist: Optional[Sequence[str]]) -> argparse.Namespace:
else:
parts = urllib.parse.urlsplit(args.discovery_url)
if not (parts.scheme or parts.netloc):
- args.discovery_url = urllib.parse.urlunsplit(parts._replace(scheme='file'))
+ args.discovery_url = pathlib.Path(args.discovery_url).resolve().as_uri()
+ # Our output is Python source, so it should be UTF-8 regardless of locale.
if args.output_file == STDSTREAM_PATH:
- args.out_file = sys.stdout
+ args.out_file = open(sys.stdout.fileno(), 'w', encoding='utf-8', closefd=False)
else:
- args.out_file = args.output_file.open('w')
+ args.out_file = args.output_file.open('w', encoding='utf-8')
return args
def main(arglist: Optional[Sequence[str]]=None) -> int:
args = parse_arguments(arglist)
with urllib.request.urlopen(args.discovery_url) as discovery_file:
- if not (discovery_file.status is None or 200 <= discovery_file.status < 300):
+ status = discovery_file.getcode()
+ if not (status is None or 200 <= status < 300):
print(
f"error getting {args.discovery_url}: server returned {discovery_file.status}",
file=sys.stderr,
)
return os.EX_IOERR
discovery_document = json.load(discovery_file)
- resources = sorted(discovery_document['resources'].items())
+ print(
+ to_docstring(_MODULE_PYDOC, indent=0),
+ _MODULE_PRELUDE,
+ sep='\n', file=args.out_file,
+ )
+ schemas = sorted(discovery_document['schemas'].items())
+ for name, schema_spec in schemas:
+ print(document_schema(name, schema_spec), file=args.out_file)
+
+ resources = sorted(discovery_document['resources'].items())
for name, resource_spec in resources:
print(document_resource(name, resource_spec), file=args.out_file)
print('''class ArvadosAPIClient:''', file=args.out_file)
for name, _ in resources:
+ class_name = classify_name(name)
+ docstring = f"Return an instance of `{class_name}` to call methods via this client"
+ if class_name in _DEPRECATED_RESOURCES:
+ docstring += _DEPRECATED_NOTICE
method_spec = {
- 'description': f"Return an instance of `{classify_name(name)}` to call methods via this client",
+ 'description': docstring,
'parameters': {},
+ 'response': {
+ '$ref': class_name,
+ },
}
print(Method(name, method_spec).doc(), file=args.out_file)
+ args.out_file.close()
return os.EX_OK
if __name__ == '__main__':