10 from collections import defaultdict
12 from operator import itemgetter
# Global Arvados API client (v1); every helper below issues requests through it.
14 arv = arvados.api('v1')
# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')


def fileSizeFormat(value):
    """Format a byte count as a fixed-width human-readable string.

    e.g. fileSizeFormat(1536) -> '   1.50 KiB'.
    The truncated second element of the return tuple (byteunits[exponent])
    has been reconstructed from the byteunits table above.
    """
    # Clamp the exponent so values >= 1024**9 render as YiB instead of
    # raising IndexError on the byteunits lookup.
    exponent = 0 if value == 0 else min(int(log(value, 1024)),
                                        len(byteunits) - 1)
    return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                           byteunits[exponent])
def byteSizeFromValidUuid(valid_uuid):
    """Return the byte size recorded in a canonical Keep locator.

    A valid locator looks like '<md5sum>+<size>[+hints...]'; the size is
    the token immediately after the first '+'.
    """
    size_token = valid_uuid.split('+')[1]
    return int(size_token)
def __init__(self, uuid):
    """Create and register the CollectionInfo for `uuid`.

    Raises ValueError if one was already registered; callers should normally
    use CollectionInfo.get() to fetch-or-create instead.
    """
    # `in` replaces the Python 2-only dict.has_key().
    if uuid in CollectionInfo.all_by_uuid:
        raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    # Restored: __repr__ formats self.uuid; the assignment was lost in the
    # garbled excerpt (original line 32) — confirm against the full source.
    self.uuid = uuid
    self.block_uuids = set()      # uuids of keep blocks in this collection
    self.reader_uuids = set()     # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    CollectionInfo.all_by_uuid[uuid] = self
