14 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
15 from collections import defaultdict
16 from functools import partial
17 from operator import itemgetter
18 from SocketServer import ThreadingMixIn
# Handle to the Arvados API (v1).  NOTE(review): the `arvados` import is not
# visible in this chunk -- presumably imported earlier in the file.
arv = arvados.api('v1')

# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
# Unit suffixes for binary (1024-based) byte sizes, indexed by exponent.
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
24 def fileSizeFormat(value):
25 exponent = 0 if value == 0 else int(math.log(value, 1024))
26 return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
def byteSizeFromValidUuid(valid_uuid):
  """Extract the byte count embedded in a valid keep locator (hash+size[+hints])."""
  fields = valid_uuid.split('+')
  return int(fields[1])
33 """A dictionary that holds the largest value entered for each key."""
34 def addValue(self, key, value):
35 dict.__setitem__(self, key, max(dict.get(self, key), value))
36 def addValues(self, kv_pairs):
37 for key,value in kv_pairs:
38 self.addValue(key, value)
40 self.addValues(d.items())
43 DEFAULT_PERSISTER_REPLICATION_LEVEL=2
46 def __init__(self, uuid):
47 if CollectionInfo.all_by_uuid.has_key(uuid):
48 raise ValueError('Collection for uuid "%s" already exists.' % uuid)
50 self.block_uuids = set() # uuids of keep blocks in this collection
51 self.reader_uuids = set() # uuids of users who can read this collection
52 self.persister_uuids = set() # uuids of users who want this collection saved
53 # map from user uuid to replication level they desire
54 self.persister_replication = maxdict()
56 # The whole api response in case we need anything else later.
57 self.api_response = []
58 CollectionInfo.all_by_uuid[uuid] = self
61 return sum(map(byteSizeFromValidUuid, self.block_uuids))
    # Human-readable summary of this collection.
    # NOTE(review): the enclosing method header (likely `def __str__(self):`),
    # one format line (reader_uuids) and the first format argument (probably
    # self.uuid) are on lines not visible in this chunk, so the expression
    # below is incomplete as shown.
    return ('CollectionInfo uuid: %s\n'
            ' %d block(s) containing %s\n'
            ' persister_replication: %s' %
            len(self.block_uuids),
            fileSizeFormat(self.byteSize()),
            pprint.pformat(self.reader_uuids, indent = 15),
            pprint.pformat(self.persister_replication, indent = 15)))
    # Fetch-or-create accessor fragment: the method header (likely a
    # @staticmethod `get(uuid)`) and the creation branch body (likely
    # `CollectionInfo(uuid)`) are on lines not visible in this chunk.
    # NOTE(review): has_key is Python 2 only.
    if not CollectionInfo.all_by_uuid.has_key(uuid):
    return CollectionInfo.all_by_uuid[uuid]
def extractUuid(candidate):
  """Return a canonical (hash+size) uuid from a valid uuid, or None if
  candidate is not a valid uuid."""
  # Raw string: '\+' is an invalid escape in a plain string literal and
  # triggers a DeprecationWarning on newer Pythons.
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  # Returns None when there is no match (None and ... short-circuits).
  return match and match.group(1)
