Removed hack which was compensating for keep server bug which has been fixed.
[arvados.git] / services / datamanager / datamanager.py
1 #! /usr/bin/env python
2
3 import arvados
4
5 import argparse
6 import cgi
7 import logging
8 import math
9 import pprint
10 import re
11 import threading
12 import urllib2
13
14 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
15 from collections import defaultdict, Counter
16 from functools import partial
17 from operator import itemgetter
18 from SocketServer import ThreadingMixIn
19
# Shared Arvados API client; every function below issues its calls through it.
arv = arvados.api('v1')
21
# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
# Binary (IEC) unit suffixes, indexed by the power of 1024.
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
def fileSizeFormat(value):
  """Return a fixed-width human-readable string for a byte count.

  The format is '%7.2f %-3s' (e.g. '   1.00 KiB') so columns of sizes
  line up when printed.
  """
  # Clamp the exponent so values >= 1024**9 still format (as YiB)
  # instead of raising IndexError on the unit lookup.
  exponent = 0 if value == 0 else min(int(math.log(value, 1024)),
                                      len(byteunits) - 1)
  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                         byteunits[exponent])
28
def byteSizeFromValidUuid(valid_uuid):
  """Extract the block size (in bytes) from a valid keep block locator.

  A valid locator looks like '<hash>+<size>[+<hints>...]'; the size is
  the second '+'-separated field.
  """
  size_field = valid_uuid.split('+')[1]
  return int(size_field)
31
class maxdict(dict):
  """A dictionary that holds the largest value entered for each key."""

  def addValue(self, key, value):
    """Store value under key unless an equal-or-larger value is already there."""
    # Default the lookup to the incoming value so a first-time key is
    # stored directly; the original compared against None for missing
    # keys, which only works under python 2's arbitrary-type ordering.
    dict.__setitem__(self, key, max(self.get(key, value), value))

  def addValues(self, kv_pairs):
    """Fold an iterable of (key, value) pairs into this dict."""
    for key, value in kv_pairs:
      self.addValue(key, value)

  def addDict(self, d):
    """Fold another mapping's items into this dict."""
    self.addValues(d.items())
41
class CollectionInfo:
  """Everything tracked about one collection, registered by uuid.

  Instances add themselves to the class-level all_by_uuid registry;
  use CollectionInfo.get() for fetch-or-create semantics.
  """
  # Replication level assumed when a persister link carries no explicit
  # 'replication' property (see readLinks).
  DEFAULT_PERSISTER_REPLICATION_LEVEL = 2
  all_by_uuid = {}

  def __init__(self, uuid):
    # has_key() is a removed python 2 idiom; 'in' is equivalent.
    if uuid in CollectionInfo.all_by_uuid:
      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    self.uuid = uuid
    self.block_uuids = set()  # uuids of keep blocks in this collection
    self.reader_uuids = set()  # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    # map from user uuid to replication level they desire
    self.persister_replication = maxdict()

    # The whole api response in case we need anything else later.
    self.api_response = []
    CollectionInfo.all_by_uuid[uuid] = self

  def byteSize(self):
    """Total bytes across this collection's blocks (ignoring replication)."""
    return sum(map(byteSizeFromValidUuid, self.block_uuids))

  def __str__(self):
    return ('CollectionInfo uuid: %s\n'
            '               %d block(s) containing %s\n'
            '               reader_uuids: %s\n'
            '               persister_replication: %s' %
            (self.uuid,
             len(self.block_uuids),
             fileSizeFormat(self.byteSize()),
             pprint.pformat(self.reader_uuids, indent = 15),
             pprint.pformat(self.persister_replication, indent = 15)))

  @staticmethod
  def get(uuid):
    """Return the CollectionInfo for uuid, creating it if necessary."""
    if uuid not in CollectionInfo.all_by_uuid:
      CollectionInfo(uuid)
    return CollectionInfo.all_by_uuid[uuid]