# NOTE(review): this region is garbled — the `def` lines for byte_size(),
# __repr__()/__str__() and the staticmethod get() fall on lines missing from
# this excerpt, and embedded original line numbers remain in the text.
# Total byte size of the collection: sum of the sizes encoded in its block locators.
39 return sum(map(byteSizeFromValidUuid, self.block_uuids))
# Human-readable summary: uuid, block count/size, readers, persisters.
42 return ('CollectionInfo uuid: %s\n'
43 ' %d block(s) containing %s\n'
# NOTE(review): a ' reader_uuids: %s' format line (44) and the leading
# '(self.uuid,' of the argument tuple (46) appear to be missing here.
45 ' persister_uuids: %s' %
47 len(self.block_uuids),
48 fileSizeFormat(self.byte_size()),
49 pprint.pformat(self.reader_uuids, indent = 15),
50 pprint.pformat(self.persister_uuids, indent = 15)))
# Factory: return the CollectionInfo for uuid, creating it on first use.
# NOTE(review): has_key is Python 2-only; the line constructing
# CollectionInfo(uuid) (55) is missing from this excerpt.
54 if not CollectionInfo.all_by_uuid.has_key(uuid):
56 return CollectionInfo.all_by_uuid[uuid]
def extractUuid(candidate):
    """Return a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
    # Raw string: '\+' in a plain literal is an invalid escape sequence
    # (SyntaxWarning on modern Pythons). Pattern: 32 hex chars, '+', a
    # decimal size, then any number of '+hint' suffixes.
    match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
    # `match and ...` yields None when the regex does not match.
    return match and match.group(1)
# Warn (or abort, when --require-admin-user is set) if the current API user is
# not an admin, since non-admins will see only a fraction of the data.
# NOTE(review): garbled excerpt — the exit/abort call that should follow the
# final print (original line 76) is missing; prints use Python 2 statement
# syntax; embedded original line numbers remain in the text.
64 def checkUserIsAdmin():
65 current_user = arv.users().current().execute()
67 if not current_user['is_admin']:
68 # TODO(misha): Use a logging framework here
69 print ('Warning current user %s (%s - %s) does not have admin access '
70 'and will not see much of the data.' %
71 (current_user['full_name'],
72 current_user['email'],
73 current_user['uuid']))
74 if args.require_admin_user:
75 print 'Exiting, rerun with --no-require-admin-user if you wish to continue.'
# Fetch up to args.max_api_results collection records from the API server and
# return their uuids.
# NOTE(review): lines between the def and the list() call (79-81, 83, 87) are
# missing from this excerpt — presumably a docstring and/or handling for a
# single --uuid argument; confirm against the full source.
78 def buildCollectionsList():
82 collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
84 print ('Returned %d of %d collections.' %
85 (len(collections_list_response['items']),
86 collections_list_response['items_available']))
88 return [item['uuid'] for item in collections_list_response['items']]
# For each collection uuid: fetch its record, split the manifest text into
# stream lines, and record every block locator found on each line in the
# collection's CollectionInfo.
# NOTE(review): garbled excerpt — several lines are missing (97-98, 101, 103,
# 105, 108, 113); the raw print statements were presumably guarded by a
# verbosity flag on the missing lines. Embedded line numbers remain in text.
91 def readCollections(collection_uuids):
92 for collection_uuid in collection_uuids:
93 collection_block_uuids = set()
94 collection_response = arv.collections().get(uuid=collection_uuid).execute()
95 collection_info = CollectionInfo.get(collection_uuid)
96 manifest_lines = collection_response['manifest_text'].split('\n')
99 print 'Manifest text for %s:' % collection_uuid
100 pprint.pprint(manifest_lines)
102 for manifest_line in manifest_lines:
104 manifest_tokens = manifest_line.split(' ')
106 print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
# First token of a manifest line is the stream name; the rest are block
# locators and file tokens.
107 stream_name = manifest_tokens[0]
# extractUuid returns None for non-locator tokens; filter(None, ...) drops them.
109 line_block_uuids = set(filter(None,
110 [extractUuid(candidate)
111 for candidate in manifest_tokens[1:]]))
112 collection_info.block_uuids.update(line_block_uuids)
# NOTE(review): dead commented-out code below (per-file token handling) —
# candidate for deletion in a cleanup pass.
114 # file_tokens = [token
115 # for token in manifest_tokens[1:]
116 # if extractUuid(token) is None]
118 # # Sort file tokens by start position in case they aren't already
119 # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
122 # print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
123 # print 'file_tokens: ' + pprint.pformat(file_tokens)
# NOTE(review): interior of a link-scanning function — its def line (~126-128)
# and the initialization of link_classes are on lines missing from this
# excerpt.
# For every known collection, list the links pointing at it: 'permission'
# links identify readers, 'resources' links identify persisters.
129 for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
130 collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
# Track every link_class seen, for the diagnostic print below.
131 link_classes.update([link['link_class'] for link in collection_links_response['items']])
132 for link in collection_links_response['items']:
133 if link['link_class'] == 'permission':
134 collection_info.reader_uuids.add(link['tail_uuid'])
135 elif link['link_class'] == 'resources':
136 collection_info.persister_uuids.add(link['tail_uuid'])
138 print 'Found the following link classes:'
139 pprint.pprint(link_classes)
# Print the most popular collections, ranked by reader count plus 10x the
# persister count.
# NOTE(review): the closing arguments of sorted() (original lines 145-146 —
# presumably reverse=True and a slice limiting the list) are missing from
# this excerpt; as shown the call is syntactically incomplete.
141 def reportMostPopularCollections():
142 most_popular_collections = sorted(
143 CollectionInfo.all_by_uuid.values(),
# Persisters are weighted 10x readers in the popularity score.
144 key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_uuids),
147 print 'Most popular Collections:'
148 for collection_info in most_popular_collections:
149 print collection_info
# NOTE(review): interior of the map-building step — its def line (~152) is on
# a line missing from this excerpt. Populates the module-level
# block/reader/persister defaultdict(set) maps declared later in the script.
153 for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
154 # Add the block holding the manifest itself for all calculations
155 block_uuids = collection_info.block_uuids.union([collection_uuid,])
156 for block_uuid in block_uuids:
157 block_to_collections[block_uuid].add(collection_uuid)
158 block_to_readers[block_uuid].update(collection_info.reader_uuids)
159 block_to_persisters[block_uuid].update(collection_info.persister_uuids)
160 for reader_uuid in collection_info.reader_uuids:
161 reader_to_collections[reader_uuid].add(collection_uuid)
162 reader_to_blocks[reader_uuid].update(block_uuids)
163 for persister_uuid in collection_info.persister_uuids:
164 persister_to_collections[persister_uuid].add(collection_uuid)
165 persister_to_blocks[persister_uuid].update(block_uuids)
def itemsByValueLength(original):
    """Return original's (key, value) items sorted by len(value), largest first.

    NOTE(review): the final argument of sorted() (original line 171) was lost
    in the garbled source; the trailing comma and the 'busiest first'
    reporting in reportBusiestUsers() imply reverse=True — confirm against
    the full source.
    """
    return sorted(original.items(),
                  key=lambda item: len(item[1]),
                  reverse=True)
def reportBusiestUsers():
    """Print, busiest first, how many collections each user reads / persists.

    Relies on the module-level reader_to_collections and
    persister_to_collections maps built earlier in the script.
    """
    busiest_readers = itemsByValueLength(reader_to_collections)
    print('The busiest readers are:')
    for reader, collections in busiest_readers:
        print('%s reading %d collections.' % (reader, len(collections)))
    busiest_persisters = itemsByValueLength(persister_to_collections)
    print('The busiest persisters are:')
    for persister, collections in busiest_persisters:
        # Fixed copy-paste bug: this line said "reading" for persisters too.
        print('%s persisting %d collections.' % (persister, len(collections)))
# Print per-user storage usage: unweighted (total readable / persisted bytes)
# and weighted (each block's size split evenly among its readers/persisters).
# NOTE(review): garbled excerpt — the `blocks))` closers of the four sum(map(
# ...)) calls (original lines 189, 193, 197, 201) and the leading `(user,` of
# the final print tuple (206) are missing; as shown the calls are
# syntactically incomplete.
185 def reportUserDiskUsage():
186 for user, blocks in reader_to_blocks.items():
187 user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
188 byteSizeFromValidUuid,
# Weighted size: each block's bytes divided by its number of readers.
190 user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
191 lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
192 len(block_to_readers[block_uuid])),
194 for user, blocks in persister_to_blocks.items():
195 user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
196 byteSizeFromValidUuid,
# Weighted size: each block's bytes divided by its number of persisters.
198 user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
199 lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
200 len(block_to_persisters[block_uuid])),
202 print ('user: unweighted readable block size, weighted readable block size, '
203 'unweighted persisted block size, weighted persisted block size:')
204 for user, usage in user_to_usage.items():
205 print ('%s: %s %s %s %s' %
207 fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
208 fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
209 fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
210 fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
def getKeepServers():
    """Query the API server for keep disks; return a list of [host, port] pairs."""
    keep_disks = arv.keep_disks().list().execute()['items']
    servers = []
    for disk in keep_disks:
        servers.append([disk['service_host'], disk['service_port']])
    return servers
# Fetch the /index listing from each keep server; each listing line is split
# into its space-separated fields (locator plus metadata).
# NOTE(review): garbled excerpt — the initialization `blocks = []` (~line
# 220), the tail of the list comprehension (225-228), and the return
# statement are missing; urllib2 is Python 2-only.
219 def getKeepBlocks(keep_servers):
221 for host,port in keep_servers:
222 response = urllib2.urlopen('http://%s:%d/index' % (host, port))
223 blocks.append([line.split(' ')
224 for line in response.read().split('\n')
def computeReplication(keep_blocks):
    """Count, per block locator, how many keep servers report holding it.

    keep_blocks: one sequence per server of (block_uuid, ...) index entries.
    Returns a defaultdict mapping block uuid -> replication level, so lookups
    of unseen blocks read as 0 (preserving the original's default behavior).
    """
    # defaultdict(int) is the idiomatic spelling of defaultdict(lambda: 0).
    block_to_replication = defaultdict(int)
    for server_blocks in keep_blocks:
        for block_uuid, _ in server_blocks:
            block_to_replication[block_uuid] += 1
    return block_to_replication
237 # This is the main flow here
# Command-line definition.
# NOTE(review): several option lines are missing from this excerpt (dest/
# type/default for -m, action for -v, dest for -u, action/default for
# --require-admin-user); as shown some add_argument calls are incomplete.
239 parser = argparse.ArgumentParser(description='Report on keep disks.')
240 parser.add_argument('-m',
244 help=('The max results to get at once.'))
245 parser.add_argument('-v',
247 help='increase output verbosity',
249 parser.add_argument('-u',
251 help='uuid of specific collection to process')
252 parser.add_argument('--require-admin-user',
254 help='Fail if the user is not an admin [default]')
255 parser.add_argument('--no-require-admin-user',
256 dest='require_admin_user',
257 action='store_false',
258 help='Allow users without admin permissions with only a warning.')
259 args = parser.parse_args()
# Main flow: build the collection list, read manifests and links, report
# popularity, build the usage maps, report per-user usage, then measure
# replication across keep servers.
# NOTE(review): garbled excerpt — statements on the missing lines presumably
# include checkUserIsAdmin() (~260), the readLinks() call (~274-275), the
# buildMaps() call (~289-292), and NUM_COLS = 4 (~297, referenced below).
263 print 'Building Collection List'
# Defensive re-validation: keep only entries that parse as canonical uuids.
264 collection_uuids = filter(None, [extractUuid(candidate)
265 for candidate in buildCollectionsList()])
267 print 'Reading Collections'
268 readCollections(collection_uuids)
271 pprint.pprint(CollectionInfo.all_by_uuid)
273 print 'Reading Links'
276 reportMostPopularCollections()
278 # These maps all map from uuids to a set of uuids
279 # The sets all contain collection uuids.
280 block_to_collections = defaultdict(set) # keep blocks
281 reader_to_collections = defaultdict(set) # collection(s) for which the user has read access
282 persister_to_collections = defaultdict(set) # collection(s) which the user has persisted
283 block_to_readers = defaultdict(set)
284 block_to_persisters = defaultdict(set)
285 reader_to_blocks = defaultdict(set)
286 persister_to_blocks = defaultdict(set)
288 print 'Building Maps'
# Column indices into each user's usage row in user_to_usage.
293 UNWEIGHTED_READ_SIZE_COL = 0
294 WEIGHTED_READ_SIZE_COL = 1
295 UNWEIGHTED_PERSIST_SIZE_COL = 2
296 WEIGHTED_PERSIST_SIZE_COL = 3
# Each user starts with a zeroed row of NUM_COLS usage columns.
298 user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
300 reportUserDiskUsage()
302 print 'Getting Keep Servers'
303 keep_servers = getKeepServers()
307 print 'Getting Blocks from each Keep Server.'
308 keep_blocks = getKeepBlocks(keep_servers)
310 block_to_replication = computeReplication(keep_blocks)
312 print 'average replication level is %f' % (float(sum(block_to_replication.values())) / len(block_to_replication))