def checkUserIsAdmin():
  """Warn if the current API user lacks admin access; abort when
  --require-admin-user (the default) is set."""
  current_user = arv.users().current().execute()

  if not current_user['is_admin']:
    log.warning('Current user %s (%s - %s) does not have '
                'admin access and will not see much of the data.',
                current_user['full_name'],
                current_user['email'],
    # NOTE(review): the third format argument (probably the user uuid) and
    # the closing paren of log.warning are on lines not visible here.
    if args.require_admin_user:
      log.critical('Exiting, rerun with --no-require-admin-user '
                   'if you wish to continue.')
      # NOTE(review): the actual exit call is on a line not visible here.
def buildCollectionsList():
  """Ask the API server for collections and return their uuids.

  Fetches at most args.max_api_results collections in a single request and
  reports how many of the available total were returned.
  """
  response = arv.collections().list(limit=args.max_api_results).execute()
  items = response['items']
  print ('Returned %d of %d collections.' %
         (len(items), response['items_available']))
  return [collection['uuid'] for collection in items]
def readCollections(collection_uuids):
  """Fetch each collection's record and record its keep block uuids.

  For every uuid, pulls the collection via the API, stores the raw response
  on its CollectionInfo, and parses the manifest text to collect the block
  locators referenced by each stream line.
  """
  for collection_uuid in collection_uuids:
    # NOTE(review): appears unused in the visible code.
    collection_block_uuids = set()
    collection_response = arv.collections().get(uuid=collection_uuid).execute()
    collection_info = CollectionInfo.get(collection_uuid)
    collection_info.api_response = collection_response
    manifest_lines = collection_response['manifest_text'].split('\n')

    # NOTE(review): these debug prints were probably guarded by a verbosity
    # check on lines not visible in this chunk.
    print 'Manifest text for %s:' % collection_uuid
    pprint.pprint(manifest_lines)

    for manifest_line in manifest_lines:
      manifest_tokens = manifest_line.split(' ')
      print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
      # First token of a manifest line is the stream name.
      stream_name = manifest_tokens[0]

      # Every later token that parses as a locator is a keep block uuid.
      line_block_uuids = set(filter(None,
                                    [extractUuid(candidate)
                                     for candidate in manifest_tokens[1:]]))
      collection_info.block_uuids.update(line_block_uuids)

      # file_tokens = [token
      #                for token in manifest_tokens[1:]
      #                if extractUuid(token) is None]

      # # Sort file tokens by start position in case they aren't already
      # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))

      # print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
      # print 'file_tokens: ' + pprint.pformat(file_tokens)
# readLinks body fragment: the enclosing function header (likely
# `def readLinks():`) and the initialization of `link_classes` are on lines
# not visible in this chunk.  Fills reader/persister info for every known
# collection from its permission/resources links.
for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
  collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
  # Track every link_class we encounter, for the summary printed below.
  link_classes.update([link['link_class'] for link in collection_links_response['items']])
  for link in collection_links_response['items']:
    if link['link_class'] == 'permission':
      # Permission links grant read access to tail_uuid.
      collection_info.reader_uuids.add(link['tail_uuid'])
    elif link['link_class'] == 'resources':
      # Resources links mean tail_uuid wants this collection persisted,
      # at a replication level taken from the link's properties.
      replication_level = link['properties'].get(
          # NOTE(review): the property-name argument is on a line not
          # visible in this chunk; the default below is used when absent.
          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
      collection_info.persister_replication.addValue(
          # NOTE(review): the addValue arguments (presumably tail_uuid and
          # replication_level) are on lines not visible in this chunk.
      collection_info.persister_uuids.add(link['tail_uuid'])

print 'Found the following link classes:'
pprint.pprint(link_classes)
def reportMostPopularCollections():
  """Print the most popular collections.

  Popularity score = reader count + 10 * persister count, so persisters
  are weighted 10x readers.
  """
  most_popular_collections = sorted(
      CollectionInfo.all_by_uuid.values(),
      key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
      # NOTE(review): the closing of this sorted() call (likely
      # reverse=True plus a top-N slice) is on lines not visible here.
  print 'Most popular Collections:'
  for collection_info in most_popular_collections:
    print collection_info
# buildMaps body fragment: the enclosing function header (likely
# `def buildMaps():`) is on a line not visible in this chunk.  Populates
# the global block_to_* / *_to_blocks / *_to_collections maps from
# CollectionInfo.all_by_uuid.
for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
  # Add the block holding the manifest itself for all calculations
  block_uuids = collection_info.block_uuids.union([collection_uuid,])
  for block_uuid in block_uuids:
    block_to_collections[block_uuid].add(collection_uuid)
    block_to_readers[block_uuid].update(collection_info.reader_uuids)
    block_to_persisters[block_uuid].update(collection_info.persister_uuids)
    # Keep, per user, the highest replication level requested across all
    # collections containing this block (maxdict semantics).
    block_to_persister_replication[block_uuid].addDict(
        collection_info.persister_replication)
  for reader_uuid in collection_info.reader_uuids:
    reader_to_collections[reader_uuid].add(collection_uuid)
    reader_to_blocks[reader_uuid].update(block_uuids)
  for persister_uuid in collection_info.persister_uuids:
    persister_to_collections[persister_uuid].add(collection_uuid)
    persister_to_blocks[persister_uuid].update(block_uuids)
def itemsByValueLength(original):
  """Return `original`'s items sorted by descending length of their values.

  Used to rank "busiest" users, so the biggest value-collections come
  first.  The visible source truncated the close of the sorted() call;
  restored here with reverse=True (descending), as the "busiest ... are"
  reports imply.
  """
  return sorted(original.items(),
                key=lambda item: len(item[1]),
                reverse=True)
def reportBusiestUsers():
  """Print users ranked by how many collections they read and persist."""
  busiest_readers = itemsByValueLength(reader_to_collections)
  print ('The busiest readers are:')
  for reader, collections in busiest_readers:
    print ('%s reading %d collections.' % (reader, len(collections)))
  busiest_persisters = itemsByValueLength(persister_to_collections)
  print ('The busiest persisters are:')
  for persister, collections in busiest_persisters:
    # Bug fix: this message previously said "reading", copy-pasted from
    # the reader loop above.
    print ('%s persisting %d collections.' % (persister, len(collections)))
def blockDiskUsage(block_uuid):
  """Returns the disk usage of a block given its uuid.

  Will return 0 before reading the contents of the keep servers, because
  block_to_replication defaults every block to 0 until computeReplication
  runs.  (The closing quotes of this docstring were truncated in the
  visible source; restored here.)
  """
  return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
def blockPersistedUsage(user_uuid, block_uuid):
  """Bytes of this block the given user persists (0 if they don't)."""
  desired_replication = block_to_persister_replication[block_uuid].get(user_uuid, 0)
  return byteSizeFromValidUuid(block_uuid) * desired_replication
def reportUserDiskUsage():
  """Compute and print per-user disk usage, weighted and unweighted.

  Weighted figures split each block's usage evenly among its readers or
  persisters; unweighted figures charge each user the block's full usage.
  """
  for user, blocks in reader_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
        # NOTE(review): the mapped function and iterable (presumably
        # blockDiskUsage over `blocks`) are on lines not visible here.
    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
        lambda block_uuid:(float(blockDiskUsage(block_uuid))/
                           len(block_to_readers[block_uuid])),
        # NOTE(review): the iterable argument and call close are on lines
        # not visible in this chunk.
  for user, blocks in persister_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
        partial(blockPersistedUsage, user),
        # NOTE(review): the iterable argument and call close are on lines
        # not visible in this chunk.
    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
        lambda block_uuid:(float(blockDiskUsage(block_uuid))/
                           len(block_to_persisters[block_uuid])),
  print ('user: unweighted readable block size, weighted readable block size, '
         'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    print ('%s: %s %s %s %s' %
           # NOTE(review): the first format argument (the user) is on a line
           # not visible in this chunk.
           fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
           fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
           fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
           fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
def getKeepServers():
  """Query the API for keep disks and return their [host, port] pairs."""
  keep_disks = arv.keep_disks().list().execute()['items']
  return [[disk['service_host'], disk['service_port']]
          for disk in keep_disks]
def getKeepBlocks(keep_servers):
  """Fetch the block index from each keep server.

  Returns one list per server; each entry is a space-split index line
  (locator plus metadata fields).  The visible source truncated the
  accumulator initialization, the empty-line filter, and the return;
  reconstructed here (computeReplication's two-field unpacking implies
  blank trailing lines must be filtered out).
  """
  blocks = []
  for host,port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    blocks.append([line.split(' ')
                   for line in response.read().split('\n')
                   if line])
  return blocks
def computeReplication(keep_blocks):
  """Tally, per block uuid, how many keep servers reported holding it."""
  for blocks_on_server in keep_blocks:
    for uuid, _ in blocks_on_server:
      block_to_replication[uuid] += 1
# This is the main flow here

# Command-line arguments.
# NOTE(review): several add_argument calls below are missing intermediate
# keyword lines (long option names, dest=, type=, default=) in this chunk;
# only the visible fragments are reproduced.
parser = argparse.ArgumentParser(description='Report on keep disks.')
parser.add_argument('-m',
                    help=('The max results to get at once.'))
parser.add_argument('-p',
                    help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
                    help='increase output verbosity',
parser.add_argument('-u',
                    help='uuid of specific collection to process')
parser.add_argument('--require-admin-user',
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help='Allow users without admin permissions with only a warning.')
args = parser.parse_args()

# Log to stderr with timestamps, at INFO level.
log = logging.getLogger('arvados.services.datamanager')
stderr_handler = logging.StreamHandler()
log.setLevel(logging.INFO)
stderr_handler.setFormatter(
    logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
log.addHandler(stderr_handler)
# Global Data - don't try this at home
collection_uuids = []

# These maps all map from uuids to a set of uuids
block_to_collections = defaultdict(set) # keep blocks
reader_to_collections = defaultdict(set) # collection(s) for which the user has read access
persister_to_collections = defaultdict(set) # collection(s) which the user has persisted
block_to_readers = defaultdict(set)
block_to_persisters = defaultdict(set)
# Maps block uuid to a maxdict of user uuid -> desired replication level.
block_to_persister_replication = defaultdict(maxdict)
reader_to_blocks = defaultdict(set)
persister_to_blocks = defaultdict(set)

# Column indices into each user_to_usage row.
UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
# NOTE(review): NUM_COLS is referenced below but its definition (presumably
# NUM_COLS = 4) is on a line not visible in this chunk.
user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)

# Number of keep servers holding each block; filled in by computeReplication,
# defaults to 0 until then.
block_to_replication = defaultdict(lambda: 0)

# Set True once loadAllData finishes; web pages refuse to render before then.
all_data_loaded = False
# loadAllData body fragment: the enclosing function header (likely
# `def loadAllData():`) and several call/guard lines (e.g. readLinks(),
# buildMaps(), verbosity checks) are on lines not visible in this chunk.
# Drives the full pipeline: list -> read -> link -> map -> keep servers ->
# replication -> usage report, then flags completion.
log.info('Building Collection List')
global collection_uuids
collection_uuids = filter(None, [extractUuid(candidate)
                                 for candidate in buildCollectionsList()])

log.info('Reading Collections')
readCollections(collection_uuids)

# NOTE(review): probably guarded by a verbosity check on a missing line.
pprint.pprint(CollectionInfo.all_by_uuid)

log.info('Reading Links')

reportMostPopularCollections()

log.info('Building Maps')

log.info('Getting Keep Servers')
keep_servers = getKeepServers()

log.info('Getting Blocks from each Keep Server.')
keep_blocks = getKeepBlocks(keep_servers)

computeReplication(keep_blocks)

log.info('average replication level is %f', (float(sum(block_to_replication.values())) / len(block_to_replication)))

reportUserDiskUsage()

global all_data_loaded
all_data_loaded = True
class DataManagerHandler(BaseHTTPRequestHandler):
  """Serves HTML reports of the usage data gathered by this script.

  Routes '/' to a summary table, '/user/<uuid>' and '/collection/<uuid>'
  to per-object pages (see do_GET at the bottom).
  """
  # Path components used to route requests.
  # NOTE(review): USER_PATH and BLOCK_PATH (referenced below) are defined on
  # lines not visible in this chunk.
  COLLECTION_PATH = 'collection'

  def userLink(self, uuid):
    # HTML link to a user page.
    # NOTE(review): the start of the substitution dict (presumably
    # {'uuid': uuid,) is on a line not visible in this chunk.
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            'path': DataManagerHandler.USER_PATH})

  def collectionLink(self, uuid):
    # HTML link to a collection page.
    # NOTE(review): the start of the substitution dict is not visible here.
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            'path': DataManagerHandler.COLLECTION_PATH})

  def blockLink(self, uuid):
    # HTML link to a block page.
    # NOTE(review): the start of the substitution dict is not visible here.
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            'path': DataManagerHandler.BLOCK_PATH})

  def writeTop(self, title):
    # Standard page header.
    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)

  def writeBottom(self):
    # Standard page footer.
    self.wfile.write('</BODY></HTML>\n')

  def writeHomePage(self):
    """Write the summary table of per-user disk usage."""
    self.send_response(200)
    # NOTE(review): header-finishing calls (e.g. end_headers) appear to be
    # on lines not visible in this chunk.
    self.writeTop('Home')
    self.wfile.write('<TABLE>')
    self.wfile.write('<TR><TH>user'
                     '<TH>unweighted readable block size'
                     '<TH>weighted readable block size'
                     '<TH>unweighted persisted block size'
                     '<TH>weighted persisted block size</TR>\n')
    for user, usage in user_to_usage.items():
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(user),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
    self.wfile.write('</TABLE>\n')
    # NOTE(review): a writeBottom() call is presumably on a line not
    # visible in this chunk.

  def userExists(self, uuid):
    # Currently this will return false for a user who exists but
    # doesn't appear on any manifests.
    # TODO(misha): Figure out if we need to fix this.
    # NOTE(review): dict.has_key is Python 2 only.
    return user_to_usage.has_key(uuid)

  def writeUserPage(self, uuid):
    """Write the usage page for one user, or an error page if unknown."""
    if not self.userExists(uuid):
      # NOTE(review): the head of this call (presumably send_error(404,)
      # is on a line not visible in this chunk.
      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
    # Here we assume that since a user exists, they don't need to be
    # NOTE(review): the rest of this comment (presumably "html escaped.")
    # is on a line not visible in this chunk.
    self.send_response(200)
    # NOTE(review): header-finishing calls appear to be on missing lines.
    self.writeTop('User %s' % uuid)
    self.wfile.write('<TABLE>')
    self.wfile.write('<TR><TH>user'
                     '<TH>unweighted readable block size'
                     '<TH>weighted readable block size'
                     '<TH>unweighted persisted block size'
                     '<TH>weighted persisted block size</TR>\n')
    usage = user_to_usage[uuid]
    self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                     (self.userLink(uuid),
                      fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                      fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                      fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                      fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
    self.wfile.write('</TABLE>\n')
    self.wfile.write('<P>Persisting Collections: %s\n' %
                     ', '.join(map(self.collectionLink,
                                   persister_to_collections[uuid])))
    self.wfile.write('<P>Reading Collections: %s\n' %
                     ', '.join(map(self.collectionLink,
                                   reader_to_collections[uuid])))
    # NOTE(review): a writeBottom() call is presumably on a missing line.

  def collectionExists(self, uuid):
    # NOTE(review): dict.has_key is Python 2 only.
    return CollectionInfo.all_by_uuid.has_key(uuid)

  def writeCollectionPage(self, uuid):
    """Write the detail page for one collection, or an error if unknown."""
    if not self.collectionExists(uuid):
      # NOTE(review): the head of this call (presumably send_error(404,)
      # is on a line not visible in this chunk.
      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
    collection = CollectionInfo.get(uuid)
    # Here we assume that since a collection exists, its id doesn't
    # need to be html escaped.
    self.send_response(200)
    # NOTE(review): header-finishing calls appear to be on missing lines.
    self.writeTop('Collection %s' % uuid)
    self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
    self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
                     fileSizeFormat(collection.byteSize()))
    self.wfile.write('<P>Readers: %s\n' %
                     ', '.join(map(self.userLink, collection.reader_uuids)))

    if len(collection.persister_replication) == 0:
      self.wfile.write('<P>No persisters\n')
    # NOTE(review): the matching `else:` for the persister table below is
    # presumably on a line not visible in this chunk.
      replication_to_users = defaultdict(set)
      for user,replication in collection.persister_replication.items():
        replication_to_users[replication].add(user)
      replication_levels = sorted(replication_to_users.keys())

      self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
                       'out at %dx replication:\n' %
                       (len(collection.persister_replication),
                        len(replication_levels),
                        replication_levels[-1]))

      # TODO(misha): This code is used twice, let's move it to a method.
      self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                       '<TH>'.join(['Replication Level ' + str(x)
                                    for x in replication_levels]))
      self.wfile.write('<TR>\n')
      for replication_level in replication_levels:
        users = replication_to_users[replication_level]
        self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
            map(self.userLink, users)))
      self.wfile.write('</TR></TABLE>\n')

    # Same layout, but grouping this collection's blocks by how many keep
    # servers hold each one.
    replication_to_blocks = defaultdict(set)
    for block in collection.block_uuids:
      replication_to_blocks[block_to_replication[block]].add(block)
    replication_levels = sorted(replication_to_blocks.keys())
    self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                     (len(collection.block_uuids), len(replication_levels)))
    self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                     '<TH>'.join(['Replication Level ' + str(x)
                                  for x in replication_levels]))
    self.wfile.write('<TR>\n')
    for replication_level in replication_levels:
      blocks = replication_to_blocks[replication_level]
      self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
    self.wfile.write('</TR></TABLE>\n')

  # do_GET fragment: the `def do_GET(self):` header and several lines
  # (the "still loading" send_error head, the writeHomePage branch body,
  # the final `else:`) are on lines not visible in this chunk.
    if not all_data_loaded:
      # NOTE(review): send_error head missing; pages are refused until
      # loadAllData sets all_data_loaded.
      'Sorry, but I am still loading all the data I need.')

    # Removing leading '/' and process request path
    split_path = self.path[1:].split('/')
    request_type = split_path[0]
    log.debug('path (%s) split as %s with request_type %s' % (self.path,
    # NOTE(review): remaining log.debug arguments are on missing lines.

    if request_type == '':
    elif request_type == DataManagerHandler.USER_PATH:
      self.writeUserPage(split_path[1])
    elif request_type == DataManagerHandler.COLLECTION_PATH:
      self.writeCollectionPage(split_path[1])
      # NOTE(review): the `else:` above this fallback is on a missing line.
      self.send_error(404, 'Unrecognized request path.')
# ThreadingMixIn is listed first so its threaded request processing takes
# precedence over HTTPServer's in the MRO.
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """Handle requests in a separate thread."""
# NOTE(review): the `if __name__ == '__main__':` guard is commented out, so
# the statements below run on import as well as on direct execution.
#if __name__ == '__main__':

# Load all data in a background thread so the web server can start serving
# immediately (do_GET refuses pages until all_data_loaded becomes True).
# NOTE(review): the thread's start() call and the surrounding lines are on
# lines not visible in this chunk.
loader = threading.Thread(target = loadAllData, name = 'loader')

server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
server.serve_forever()