79   
80
def extractUuid(candidate):
  """Return a canonical (hash+size) uuid from a valid uuid, or None if
  candidate is not a valid uuid."""
  # Raw string so '\+' is an escaped regex plus rather than a (deprecated)
  # unknown string escape.
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  return match and match.group(1)
85
def checkUserIsAdmin():
  """Warn if the current user lacks admin access; exit when that is required.

  Controlled by the --require-admin-user / --no-require-admin-user flags.
  """
  current_user = arv.users().current().execute()
  if current_user['is_admin']:
    return
  log.warning('Current user %s (%s - %s) does not have '
              'admin access and will not see much of the data.',
              current_user['full_name'],
              current_user['email'],
              current_user['uuid'])
  if args.require_admin_user:
    log.critical('Exiting, rerun with --no-require-admin-user '
                 'if you wish to continue.')
    exit(1)
99
def buildCollectionsList():
  """Return the list of collection uuids to examine.

  Honors --uuid (examine just one collection) and --max-api-results.
  """
  if args.uuid:
    return [args.uuid,]
  response = arv.collections().list(limit=args.max_api_results).execute()
  print ('Returned %d of %d collections.' %
         (len(response['items']),
          response['items_available']))
  return [item['uuid'] for item in response['items']]
111
112
def readCollections(collection_uuids):
  """Fetch each collection and record its manifest's block uuids.

  For every uuid, stores the raw api response and the set of block
  locators parsed from the manifest text on its CollectionInfo.
  """
  for collection_uuid in collection_uuids:
    collection_response = arv.collections().get(uuid=collection_uuid).execute()
    collection_info = CollectionInfo.get(collection_uuid)
    collection_info.api_response = collection_response
    manifest_lines = collection_response['manifest_text'].split('\n')

    if args.verbose:
      print('Manifest text for %s:' % collection_uuid)
      pprint.pprint(manifest_lines)

    for manifest_line in manifest_lines:
      if not manifest_line:
        continue
      manifest_tokens = manifest_line.split(' ')
      if args.verbose:
        print('manifest tokens: ' + pprint.pformat(manifest_tokens))
      # manifest_tokens[0] is the stream name; the rest are block
      # locators interleaved with file tokens. extractUuid() returns
      # None for non-locator tokens, which filter(None, ...) drops.
      line_block_uuids = set(filter(None,
                                    [extractUuid(candidate)
                                     for candidate in manifest_tokens[1:]]))
      collection_info.block_uuids.update(line_block_uuids)
