"""Feed scanner.

Fetches each active source with feedparser, stores items whose uniq ID has
not been seen before, adjusts the per-source update period, and records a
ScanHistory entry for every scan.
"""
# $Header: /cvsroot/dbagg2/lib/dbagg2/scan.py,v 1.11 2004/05/12 02:17:31 deusx Exp $

# `logging` is imported explicitly rather than relying on one of the star
# imports below to supply it; a star import may still rebind the name.
import logging
import md5
import random
import sys
import time
import traceback

import feedparser
import striphtml
from mx.DateTime import *

from dbagg2.config import *
from dbagg2.model import *

# TODO: Separate out data fetch (anticipate threaded replacement)?  Need to
# get into feedparser guts for that?


def scanSource(source):
    """Fetch one source, store its new items and schedule its next scan.

    A 301 response moves the source to the URL reported by feedparser and a
    410 response deactivates the source; in both cases a ScanHistory entry
    is recorded and the function returns without processing items.

    If processing the items raises, the error is logged and recorded in
    ScanHistory, and the source's etag/modified/schedule fields are left
    untouched.

    Otherwise the source's etag, modified time, lastScan, updatePeriod and
    nextScan are updated and a ScanHistory entry with the item counts is
    recorded.
    """
    log = logging.getLogger(__name__)
    log.info("Scanning: (%s) %s" % (source.id, source.title))

    # Fetch the source using feedparser
    source_data = feedparser.parse(source.url, source.etag,
                                   source.modified.tuple())

    # Get HTTP status, with default.  int() raises TypeError when the status
    # is missing (None) and ValueError when it is not numeric.
    try:
        http_status = int(source_data.get('status'))
    except (TypeError, ValueError):
        log.warning("Unknown HTTP status %s for source %s" %
                    (source_data.get('status', 'None'), source.id))
        http_status = 0

    # Handle permanent redirect & sub change
    if http_status == 301:
        # Record a history entry
        log.warning("Source moved: (#%s) %s" % (source.id, source.title))
        ScanHistory.new(source=source, scanTime=now(),
                        newItems=-1, totalItems=-1,
                        httpStatus=http_status,
                        notes="Moved from %s to %s" % (
                            source.url, source_data['url']))
        # Move the feed
        source.url = source_data['url']
        return

    # Handle Gone & subsequent unsub
    if http_status == 410:
        log.warning("Source gone: (#%s) %s" % (source.id, source.title))
        ScanHistory.new(source=source, scanTime=now(),
                        newItems=-1, totalItems=-1,
                        httpStatus=http_status,
                        notes="Source gone from %s" % (source.url))
        # Deactivate the feed
        source.active = 0
        # TODO: Offer a preference to actually delete source
        #Source.delete(id=source.id)
        return

    # Process the source's items
    try:
        new_item_count = processItems(source, source_data['items'])
    except Exception:
        exc, e, tb = sys.exc_info()
        log.error("Unexpected error while processing items: %s / %s / %s" %
                  (exc, e, "".join(traceback.format_tb(tb))))
        # Record a history entry noting the exception.  The note carries the
        # exception value; the full traceback is in the log entry above.
        ScanHistory.new(source=source, scanTime=now(),
                        newItems=0, totalItems=0,
                        httpStatus=http_status,
                        notes="Unexpected error while processing items: %s" %
                              (e,))
        return

    # Update scan info from source
    source.etag = source_data.get('etag', None)
    modified = source_data.get('modified', None)
    if modified is not None:
        source.modified = mktime(modified)

    # AIMD update period tweaking:
    # TODO: Per-source min/max and period tweaking constants
    # TODO: Per-source modular period tweaking algorithms (constant, AIMD,
    #       IM-driven)
    source.lastScan = now()
    if new_item_count > 0:
        # Ramp up on new items, remaining above min
        source.updatePeriod = max(min_update_period,
                                  source.updatePeriod * update_ramp_up)
        # Note that new items were found at this time.
        source.lastUpdate = now()
    else:
        # Back off on no new items, remaining under max
        source.updatePeriod = min(max_update_period,
                                  source.updatePeriod + update_back_off)

    # Schedule the next update
    source.nextScan = now() + Time(source.updatePeriod)

    # Record a history entry
    ScanHistory.new(source=source, scanTime=now(),
                    newItems=new_item_count,
                    totalItems=len(source_data['items']),
                    httpStatus=http_status)


def processItems(source, items):
    """Create an Item for every entry in `items` with an unseen uniq ID.

    Each entry is handled independently: an error on one entry is logged
    and the remaining entries are still processed.

    Returns the number of new Item records created.
    """
    log = logging.getLogger(__name__)
    new_item_count = 0

    for source_item in items:
        try:
            # Process only items with unseen uniq IDs
            uniq = makeItemUniq(source_item)
            item_set = Item.select(Item.q.uniq == uniq)
            if item_set.count() == 0:
                new_item = Item.new(
                    source=source,
                    uniq=uniq,
                    created=now(),
                    queued=None,
                    # TODO: Use feed's dates instead of assuming scan time
                    date=now(),
                    link=source_item.get('link', None),
                    title=source_item.get('title', None),
                    description=source_item.get('description', ''))
                # Note that a new item was found.
                new_item_count += 1
                log.debug("\tItem %s: %s" % (new_item.id, new_item.title))
        except Exception:
            exc, e, tb = sys.exc_info()
            log.error('Unexpected exception processing item: %s / %s / %s' %
                      (exc, e, "".join(traceback.format_tb(tb))))

    return new_item_count


def scanAllSources():
    """Scan, in random order, every active source whose next scan is due.

    A source is due when its nextScan time is not later than the current
    time.  An error while scanning one source is logged and does not stop
    the remaining sources from being scanned.
    """
    log = logging.getLogger(__name__)

    # Collect sources due for a scan, shuffle them up
    current_time = now()
    sources_for_scan = [candidate
                        for candidate in Source.select(Source.q.active == 1)
                        if candidate.nextScan <= current_time]
    random.shuffle(sources_for_scan)

    for source in sources_for_scan:
        try:
            scanSource(source)
        except Exception:
            exc, e, tb = sys.exc_info()
            log.error("Unexpected error while scanning source #%s %s: "
                      "%s / %s / %s" %
                      (source.id, source.title, exc, e,
                       "".join(traceback.format_tb(tb))))


if __name__ == '__main__':
    scanAllSources()