a016fb81451736d2e2540c4edc601982d6c78980
[arvados.git] / services / datamanager / datamanager.py
1 #! /usr/bin/env python
2
3 import arvados
4
5 import argparse
6 import cgi
7 import logging
8 import math
9 import pprint
10 import re
11 import threading
12 import urllib2
13
14 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
15 from collections import defaultdict
16 from functools import partial
17 from operator import itemgetter
18 from SocketServer import ThreadingMixIn
19
# Module-level Arvados API client; every API query below goes through this handle.
arv = arvados.api('v1')
21
# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
def fileSizeFormat(value):
  """Returns a human readable string for a byte count, e.g. '   1.50 KiB'.

  Uses repeated division rather than int(math.log(value, 1024)) so that
  values on unit boundaries (e.g. exactly 1 MiB) cannot be mis-binned by
  floating point rounding, and clamps to the largest unit so inputs past
  YiB cannot raise an IndexError.
  """
  scaled = float(value)
  exponent = 0
  while scaled >= 1024 and exponent < len(byteunits) - 1:
    scaled /= 1024
    exponent += 1
  return "%7.2f %-3s" % (scaled, byteunits[exponent])
28
def byteSizeFromValidUuid(valid_uuid):
  """Returns the byte count encoded in a valid keep locator (hash+size[+hints])."""
  fields = valid_uuid.split('+')
  return int(fields[1])
31
class maxdict(dict):
  """A dictionary that holds the largest value entered for each key."""

  def addValue(self, key, value):
    """Records value for key, keeping whichever of the old/new values is larger."""
    # Default the lookup to the incoming value so the first insert never
    # compares against a missing entry (the old code did max(None, value),
    # which only works because Python 2 orders None below everything).
    dict.__setitem__(self, key, max(value, dict.get(self, key, value)))

  def addValues(self, kv_pairs):
    """Records each (key, value) pair in kv_pairs."""
    for key, value in kv_pairs:
      self.addValue(key, value)

  def addDict(self, d):
    """Records every entry of the mapping d."""
    self.addValues(d.items())
41
class CollectionInfo:
  """Aggregated information about one collection: its blocks, readers and persisters.

  Instances register themselves in CollectionInfo.all_by_uuid; use
  CollectionInfo.get() to fetch-or-create by uuid rather than calling
  the constructor directly.
  """
  DEFAULT_PERSISTER_REPLICATION_LEVEL = 2
  all_by_uuid = {}

  def __init__(self, uuid):
    # `in` replaces the deprecated dict.has_key().
    if uuid in CollectionInfo.all_by_uuid:
      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    self.uuid = uuid
    self.block_uuids = set()  # uuids of keep blocks in this collection
    self.reader_uuids = set()  # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    # map from user uuid to replication level they desire
    self.persister_replication = maxdict()

    # The whole api response in case we need anything else later.
    self.api_response = []
    CollectionInfo.all_by_uuid[uuid] = self

  def byteSize(self):
    """Total bytes across this collection's blocks (not counting replication)."""
    return sum(map(byteSizeFromValidUuid, self.block_uuids))

  def __str__(self):
    return ('CollectionInfo uuid: %s\n'
            '               %d block(s) containing %s\n'
            '               reader_uuids: %s\n'
            '               persister_replication: %s' %
            (self.uuid,
             len(self.block_uuids),
             fileSizeFormat(self.byteSize()),
             pprint.pformat(self.reader_uuids, indent = 15),
             pprint.pformat(self.persister_replication, indent = 15)))

  @staticmethod
  def get(uuid):
    """Returns the CollectionInfo for uuid, creating it on first use."""
    if uuid not in CollectionInfo.all_by_uuid:
      CollectionInfo(uuid)
    return CollectionInfo.all_by_uuid[uuid]
