-import httplib
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import print_function
+from __future__ import absolute_import
+from future import standard_library
+standard_library.install_aliases()
+from builtins import object
+import bz2
+import fcntl
+import hashlib
+import http.client
import httplib2
+import json
import logging
import os
import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
import string
-import bz2
-import zlib
-import fcntl
+import sys
import time
-import threading
+import types
+import zlib
+
+if sys.version_info >= (3, 0):
+ from collections import UserDict
+else:
+ from UserDict import UserDict
-from .api import api, http_cache
-from collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
-from keep import *
-from stream import *
-from arvfile import StreamFileReader
-from retry import RetryLoop
-import errors
-import util
+from .api import api, api_from_config, http_cache
+from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
+from arvados.keep import *
+from arvados.stream import *
+from .arvfile import StreamFileReader
+from .retry import RetryLoop
+import arvados.errors as errors
+import arvados.util as util
# Set up Arvados logging based on the user's configuration.
# All Arvados code should log under the arvados hierarchy.
+log_format = '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s'
+log_date_format = '%Y-%m-%d %H:%M:%S'
log_handler = logging.StreamHandler()
-log_handler.setFormatter(logging.Formatter(
- '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
- '%Y-%m-%d %H:%M:%S'))
+log_handler.setFormatter(logging.Formatter(log_format, log_date_format))
logger = logging.getLogger('arvados')
logger.addHandler(log_handler)
logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')
for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
try:
task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
- task = UserDict.UserDict(task)
+ task = UserDict(task)
task.set_output = types.MethodType(task_set_output, task)
task.tmpdir = os.environ['TASK_WORK']
_current_task = task
for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
try:
job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
- job = UserDict.UserDict(job)
+ job = UserDict(job)
job.tmpdir = os.environ['JOB_WORK']
_current_job = job
return job
class JobTask(object):
def __init__(self, parameters=dict(), runtime_constraints=dict()):
- print "init jobtask %s %s" % (parameters, runtime_constraints)
+ print("init jobtask %s %s" % (parameters, runtime_constraints))
-class job_setup:
+class job_setup(object):
@staticmethod
def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
if if_sequence != current_task()['sequence']: