""" scraperlib Useful base classes and utilities for HTML page scrapers. """ import sys, time, re, shelve, popen2, calendar, md5 from urllib import quote from urllib2 import urlopen from urlparse import urljoin, urlparse from xml.sax.saxutils import escape from HTMLParser import HTMLParser, HTMLParseError UNICODE_ENC = "UTF-8" class FeedEntryDict: """ This class is a wrapper around HTMLMetaDoc objects meant to facilitate easy use in XML template strings. """ UNICODE_ENC = "UTF-8" DATE_KEYS = [ 'modified', 'issued' ] def __init__(self, init_dict={}, date_fmt='%Y-%m-%dT%H:%M:%SZ'): """ Initialize the feed entry dict, with optional data. """ self.data = {} self.data.update(init_dict) self.date_fmt = date_fmt def __cmp__(self, other): """Reverse chronological order on modified date""" return cmp(other.data['modified'], self.data['modified']) def __setitem__(self, name, val): """Set a value in the feed entry dict.""" self.data[name] = val def __getitem__(self, name): """Return a dict item, escaped and encoded for XML inclusion""" # Chop off the entry. prefix, if found. if name.startswith('entry.'): name = name[6:] # If this key is a date, format accordingly. if name in self.DATE_KEYS: date = self.data.get(name, time.time()) val = time.strftime(self.date_fmt, time.gmtime(date)) # Otherwise, try returning what was asked for. else: val = self.data.get(name, '') # Make sure the value is finally safe for inclusion in XML if type(val) is unicode: val = val.encode(self.UNICODE_ENC) return escape(val.strip()) def id(self): """Come up with a state DB ID for this entry.""" # Try to use entry GUID, first. id = self['id'] # If no GUID available, hash entry contents. if not len(id) > 0: m = md5.md5() for v in self.data.values(): if type(v) is unicode: v = v.encode(self.UNICODE_ENC) m.update('%s' % v) id = m.hexdigest() return id class _ScraperFinishedException(Exception): """ Private exception, raised when the scraper has seen all it's interested in parsing. """ pass ATOM_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ" ATOM_FEED_TMPL = """ %(feed.title)s %(feed.tagline)s %(feed.modified)s %(feed.author.name)s %(feed.author.email)s %(feed.author.url)s %(feed.entries)s """ ATOM_ENTRY_TMPL = """ %(entry.title)s %(entry.issued)s %(entry.modified)s %(entry.id)s %(entry.summary)s """ RSS_DATE_FMT = "%a, %d %b %Y %H:%M:%S %z" RSS_FEED_TMPL = """ %(feed.title)s %(feed.link)s %(feed.tagline)s %(feed.author.email)s %(feed.entries)s """ RSS_ENTRY_TMPL = """ %(entry.title)s %(entry.link)s %(entry.modified)s %(entry.id)s %(entry.summary)s """ class Scraper: """ class containing a few methods universal to scrapers. """ UNICODE_ENC = "UTF-8" FEED_META = { 'feed.title' : 'A Sample Feed', 'feed.link' : 'http://www.example.com', 'feed.tagline' : 'This is a testing sample feed.', 'feed.author.name' : 'l.m.orchard', 'feed.author.email' : 'l.m.orchard@pobox.com', 'feed.author.url' : 'http://www.decafbad.com', 'feed.modified' : '' } SORT_ENTRIES = True MAX_ENTRIES = 15 BASE_HREF = "" SCRAPE_URL = "" ATOM_DATE_FMT = ATOM_DATE_FMT ATOM_FEED_TMPL = ATOM_FEED_TMPL ATOM_ENTRY_TMPL = ATOM_ENTRY_TMPL RSS_DATE_FMT = RSS_DATE_FMT RSS_FEED_TMPL = RSS_FEED_TMPL RSS_ENTRY_TMPL = RSS_ENTRY_TMPL def scrape_atom(self): """Scrape the page and return an Atom feed.""" self.FEED_META['feed.modified'] = \ time.strftime(self.ATOM_DATE_FMT, time.gmtime(time.time())) return self.scrape(self.ATOM_ENTRY_TMPL, self.ATOM_FEED_TMPL, self.ATOM_DATE_FMT) def scrape_rss(self): """Scrape the page and return an RSS feed.""" return self.scrape(self.RSS_ENTRY_TMPL, self.RSS_FEED_TMPL, self.RSS_DATE_FMT) def scrape(self, entry_tmpl, feed_tmpl, date_fmt): """ Given an entry and feed string templates, scrape an HTML page for content and use the templates to return a feed. """ self.date_fmt = date_fmt self.state_db = shelve.open(self.STATE_FN) # Scrape the source data for FeedEntryDict instances entries = self.produce_entries() # Make a polishing-up run through the extracted entries. for e in entries: # Come up with ID for state db state_id = e.id() # Make sure the entry link is absolute if e.data.has_key('link'): e['link'] = urljoin(self.BASE_HREF, e['link']) # Try to get state for this ID, creating a new record # if needed. if not self.state_db.has_key(state_id): self.state_db[state_id] = {} entry_state = self.state_db[state_id] # Manage remembered values for datestamps when entry data # first found, unless dates were extracted. if e.data.get('modified','') == '': if not entry_state.has_key('modified'): entry_state['modified'] = time.time() e.data['modified'] = entry_state['modified'] if e.data.get('issued','') == '': e.data['issued'] = e.data['modified'] for n in ('issued', 'modified'): if e.data.get(n, '') != '': continue # Construct a canonical tag URI for the entry if none set if not len(e.data.get('id', '')) > 0: (scheme, addr, path, params, query, frag) = \ urlparse(e['link']) now = e.data.has_key('modified') and time.gmtime(e.data['modified']) or time.gmtime() ymd = time.strftime("%Y-%m-%d", now) e['id'] = "tag:%s,%s:%s" % (addr, ymd, quote(path,'')) # Update the state database record self.state_db[state_id] = entry_state # Close the state database self.state_db.close() # Sort the entries, now that they all should have dates. if self.SORT_ENTRIES: entries.sort() # Build the entries from template, and populate the feed data entries_out = [entry_tmpl % e for e in entries[:self.MAX_ENTRIES]] feed = { 'feed.entries' : "\n".join(entries_out) } # Add all the feed metadata into the feed, ensuring # Unicode encoding happens. for k, v in self.FEED_META.items(): if type(v) is unicode: v = v.encode(self.UNICODE_ENC) feed[k] = v # Return the built feed return feed_tmpl % feed class RegexScraper(Scraper): """ Base class for regex-based feed scrapers. """ # Default regex extracts all hyperlinks. ENTRY_RE = """(?P(?P.*?)</a>)""" def __init__(self): """Initialize the scraper, compile the regex""" self.entry_re = re.compile(self.ENTRY_RE, re.DOTALL | re.MULTILINE | re.IGNORECASE) def produce_entries(self): """Use regex to extract entries from source""" # Fetch the source for scraping. src = urlopen(self.SCRAPE_URL).read() # Iterate through all the matches of the regex found. entries, pos = [], 0 while True: # Find the latest match, stop if none found. m = self.entry_re.search(src, pos) if not m: break # Advance the search position to end of previous search. pos = m.end() # Create and append the FeedEntryDict for this extraction. entries.append(FeedEntryDict(m.groupdict(), self.date_fmt)) return entries from tidylib import tidy_string from Ft.Xml.Domlette import NonvalidatingReader class XPathScraper(Scraper): """ Base class for XPath-based feed scrapers. """ NSS = { 'xhtml':'http://www.w3.org/1999/xhtml' } # Default xpaths extract all hyperlinks. ENTRIES_XPATH = "//xhtml:a" ENTRY_XPATHS = { 'title' : './text()', 'link' : './@href', 'summary' : './text()' } def produce_entries(self): """Use xpaths to extract feed entries and entry attributes.""" # Fetch the HTML source, tidy it up, parse it. src = urlopen(self.SCRAPE_URL).read() tidy_src = tidy_string(src) doc = NonvalidatingReader.parseString(tidy_src, self.SCRAPE_URL) entries = [] # Iterate through the parts identified as feed entry nodes. for entry_node in doc.xpath(self.ENTRIES_XPATH, self.NSS): # For each entry attribute path, attempt to extract the value data = {} for k,v in self.ENTRY_XPATHS.items(): nodes = entry_node.xpath(v, self.NSS) vals = [x.nodeValue for x in nodes if x.nodeValue] data[k] = " ".join(vals) # Create and append the FeedEntryDict for this extraction entries.append(FeedEntryDict(data, self.date_fmt)) return entries class HTMLScraper(HTMLParser, Scraper): """ Base class for HTMLParser-based feed scrapers. """ CHUNKSIZE = 1024 def produce_entries(self): fin = urlopen(self.SCRAPE_URL) return self.parse_file(fin) def reset(self): """Initialize the parser state.""" HTMLParser.reset(self) self.feed_entries = [] self.in_feed = False self.in_entry = False self.curr_data = '' def start_feed(self): """Handle start of feed scraping""" self.in_feed = True def end_feed(self): """Handle end of all useful feed scraping.""" raise _ScraperFinishedException() def start_feed_entry(self): """Handle start of feed entry scraping""" self.curr_entry = FeedEntryDict({}, self.date_fmt) self.in_entry = True def end_feed_entry(self): """Handle the detected end of a feed entry scraped""" self.feed_entries.append(self.curr_entry) self.in_entry = False def handle_data(self, data): self.curr_data += data def handle_entityref(self, data): self.curr_data += '&' + data + ';' handle_charref = handle_entityref def decode_entities(self, data): data = data.replace('<', '<') data = data.replace('>', '>') data = data.replace('"', '"') data = data.replace(''', "'") data = data.replace('&', '&') return data def parse_file(self, fin): """Parse through the contents of a given file-like object.""" self.reset() while True: try: data = fin.read(self.CHUNKSIZE) if len(data) == 0: break self.feed(data) except _ScraperFinishedException: break return self.feed_entries