80
def extractUuid(candidate):
  """Returns the canonical (hash+size) uuid prefix of candidate, or None.

  A valid uuid is a 32 digit hex hash, '+', a decimal size, optionally
  followed by more '+'-separated hint fields; any hints are stripped.
  Returns None if candidate is not a valid uuid.
  """
  # Raw string so the \+ escapes stay literal regex syntax instead of
  # relying on Python passing unknown string escapes through.
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  return match and match.group(1)
85
def checkUserIsAdmin():
  """Warns if the current user lacks admin access; exits if admin is required."""
  current_user = arv.users().current().execute()

  # Admins see everything; nothing to report.
  if current_user['is_admin']:
    return

  log.warning('Current user %s (%s - %s) does not have '
              'admin access and will not see much of the data.',
              current_user['full_name'],
              current_user['email'],
              current_user['uuid'])
  if args.require_admin_user:
    log.critical('Exiting, rerun with --no-require-admin-user '
                 'if you wish to continue.')
    exit(1)
99
def buildCollectionsList():
  """Returns the list of collection uuids to examine.

  Honors --uuid when given; otherwise asks the API server for up to
  --max-api-results collections and reports how many came back.
  """
  if not args.uuid:
    collections_list_response = (
      arv.collections().list(limit=args.max_api_results).execute())

    print ('Returned %d of %d collections.' %
           (len(collections_list_response['items']),
            collections_list_response['items_available']))

    return [item['uuid'] for item in collections_list_response['items']]
  return [args.uuid]