147
148
def readLinks():
  """Read the links pointing at every known collection.

  'permission' links populate reader_uuids; 'resources' links populate
  persister_uuids and the desired persister_replication levels.
  """
  link_classes = set()

  for collection_uuid, collection_info in CollectionInfo.all_by_uuid.items():
    # TODO(misha): We may not be seeing all the links, but since items
    # available does not return an accurate number, I don't know how
    # to confirm that we saw all of them.
    links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
    links = links_response['items']
    link_classes.update([link['link_class'] for link in links])
    for link in links:
      link_class = link['link_class']
      if link_class == 'permission':
        collection_info.reader_uuids.add(link['tail_uuid'])
      elif link_class == 'resources':
        replication_level = link['properties'].get(
          'replication',
          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
        collection_info.persister_replication.addValue(
          link['tail_uuid'],
          replication_level)
        collection_info.persister_uuids.add(link['tail_uuid'])

  print('Found the following link classes:')
  pprint.pprint(link_classes)
172
def reportMostPopularCollections():
  """Print the ten most popular collections.

  Popularity counts each persister ten times as heavily as a reader.
  """
  def popularity(info):
    return len(info.reader_uuids) + 10 * len(info.persister_replication)

  most_popular = sorted(CollectionInfo.all_by_uuid.values(),
                        key=popularity,
                        reverse=True)[:10]
  print('Most popular Collections:')
  for collection_info in most_popular:
    print(collection_info)
182
183
def _indexCollection(collection_uuid, collection_info):
  """Fold one collection's facts into the module-level lookup maps."""
  # Count the block holding the manifest itself in every calculation.
  block_uuids = collection_info.block_uuids.union([collection_uuid,])
  for block_uuid in block_uuids:
    block_to_collections[block_uuid].add(collection_uuid)
    block_to_readers[block_uuid].update(collection_info.reader_uuids)
    block_to_persisters[block_uuid].update(collection_info.persister_uuids)
    block_to_persister_replication[block_uuid].addDict(
      collection_info.persister_replication)
  for reader_uuid in collection_info.reader_uuids:
    reader_to_collections[reader_uuid].add(collection_uuid)
    reader_to_blocks[reader_uuid].update(block_uuids)
  for persister_uuid in collection_info.persister_uuids:
    persister_to_collections[persister_uuid].add(collection_uuid)
    persister_to_blocks[persister_uuid].update(block_uuids)


def buildMaps():
  """Populate the block <-> collection <-> user cross-reference maps."""
  for collection_uuid, collection_info in CollectionInfo.all_by_uuid.items():
    _indexCollection(collection_uuid, collection_info)
200
201
def itemsByValueLength(original):
  """Return original's (key, value) pairs sorted longest-value first."""
  def value_length(item):
    return len(item[1])
  return sorted(original.items(), key=value_length, reverse=True)
206
207
def reportBusiestUsers():
  """Print users ordered by how many collections they read / persist."""
  busiest_readers = itemsByValueLength(reader_to_collections)
  print('The busiest readers are:')
  for reader, collections in busiest_readers:
    print('%s reading %d collections.' % (reader, len(collections)))
  busiest_persisters = itemsByValueLength(persister_to_collections)
  print('The busiest persisters are:')
  for persister, collections in busiest_persisters:
    # This said 'reading' before - a copy/paste slip; these users
    # persist collections.
    print('%s persisting %d collections.' % (persister, len(collections)))
217
218
def blockDiskUsage(block_uuid):
  """Returns the disk usage of a block given its uuid.

  Will return 0 before reading the contents of the keep servers.
  """
  replication = block_to_replication[block_uuid]
  return byteSizeFromValidUuid(block_uuid) * replication
225
def blockPersistedUsage(user_uuid, block_uuid):
  """Bytes this user persists for this block: size times desired replication."""
  desired_replication = block_to_persister_replication[block_uuid].get(
      user_uuid, 0)
  return byteSizeFromValidUuid(block_uuid) * desired_replication
229
# Memoization cache for computeWeightedReplicationCosts, keyed by the
# string form of the sorted (level, count) pairs.
memo_computeWeightedReplicationCosts = {}
def computeWeightedReplicationCosts(replication_levels):
  """Computes the relative cost of varied replication levels.

  replication_levels: a tuple of integers representing the desired
  replication level. If n users want a replication level of x then x
  should appear n times in replication_levels.

  Returns a dictionary from replication level to cost.

  The basic thinking is that the cost of replicating at level x should
  be shared by everyone who wants replication of level x or higher.

  For example, if we have two users who want 1 copy, one user who
  wants 3 copies and two users who want 6 copies:
  the input would be [1, 1, 3, 6, 6] (or any permutation)

  The cost of the first copy is shared by all 5 users, so they each
  pay 1 copy / 5 users = 0.2.
  The cost of the second and third copies shared by 3 users, so they
  each pay 2 copies / 3 users = 0.67 (plus the above costs)
  The cost of the fourth, fifth and sixth copies is shared by two
  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)

  Here are some other examples:
  computeWeightedReplicationCosts([1,]) -> {1:1.0}
  computeWeightedReplicationCosts([2,]) -> {2:2.0}
  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
  """
  replication_level_counts = sorted(Counter(replication_levels).items())

  memo_key = str(replication_level_counts)

  if memo_key not in memo_computeWeightedReplicationCosts:
    last_level = 0
    current_cost = 0
    # Users who still want at least the current level; float so the
    # per-copy division below doesn't truncate.
    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
    cost_for_level = {}
    for replication_level, count in replication_level_counts:
      copies_added = replication_level - last_level
      # compute marginal cost from last level and add it to the last cost
      current_cost += copies_added / total_interested
      cost_for_level[replication_level] = current_cost
      # update invariants
      last_level = replication_level
      total_interested -= count
    memo_computeWeightedReplicationCosts[memo_key] = cost_for_level

  return memo_computeWeightedReplicationCosts[memo_key]
283
def blockPersistedWeightedUsage(user_uuid, block_uuid):
  """This user's cost-shared byte usage for one persisted block."""
  persister_replication_for_block = block_to_persister_replication[block_uuid]
  user_replication = persister_replication_for_block[user_uuid]
  cost_per_level = computeWeightedReplicationCosts(
      persister_replication_for_block.values())
  return byteSizeFromValidUuid(block_uuid) * cost_per_level[user_replication]
291
292
def computeUserStorageUsage():
  """Fill user_to_usage with read/persist sizes, raw and weighted."""
  for user, blocks in reader_to_blocks.items():
    usage = user_to_usage[user]
    usage[UNWEIGHTED_READ_SIZE_COL] = sum(
        byteSizeFromValidUuid(block_uuid) for block_uuid in blocks)
    # Weighted read: each block's size is split evenly among its readers.
    usage[WEIGHTED_READ_SIZE_COL] = sum(
        float(byteSizeFromValidUuid(block_uuid)) /
        len(block_to_readers[block_uuid])
        for block_uuid in blocks)
  for user, blocks in persister_to_blocks.items():
    usage = user_to_usage[user]
    usage[UNWEIGHTED_PERSIST_SIZE_COL] = sum(
        blockPersistedUsage(user, block_uuid) for block_uuid in blocks)
    usage[WEIGHTED_PERSIST_SIZE_COL] = sum(
        blockPersistedWeightedUsage(user, block_uuid) for block_uuid in blocks)
309
def printUserStorageUsage():
  """Print one line per user with the four formatted usage columns."""
  print ('user: unweighted readable block size, weighted readable block size, '
         'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    formatted = [fileSizeFormat(usage[column])
                 for column in (UNWEIGHTED_READ_SIZE_COL,
                                WEIGHTED_READ_SIZE_COL,
                                UNWEIGHTED_PERSIST_SIZE_COL,
                                WEIGHTED_PERSIST_SIZE_COL)]
    print('%s: %s %s %s %s' % tuple([user] + formatted))
320
def logUserStorageUsage():
  """Record each user's storage usage as an Arvados log entry."""
  for user, usage in user_to_usage.items():
    info = {
      'read_collections_total_bytes': usage[UNWEIGHTED_READ_SIZE_COL],
      'read_collections_weighted_bytes': usage[WEIGHTED_READ_SIZE_COL],
      'persisted_collections_total_bytes': usage[UNWEIGHTED_PERSIST_SIZE_COL],
      'persisted_collections_weighted_bytes': usage[WEIGHTED_PERSIST_SIZE_COL],
    }
    # user could actually represent a user or a group. We don't set
    # the object_type field since we don't know which we have.
    body = {
      'object_uuid': user,
      'event_type': args.user_storage_log_event_type,
      'properties': info,
    }
    # TODO(misha): Confirm that this will throw an exception if it
    # fails to create the log entry.
    arv.logs().create(body=body).execute()
339
def getKeepServers():
  """Return a [host, port] pair for every known keep disk."""
  response = arv.keep_disks().list().execute()
  servers = []
  for keep_server in response['items']:
    servers.append([keep_server['service_host'],
                    keep_server['service_port']])
  return servers
344
345
def getKeepBlocks(keep_servers):
  """Fetch the block index from each keep server.

  Returns one entry per server (in keep_servers order): a list of the
  space-split tokens of each non-blank /index response line (the block
  locator is the first token).
  """
  blocks = []
  for host, port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    try:
      blocks.append([line.split(' ')
                     for line in response.read().split('\n')
                     if line])
    finally:
      # urlopen handles aren't context managers in python 2; close
      # explicitly so we don't leak a socket per server.
      response.close()
  return blocks
354
355
def computeReplication(keep_blocks):
  """Tally block replication: each server listing a block adds one copy."""
  for server_blocks in keep_blocks:
    for block_uuid, _ in server_blocks:
      block_to_replication[block_uuid] += 1
  observed_levels = set(block_to_replication.values())
  log.debug('Seeing the following replication levels among blocks: %s',
            str(observed_levels))
362
def detectReplicationProblems():
  """Flag blocks that are unreferenced, under- or over-replicated.

  Results accumulate in the module-level report sets.
  """
  known_blocks = set(block_to_replication.keys())
  blocks_not_in_any_collections.update(
    known_blocks.difference(block_to_collections.keys()))

  def persistedMismatches(compare):
    # Blocks with at least one persister whose actual replication
    # compares true against the highest requested level.
    return [uuid
            for uuid, persister_replication
            in block_to_persister_replication.items()
            if persister_replication and
            compare(block_to_replication[uuid],
                    max(persister_replication.values()))]

  underreplicated_persisted_blocks.update(
    persistedMismatches(lambda actual, wanted: actual < wanted))
  overreplicated_persisted_blocks.update(
    persistedMismatches(lambda actual, wanted: actual > wanted))

  log.info('Found %d blocks not in any collections, e.g. %s...',
           len(blocks_not_in_any_collections),
           ','.join(list(blocks_not_in_any_collections)[:5]))
  log.info('Found %d underreplicated blocks, e.g. %s...',
           len(underreplicated_persisted_blocks),
           ','.join(list(underreplicated_persisted_blocks)[:5]))
  log.info('Found %d overreplicated blocks, e.g. %s...',
           len(overreplicated_persisted_blocks),
           ','.join(list(overreplicated_persisted_blocks)[:5]))

  # TODO:
  #  Read blocks sorted by mtime
  #  Cache window vs % free space
  #  Collections which candidates will appear in
  #  Youngest underreplicated read blocks that appear in collections.
  #  Report Collections that have blocks which are missing from (or
  #   underreplicated in) keep.
394
395
# This is the main flow here

parser = argparse.ArgumentParser(description='Report on keep disks.')
"""The command line argument parser we use.

We only use it in the __main__ block, but leave it outside the block
in case another package wants to use it or customize it by specifying
it as a parent to their commandline parser.
"""
parser.add_argument('-m',
                    '--max-api-results',
                    type=int,
                    default=5000,
                    help=('The max results to get at once.'))
parser.add_argument('-p',
                    '--port',
                    type=int,
                    default=9090,
                    help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
                    '--verbose',
                    help='increase output verbosity',
                    action='store_true')
parser.add_argument('-u',
                    '--uuid',
                    help='uuid of specific collection to process')
# --require-admin-user / --no-require-admin-user form an on/off pair
# sharing the require_admin_user destination; on by default.
parser.add_argument('--require-admin-user',
                    action='store_true',
                    default=True,
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help=('Allow users without admin permissions with '
                          'only a warning.'))
# --log-to-workbench / --no-log-to-workbench likewise pair up; off by
# default. See logUserStorageUsage().
parser.add_argument('--log-to-workbench',
                    action='store_true',
                    default=False,
                    help='Log findings to workbench')
parser.add_argument('--no-log-to-workbench',
                    dest='log_to_workbench',
                    action='store_false',
                    help='Don\'t log findings to workbench [default]')
parser.add_argument('--user-storage-log-event-type',
                    default='user-storage-report',
                    help=('The event type to set when logging user '
                          'storage usage to workbench.'))
443
# Populated from parser.parse_args() in the __main__ block; module-level
# so the helper functions above can read the parsed options.
args = None

# TODO(misha): Think about moving some of this to the __main__ block.
log = logging.getLogger('arvados.services.datamanager')
stderr_handler = logging.StreamHandler()
log.setLevel(logging.INFO)
stderr_handler.setFormatter(
  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
log.addHandler(stderr_handler)
453
# Global Data - don't try this at home
collection_uuids = []

# These maps all map from uuids to a set of uuids
block_to_collections = defaultdict(set)  # keep blocks
reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
block_to_readers = defaultdict(set)  # users who can read a collection containing the block
block_to_persisters = defaultdict(set)  # users persisting a collection containing the block
# block uuid -> maxdict of (persister uuid -> desired replication level)
block_to_persister_replication = defaultdict(maxdict)
reader_to_blocks = defaultdict(set)
persister_to_blocks = defaultdict(set)

# Column indices into each user_to_usage row.
UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
NUM_COLS = 4
# user (or group) uuid -> list of the four usage columns above
user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)

keep_servers = []  # [host, port] pairs, filled in by loadAllData()
keep_blocks = []  # per-server index rows, filled in by loadAllData()
# block uuid -> number of keep servers reporting a copy
block_to_replication = defaultdict(lambda: 0)

# Stuff to report on
blocks_not_in_any_collections = set()
underreplicated_persisted_blocks = set()
overreplicated_persisted_blocks = set()

# Set once loadAllData() finishes; the HTTP handler returns 503 until then.
all_data_loaded = False
484
def loadAllData():
  """Run the whole pipeline: read collections, links and keep indexes,
  then compute and report usage and replication problems."""
  checkUserIsAdmin()

  log.info('Building Collection List')
  global collection_uuids
  collection_uuids = filter(None, [extractUuid(candidate)
                                   for candidate in buildCollectionsList()])

  log.info('Reading Collections')
  readCollections(collection_uuids)

  if args.verbose:
    pprint.pprint(CollectionInfo.all_by_uuid)

  log.info('Reading Links')
  readLinks()

  reportMostPopularCollections()

  log.info('Building Maps')
  buildMaps()

  reportBusiestUsers()

  log.info('Getting Keep Servers')
  global keep_servers
  keep_servers = getKeepServers()

  print(keep_servers)

  log.info('Getting Blocks from each Keep Server.')
  global keep_blocks
  keep_blocks = getKeepBlocks(keep_servers)

  computeReplication(keep_blocks)

  # Guard the average so an empty index (no servers, or no blocks
  # reported) doesn't raise ZeroDivisionError.
  if block_to_replication:
    log.info('average replication level is %f',
             (float(sum(block_to_replication.values())) /
              len(block_to_replication)))
  else:
    log.info('no blocks reported by any keep server')

  detectReplicationProblems()

  computeUserStorageUsage()
  printUserStorageUsage()
  if args.log_to_workbench:
    logUserStorageUsage()

  global all_data_loaded
  all_data_loaded = True
534
535
class DataManagerHandler(BaseHTTPRequestHandler):
  """Serves the computed usage/replication data as simple HTML pages.

  URL layout: '/' summarizes every user's usage; '/user/<uuid>' and
  '/collection/<uuid>' drill into one user or collection.
  """
  USER_PATH = 'user'
  COLLECTION_PATH = 'collection'
  BLOCK_PATH = 'block'

  # Header row shared by the usage tables on the home and user pages.
  USAGE_TABLE_HEADER = ('<TR><TH>user'
                        '<TH>unweighted readable block size'
                        '<TH>weighted readable block size'
                        '<TH>unweighted persisted block size'
                        '<TH>weighted persisted block size</TR>\n')

  def userLink(self, uuid):
    """Return an HTML link to the user page for uuid."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.USER_PATH})

  def collectionLink(self, uuid):
    """Return an HTML link to the collection page for uuid."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.COLLECTION_PATH})

  def blockLink(self, uuid):
    """Return an HTML link to the block page for uuid."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.BLOCK_PATH})

  def writeTop(self, title):
    """Write the opening HTML boilerplate."""
    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)

  def writeBottom(self):
    """Write the closing HTML boilerplate."""
    self.wfile.write('</BODY></HTML>\n')

  def writeUsageRow(self, link, usage):
    """Write one usage-table row: a link cell plus the four size columns."""
    self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                     (link,
                      fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                      fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                      fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                      fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))

  def writeReplicationTable(self, replication_to_entries, formatEntry):
    """Write a table with one column per replication level.

    replication_to_entries maps a replication level to a collection of
    entries; formatEntry renders each entry as HTML. (Extracted per the
    old in-line TODO: this table was written twice.)
    """
    replication_levels = sorted(replication_to_entries.keys())
    self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                     '<TH>'.join(['Replication Level ' + str(x)
                                  for x in replication_levels]))
    self.wfile.write('<TR>\n')
    for replication_level in replication_levels:
      entries = replication_to_entries[replication_level]
      self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
          map(formatEntry, entries)))
    self.wfile.write('</TR></TABLE>\n')

  def writeHomePage(self):
    """Write the summary table of every user's usage."""
    self.send_response(200)
    self.end_headers()
    self.writeTop('Home')
    self.wfile.write('<TABLE>')
    self.wfile.write(DataManagerHandler.USAGE_TABLE_HEADER)
    for user, usage in user_to_usage.items():
      self.writeUsageRow(self.userLink(user), usage)
    self.wfile.write('</TABLE>\n')
    self.writeBottom()

  def userExists(self, uuid):
    # Currently this will return false for a user who exists but
    # doesn't appear on any manifests.
    # TODO(misha): Figure out if we need to fix this.
    return uuid in user_to_usage

  def writeUserPage(self, uuid):
    """Write one user's usage table and collection lists (404 if unknown)."""
    if not self.userExists(uuid):
      self.send_error(404,
                      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      # Here we assume that since a user exists, they don't need to be
      # html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('User %s' % uuid)
      self.wfile.write('<TABLE>')
      self.wfile.write(DataManagerHandler.USAGE_TABLE_HEADER)
      self.writeUsageRow(self.userLink(uuid), user_to_usage[uuid])
      self.wfile.write('</TABLE>\n')
      self.wfile.write('<P>Persisting Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     persister_to_collections[uuid])))
      self.wfile.write('<P>Reading Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     reader_to_collections[uuid])))
      self.writeBottom()

  def collectionExists(self, uuid):
    return uuid in CollectionInfo.all_by_uuid

  def writeCollectionPage(self, uuid):
    """Write one collection's size, readers, persisters and block
    replication tables (404 if unknown)."""
    if not self.collectionExists(uuid):
      self.send_error(404,
                      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      collection = CollectionInfo.get(uuid)
      # Here we assume that since a collection exists, its id doesn't
      # need to be html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('Collection %s' % uuid)
      self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
      self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
                       fileSizeFormat(collection.byteSize()))
      self.wfile.write('<P>Readers: %s\n' %
                       ', '.join(map(self.userLink, collection.reader_uuids)))

      if len(collection.persister_replication) == 0:
        self.wfile.write('<P>No persisters\n')
      else:
        replication_to_users = defaultdict(set)
        for user, replication in collection.persister_replication.items():
          replication_to_users[replication].add(user)
        replication_levels = sorted(replication_to_users.keys())

        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
                         'out at %dx replication:\n' %
                         (len(collection.persister_replication),
                          len(replication_levels),
                          replication_levels[-1]))
        self.writeReplicationTable(replication_to_users, self.userLink)

      replication_to_blocks = defaultdict(set)
      for block in collection.block_uuids:
        replication_to_blocks[block_to_replication[block]].add(block)
      self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                       (len(collection.block_uuids),
                        len(replication_to_blocks)))
      self.writeReplicationTable(replication_to_blocks,
                                 lambda block: block)

  def do_GET(self):
    """Route GET requests; 503 until the background data load finishes."""
    if not all_data_loaded:
      self.send_error(503,
                      'Sorry, but I am still loading all the data I need.')
    else:
      # Removing leading '/' and process request path
      split_path = self.path[1:].split('/')
      request_type = split_path[0]
      log.debug('path (%s) split as %s with request_type %s' % (self.path,
                                                                split_path,
                                                                request_type))
      if request_type == '':
        self.writeHomePage()
      elif request_type == DataManagerHandler.USER_PATH:
        self.writeUserPage(split_path[1])
      elif request_type == DataManagerHandler.COLLECTION_PATH:
        self.writeCollectionPage(split_path[1])
      else:
        self.send_error(404, 'Unrecognized request path.')
    return
701
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """HTTPServer that handles each request in a separate thread
  (via ThreadingMixIn)."""
704
705
if __name__ == '__main__':
  args = parser.parse_args()

  if args.port == 0:
    # No web server requested: load and report once, in the foreground.
    loadAllData()
  else:
    # Load in the background so the server can come up immediately;
    # do_GET answers 503 until all_data_loaded is set.
    loader = threading.Thread(target = loadAllData, name = 'loader')
    loader.start()

    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
    server.serve_forever()