112
113 def readCollections(collection_uuids):
114   for collection_uuid in collection_uuids:
115     collection_block_uuids = set()
116     collection_response = arv.collections().get(uuid=collection_uuid).execute()
117     collection_info = CollectionInfo.get(collection_uuid)
118     collection_info.api_response = collection_response
119     manifest_lines = collection_response['manifest_text'].split('\n')
120
121     if args.verbose:
122       print 'Manifest text for %s:' % collection_uuid
123       pprint.pprint(manifest_lines)
124
125     for manifest_line in manifest_lines:
126       if manifest_line:
127         manifest_tokens = manifest_line.split(' ')
128         if args.verbose:
129           print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
130         stream_name = manifest_tokens[0]
131
132         line_block_uuids = set(filter(None,
133                                       [extractUuid(candidate)
134                                        for candidate in manifest_tokens[1:]]))
135         collection_info.block_uuids.update(line_block_uuids)
136
137         # file_tokens = [token
138         #                for token in manifest_tokens[1:]
139         #                if extractUuid(token) is None]
140
141         # # Sort file tokens by start position in case they aren't already
142         # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
143
144         # if args.verbose:
145         #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
146         #   print 'file_tokens: ' + pprint.pformat(file_tokens)
147
148
149 def readLinks():
150   link_classes = set()
151
152   for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
153     collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
154     link_classes.update([link['link_class'] for link in collection_links_response['items']])
155     for link in collection_links_response['items']:
156       if link['link_class'] == 'permission':
157         collection_info.reader_uuids.add(link['tail_uuid'])
158       elif link['link_class'] == 'resources':
159         replication_level = link['properties'].get(
160           'replication',
161           CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
162         collection_info.persister_replication.addValue(
163           link['tail_uuid'],
164           replication_level)
165         collection_info.persister_uuids.add(link['tail_uuid'])
166
167   print 'Found the following link classes:'
168   pprint.pprint(link_classes)
169
170 def reportMostPopularCollections():
171   most_popular_collections = sorted(
172     CollectionInfo.all_by_uuid.values(),
173     key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
174     reverse=True)[:10]
175
176   print 'Most popular Collections:'
177   for collection_info in most_popular_collections:
178     print collection_info
179
180
def buildMaps():
  """Populates the module-level block/reader/persister cross-reference maps."""
  for collection_uuid, collection_info in CollectionInfo.all_by_uuid.items():
    # Add the block holding the manifest itself for all calculations
    block_uuids = collection_info.block_uuids.union([collection_uuid])
    readers = collection_info.reader_uuids
    persisters = collection_info.persister_uuids

    for block_uuid in block_uuids:
      block_to_collections[block_uuid].add(collection_uuid)
      block_to_readers[block_uuid].update(readers)
      block_to_persisters[block_uuid].update(persisters)
      block_to_persister_replication[block_uuid].addDict(
        collection_info.persister_replication)

    for reader_uuid in readers:
      reader_to_collections[reader_uuid].add(collection_uuid)
      reader_to_blocks[reader_uuid].update(block_uuids)

    for persister_uuid in persisters:
      persister_to_collections[persister_uuid].add(collection_uuid)
      persister_to_blocks[persister_uuid].update(block_uuids)
197
198
def itemsByValueLength(original):
  """Returns original's items sorted so the longest values come first."""
  def value_length(item):
    return len(item[1])
  return sorted(original.items(), key=value_length, reverse=True)
203
204
205 def reportBusiestUsers():
206   busiest_readers = itemsByValueLength(reader_to_collections)
207   print 'The busiest readers are:'
208   for reader,collections in busiest_readers:
209     print '%s reading %d collections.' % (reader, len(collections))
210   busiest_persisters = itemsByValueLength(persister_to_collections)
211   print 'The busiest persisters are:'
212   for persister,collections in busiest_persisters:
213     print '%s reading %d collections.' % (persister, len(collections))
214
215
def blockDiskUsage(block_uuid):
  """Returns the disk usage of a block given its uuid.

  Will return 0 before reading the contents of the keep servers.
  """
  replication = block_to_replication[block_uuid]
  return replication * byteSizeFromValidUuid(block_uuid)
222
def blockPersistedUsage(user_uuid, block_uuid):
  """Bytes of this block the user asked to keep: size times their replication level."""
  desired_replication = block_to_persister_replication[block_uuid].get(user_uuid, 0)
  return desired_replication * byteSizeFromValidUuid(block_uuid)
226
def reportUserDiskUsage():
  """Fills in user_to_usage and prints the per-user summary table.

  The weighted columns split each block's cost evenly across every user
  who reads (respectively persists) that block.
  """
  for user, blocks in reader_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(
        blockDiskUsage(block_uuid)
        for block_uuid in blocks)
    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(
        float(blockDiskUsage(block_uuid)) / len(block_to_readers[block_uuid])
        for block_uuid in blocks)
  for user, blocks in persister_to_blocks.items():
    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(
        blockPersistedUsage(user, block_uuid)
        for block_uuid in blocks)
    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(
        float(blockDiskUsage(block_uuid)) / len(block_to_persisters[block_uuid])
        for block_uuid in blocks)
  print ('user: unweighted readable block size, weighted readable block size, '
         'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    print ('%s: %s %s %s %s' %
           (user,
            fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
253
254
def getKeepServers():
  """Returns [host, port] pairs for every keep disk the API server reports."""
  response = arv.keep_disks().list().execute()
  servers = []
  for keep_server in response['items']:
    servers.append([keep_server['service_host'], keep_server['service_port']])
  return servers
259
260
def getKeepBlocks(keep_servers):
  """Fetches the block index from each keep server.

  Returns one list per server; each entry is a split index line
  (block locator plus the remaining fields).
  """
  blocks = []
  for host, port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    try:
      blocks.append([line.split(' ')
                     for line in response.read().split('\n')
                     if line])
    finally:
      # urllib2 response objects are not closed automatically; close
      # explicitly so we don't leak one socket per keep server.
      response.close()
  return blocks
269
270
def computeReplication(keep_blocks):
  """Counts, per block uuid, how many keep servers reported holding a copy."""
  for index_lines in keep_blocks:
    for (block_uuid, _) in index_lines:
      block_to_replication[block_uuid] += 1
275
276
# This is the main flow here

# Command line interface.
parser = argparse.ArgumentParser(description='Report on keep disks.')
parser.add_argument('-m',
                    '--max-api-results',
                    type=int,
                    default=5000,
                    help=('The max results to get at once.'))
parser.add_argument('-p',
                    '--port',
                    type=int,
                    default=9090,
                    help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
                    '--verbose',
                    help='increase output verbosity',
                    action='store_true')
parser.add_argument('-u',
                    '--uuid',
                    help='uuid of specific collection to process')
parser.add_argument('--require-admin-user',
                    action='store_true',
                    default=True,
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help='Allow users without admin permissions with only a warning.')
args = parser.parse_args()

# Log INFO and above to stderr with timestamps.
log = logging.getLogger('arvados.services.datamanager')
stderr_handler = logging.StreamHandler()
log.setLevel(logging.INFO)
stderr_handler.setFormatter(
  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
log.addHandler(stderr_handler)
313
# Global Data - don't try this at home
collection_uuids = []

# These maps all map from uuids to a set of uuids
block_to_collections = defaultdict(set)  # keep blocks
reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
block_to_readers = defaultdict(set)
block_to_persisters = defaultdict(set)
# block uuid -> maxdict of (user uuid -> desired replication level)
block_to_persister_replication = defaultdict(maxdict)
reader_to_blocks = defaultdict(set)
persister_to_blocks = defaultdict(set)

# Column indices into each user_to_usage row.
UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
NUM_COLS = 4
# user uuid -> row of NUM_COLS byte totals (filled by reportUserDiskUsage).
user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)

keep_servers = []
keep_blocks = []
# block uuid -> number of keep servers reporting a copy (see computeReplication).
block_to_replication = defaultdict(lambda: 0)

# Flipped to True once loadAllData finishes; the web handler 503s until then.
all_data_loaded = False
339
def loadAllData():
  """Runs the whole reporting pipeline, populating the global maps in order.

  Fetches collections, links and keep server indexes, prints the
  summary reports, then sets all_data_loaded so the web handler can
  start serving real pages.  Each step depends on the previous one.
  """
  checkUserIsAdmin()

  log.info('Building Collection List')
  global collection_uuids
  # Keep only candidates that parse as valid collection uuids.
  collection_uuids = filter(None, [extractUuid(candidate)
                                   for candidate in buildCollectionsList()])

  log.info('Reading Collections')
  readCollections(collection_uuids)

  if args.verbose:
    pprint.pprint(CollectionInfo.all_by_uuid)

  log.info('Reading Links')
  readLinks()

  reportMostPopularCollections()

  log.info('Building Maps')
  buildMaps()

  reportBusiestUsers()

  log.info('Getting Keep Servers')
  global keep_servers
  keep_servers = getKeepServers()

  print keep_servers

  log.info('Getting Blocks from each Keep Server.')
  global keep_blocks
  keep_blocks = getKeepBlocks(keep_servers)

  computeReplication(keep_blocks)

  log.info('average replication level is %f', (float(sum(block_to_replication.values())) / len(block_to_replication)))

  reportUserDiskUsage()

  # Signal the web handler that every map above is now populated.
  global all_data_loaded
  all_data_loaded = True
382
class DataManagerHandler(BaseHTTPRequestHandler):
  """Serves the collected usage data as a small set of HTML pages.

  Paths: / (per-user usage table), /user/<uuid>, /collection/<uuid>.
  Requests return 503 until loadAllData() has populated the globals.
  """
  USER_PATH = 'user'
  COLLECTION_PATH = 'collection'
  BLOCK_PATH = 'block'

  def userLink(self, uuid):
    """Returns an HTML link to the page for this user."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.USER_PATH})

  def collectionLink(self, uuid):
    """Returns an HTML link to the page for this collection."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.COLLECTION_PATH})

  def blockLink(self, uuid):
    """Returns an HTML link to the page for this block."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.BLOCK_PATH})

  def writeTop(self, title):
    """Writes the opening boilerplate of every page."""
    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)

  def writeBottom(self):
    """Writes the closing boilerplate of every page."""
    self.wfile.write('</BODY></HTML>\n')

  def writeUsageTable(self, rows):
    """Writes the user usage table for rows of (user uuid, usage row) pairs.

    Extracted from writeHomePage/writeUserPage, which previously
    duplicated this markup.
    """
    self.wfile.write('<TABLE>')
    self.wfile.write('<TR><TH>user'
                     '<TH>unweighted readable block size'
                     '<TH>weighted readable block size'
                     '<TH>unweighted persisted block size'
                     '<TH>weighted persisted block size</TR>\n')
    for user, usage in rows:
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(user),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
    self.wfile.write('</TABLE>\n')

  def writeReplicationTable(self, replication_to_items, formatter):
    """Writes a table with one column per replication level.

    replication_to_items maps replication level -> iterable of items;
    formatter renders each item to HTML (e.g. self.userLink, or str for
    plain block uuids).  Resolves the old TODO about this markup being
    duplicated for the persister and block tables.
    """
    replication_levels = sorted(replication_to_items.keys())
    self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                     '<TH>'.join(['Replication Level ' + str(x)
                                  for x in replication_levels]))
    self.wfile.write('<TR>\n')
    for replication_level in replication_levels:
      items = replication_to_items[replication_level]
      self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
          map(formatter, items)))
    self.wfile.write('</TR></TABLE>\n')

  def writeHomePage(self):
    """Writes the home page: the usage table for every known user."""
    self.send_response(200)
    self.end_headers()
    self.writeTop('Home')
    self.writeUsageTable(user_to_usage.items())
    self.writeBottom()

  def userExists(self, uuid):
    # Currently this will return false for a user who exists but
    # doesn't appear on any manifests.
    # TODO(misha): Figure out if we need to fix this.
    # `in` replaces the deprecated dict.has_key().
    return uuid in user_to_usage

  def writeUserPage(self, uuid):
    """Writes the page for one user: their usage row plus collection lists."""
    if not self.userExists(uuid):
      self.send_error(404,
                      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      # Here we assume that since a user exists, they don't need to be
      # html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('User %s' % uuid)
      self.writeUsageTable([(uuid, user_to_usage[uuid])])
      self.wfile.write('<P>Persisting Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     persister_to_collections[uuid])))
      self.wfile.write('<P>Reading Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     reader_to_collections[uuid])))
      self.writeBottom()

  def collectionExists(self, uuid):
    # `in` replaces the deprecated dict.has_key().
    return uuid in CollectionInfo.all_by_uuid

  def writeCollectionPage(self, uuid):
    """Writes the page for one collection: size, readers, persisters, blocks."""
    if not self.collectionExists(uuid):
      self.send_error(404,
                      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      collection = CollectionInfo.get(uuid)
      # Here we assume that since a collection exists, its id doesn't
      # need to be html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('Collection %s' % uuid)
      self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
      self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
                       fileSizeFormat(collection.byteSize()))
      self.wfile.write('<P>Readers: %s\n' %
                       ', '.join(map(self.userLink, collection.reader_uuids)))

      if len(collection.persister_replication) == 0:
        self.wfile.write('<P>No persisters\n')
      else:
        # Group persisters by the replication level they requested.
        replication_to_users = defaultdict(set)
        for user, replication in collection.persister_replication.items():
          replication_to_users[replication].add(user)
        replication_levels = sorted(replication_to_users.keys())

        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
                         'out at %dx replication:\n' %
                         (len(collection.persister_replication),
                          len(replication_levels),
                          replication_levels[-1]))
        self.writeReplicationTable(replication_to_users, self.userLink)

      # Group this collection's blocks by their observed replication level.
      replication_to_blocks = defaultdict(set)
      for block in collection.block_uuids:
        replication_to_blocks[block_to_replication[block]].add(block)
      self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                       (len(collection.block_uuids),
                        len(replication_to_blocks)))
      self.writeReplicationTable(replication_to_blocks, str)

  def do_GET(self):
    """Routes GET requests; returns 503 until the background load finishes."""
    if not all_data_loaded:
      self.send_error(503,
                      'Sorry, but I am still loading all the data I need.')
    else:
      # Removing leading '/' and process request path
      split_path = self.path[1:].split('/')
      request_type = split_path[0]
      log.debug('path (%s) split as %s with request_type %s' % (self.path,
                                                                split_path,
                                                                request_type))
      if request_type == '':
        self.writeHomePage()
      elif request_type == DataManagerHandler.USER_PATH:
        self.writeUserPage(split_path[1])
      elif request_type == DataManagerHandler.COLLECTION_PATH:
        self.writeCollectionPage(split_path[1])
      else:
        self.send_error(404, 'Unrecognized request path.')
    return
548
# ThreadingMixIn dispatches each incoming request on its own thread, so a
# slow page write doesn't block other clients.
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """Handle requests in a separate thread."""
551
#if __name__ == '__main__':

if args.port == 0:
  # No web server requested: load the data and print the reports, then exit.
  loadAllData()
else:
  # Load in the background so the server can come up immediately; pages
  # return 503 until all_data_loaded flips to True.
  loader = threading.Thread(target = loadAllData, name = 'loader')
  loader.start()

  server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
  server.serve_forever()