In my previous aggregator post, I set up Feed and Post models to capture the core information about these items. We can now store the information from the aggregation process in a persistent location, and this can be used by some external program to view the aggregation of content. Now we just need to get the information from somewhere...
The core aggregation logic is quite simple:
- We'll fetch the feed.
- We'll parse it.
- We'll save various bits of feed information to the database.
- For each entry in the feed, we'll either create or update the information in the database.
There are a few optimisations we can do there for various levels of winnage. For example, a pretty big win is that there's no point parsing the feed if we can be sure the feed hasn't changed. An even bigger win, given the rate of change of individual feeds on the Internet, is that there's no point fetching the full feed if it hasn't changed: using the ETag and If-Modified-Since headers saves us not only unnecessary work but also unnecessary traffic (important in South Africa), and it makes us a good Internet citizen. A small win is that we don't need to update the database if the entry in the feed hasn't changed.
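To make the conditional-fetch idea concrete, here is a minimal sketch of the request headers involved. The helper name conditional_headers is hypothetical and not part of the aggregator. As far as I know, the Universal Feed Parser sends equivalent headers itself when given etag and modified values, so this only illustrates what goes over the wire.

```python
from datetime import datetime, timezone
from email.utils import format_datetime


def conditional_headers(etag=None, last_modified=None):
    """Build the headers for a conditional GET (hypothetical helper).

    last_modified is assumed to be a timezone-aware datetime.
    """
    headers = {}
    if etag:
        # The server compares this against the feed's current ETag.
        headers['If-None-Match'] = etag
    if last_modified:
        # HTTP dates are always expressed in GMT.
        headers['If-Modified-Since'] = format_datetime(
            last_modified.astimezone(timezone.utc), usegmt=True)
    return headers


headers = conditional_headers(
    etag='"abc123"',
    last_modified=datetime(2022, 1, 1, tzinfo=timezone.utc))
```

If the feed is unchanged, a server that supports these headers answers with a 304 status and an empty body, so there is nothing to download or parse.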
The Universal Feed Parser is about the best feed parser out there. It has literally thousands of unit tests to ensure it handles the widely varying brokenness of feeds out there, and it provides a really simple API for handling various syndication formats, without hiding the extra information that more comprehensive syndication formats may provide.
It'll both fetch and parse the feed, so for now it does the first two steps.
The goal is that the aggregator itself will take care of the tedious part of the aggregation process - the bit that does the parsing and standard aggregator logic in terms of when to update feeds and posts and when to exit out early. The actual storage of the posts and feed will differ from project to project, but that core logic is mostly unchanged. I decided to use plain old inheritance for this (Bryn, of course, suggested generic functions...).
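A rough sketch of that inheritance split, under the assumption that storage is reached through a couple of overridable methods. All the names here (BaseAggregator, InMemoryAggregator, find_post, store_post) are made up for illustration and are not the real class shown later.

```python
class BaseAggregator(object):
    """Generic aggregation logic; storage is left to subclasses."""

    def process_entries(self, entries):
        updated = False
        for entry in entries:
            existing = self.find_post(entry['guid'])
            if existing == entry:
                continue  # exit early: nothing changed for this entry
            self.store_post(entry)
            updated = True
        return updated

    def find_post(self, guid):
        raise NotImplementedError

    def store_post(self, entry):
        raise NotImplementedError


class InMemoryAggregator(BaseAggregator):
    """Project-specific part: here the 'database' is just a dict."""

    def __init__(self):
        self.posts = {}

    def find_post(self, guid):
        return self.posts.get(guid)

    def store_post(self, entry):
        self.posts[entry['guid']] = dict(entry)


aggregator = InMemoryAggregator()
entries = [{'guid': 'a', 'title': 'First'}]
first_run = aggregator.process_entries(entries)   # stores the entry
second_run = aggregator.process_entries(entries)  # finds nothing new
```

The base class never touches storage directly, so swapping the dict for a real database only means writing another small subclass.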
I've mostly used a genericised version of the collector parts from FeedJack (a Django-based aggregator) and some ideas from Planet (Planet Venus, to be precise), so thanks to Gustavo Picón, Scott James Remnant, Jeff Waugh, Sam Ruby, and others that contributed to those that I have stolen^Wborrowed from.
First, set up some options on initialisation - mostly setting up logging so that the calling code can get the sort of logging they'd like out of the application:
import logging
import sys
from datetime import datetime

import feedparser


class Aggregator(object):
    def __init__(self, options=None):
        if not options:
            options = {}
        self.options = options
        self.user_agent = options.get('user_agent', 'aggregate2db/0.1')
        self.log = options.get('log', None)
        self.verbose = options.get('verbose', False)
        # Only build a default logger if the caller didn't supply one
        if not self.log:
            self.log = logging.getLogger("Aggregator/%d" % (hash(self),))
            self.log.propagate = False
            if self.verbose:
                self.log.setLevel(logging.DEBUG)
            else:
                self.log.setLevel(logging.WARNING)
            self.log.addHandler(logging.StreamHandler(sys.stderr))
Then, the feed processing method is the (current) entry-point:
    # Process a feed, updating the feed information and processing the
    # entries in the feed.
    #
    # Returns boolean True or False on whether the feed was updated.
    def process_feed(self, feed):
        self.log.info('Processing feed: %s', feed.feed_url)
        self.log.debug('Feed %s last checked at: %s',
                       feed.feed_url, feed.last_checked)
        feed.last_checked = datetime.now()
        feedparser_options = {
            'agent': self.user_agent,
            'etag': feed.etag,
            # A feed that has never been fetched has no last_modified yet
            'modified': (feed.last_modified.timetuple()
                         if feed.last_modified else None),
        }
        parsed_data = feedparser.parse(feed.feed_url, **feedparser_options)
        if 'status' in parsed_data:
            self.log.info('Feed %s status: %s', feed.feed_url, parsed_data.status)
            # Hook for people to handle 301 (permanently moved) or 410
            # (gone) response codes more effectively than just logging
            # about it.
            self.handleStatus(feed, parsed_data.status, parsed_data)
            if parsed_data.status == 301:
                self.log.warning('Feed %s permanently (301) moved to %s',
                                 feed.feed_url, parsed_data.href)
            if parsed_data.status == 302:
                self.log.debug('Feed %s temporarily (302) moved to %s',
                               feed.feed_url, parsed_data.href)
            if parsed_data.status == 304:
                self.log.debug('Feed %s unchanged', feed.feed_url)
                return False
            if parsed_data.status == 410:
                self.log.error('Feed %s has gone away (410)', feed.feed_url)
                return False
            if parsed_data.status >= 400:
                self.log.warning('Feed %s has error status %s',
                                 feed.feed_url, parsed_data.status)
                return False
        # Update etag so that we don't have to download anything if we don't need to
        feed.etag = parsed_data.get('etag', '')
        feed.title = parsed_data.feed.get('title', '')
        feed.description = parsed_data.feed.get('tagline', '')
        feed.link = parsed_data.feed.get('link', '')
        self.log.debug('Feed title: %s', feed.title)
        self.log.debug('Feed description: %s', feed.description)
        self.log.debug('Feed link: %s', feed.link)
        # Get all the existing posts mentioned out of the database -
        # doing this in one go could be quite a bit more efficient than
        # doing it one at a time.
        posts = self.getExistingPostsForEntries(feed, parsed_data.entries)
        # Keep track of whether we've had any updates - that affects
        # whether we should forcibly update last_modified if the feed
        # doesn't provide any last_modified data.
        updated = False
        for entry in parsed_data.entries:
            try:
                entry_updated = self.process_entry(feed, entry, posts, parsed_data)
                updated = updated or entry_updated
            except Exception:
                self.log.warning("Entry %s could not be processed", entry.link)
                # TODO: Hard to debug errors in entry_updated if we just
                # swallow this error - need to have option to reraise
                # when in development
        # Figure out from the data when the feed was last modified,
        # potentially just using the feed last modified time, but maybe
        # checking individual entries in the list, or just making up a
        # time.
        last_modified = self.determineFeedLastModified(feed, parsed_data, updated)
        if last_modified:
            feed.last_modified = last_modified
        feed.save()
        return updated
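The determineFeedLastModified hook isn't shown in this post, so here is one possible sketch of the fallback chain described in the final comment above. It is written as a standalone function over plain dictionaries. The updated_parsed key (a UTC time.struct_time) mirrors what I believe feedparser exposes, but treat the key names and the function itself as assumptions.

```python
import calendar
import time
from datetime import datetime, timezone


def to_datetime(struct):
    """Convert a UTC time.struct_time into an aware datetime."""
    return datetime.fromtimestamp(calendar.timegm(struct), tz=timezone.utc)


def determine_feed_last_modified(parsed_data, updated, now=None):
    # 1. Prefer the feed's own timestamp, if there is one.
    feed_time = parsed_data.get('feed', {}).get('updated_parsed')
    if feed_time:
        return to_datetime(feed_time)
    # 2. Otherwise fall back to the newest entry timestamp.
    entry_times = [entry['updated_parsed']
                   for entry in parsed_data.get('entries', [])
                   if entry.get('updated_parsed')]
    if entry_times:
        return to_datetime(max(entry_times))
    # 3. No dates anywhere: only make up a time if something changed.
    if updated:
        return now or datetime.now(timezone.utc)
    return None


sample = {
    'feed': {},
    'entries': [{'updated_parsed': time.gmtime(0)},
                {'updated_parsed': time.gmtime(86400)}],
}
result = determine_feed_last_modified(sample, updated=True)
```

Returning None when nothing is known fits the "if last_modified:" check in process_feed, which then leaves the stored value alone.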
Then, for each entry found in the feed processing, the process_entry method is called:
    # Process a feed entry, creating or updating the entry information
    #
    # Returns boolean True or False on whether the entry was created/updated.
    def process_entry(self, feed, entry, posts, parsed_data):
        # This maps the entry data into a dictionary with keys the same
        # name as the attributes of a post object.
        postdata = self.get_entry_data(entry, parsed_data, feed)
        self.log.info('Considering entry: %s', postdata['link'])
        self.log.debug('%s - title: %s', postdata['link'],
                       postdata['title'])
        self.log.debug('%s - guid: %s', postdata['link'],
                       postdata['guid'])
        # If the post already exists, look it up from the pre-populated
        # posts using the guid.
        if postdata['guid'] in posts:
            post = posts[postdata['guid']]
            # entryChanged is a hook for potential customisation, as
            # the default method may not always be the most accurate for
            # a particular set of feeds. Generally, will check modified
            # time, if it exists, or compare the actual content.
            if not self.entryChanged(post, postdata, parsed_data):
                self.log.info("Entry %s exists, and is unchanged",
                              postdata['link'])
                return False
            self.log.info("Entry %s exists, but is changed, updating",
                          postdata['link'])
            if not postdata['date_modified']:
                postdata['date_modified'] = post.date_modified
            for k, v in postdata.items():
                setattr(post, k, v)
        else:
            self.log.info("Entry %s is a new entry", postdata['link'])
            # Determine when this post was actually created - which, if
            # it is not explicitly in the entry, requires checking the
            # feed and/or headers or just putting the current time if
            # there's no better indicator
            postdata['date_modified'] = self.determineEntryLastModified(feed,
                postdata, entry, parsed_data)
            post = self.createPost(**postdata)
        post.save()
        # Hook for further processing of the entry - tags, updating
        # search data, &c.
        self.furtherProcessEntry(post, entry, parsed_data, feed, posts)
        # Entry was changed
        return True
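The default entryChanged isn't listed here, so the following is only a guess at the behaviour described in the comment above: compare modification times when both sides have one, otherwise compare content. The function name entry_changed and the compared fields (title and a hypothetical content attribute) are assumptions for the sake of the example.

```python
from datetime import datetime
from types import SimpleNamespace


def entry_changed(post, postdata, fields=('title', 'content')):
    """Return True if the incoming entry differs from the stored post."""
    new_modified = postdata.get('date_modified')
    if new_modified and post.date_modified:
        # Both sides carry a timestamp: trust it.
        return new_modified > post.date_modified
    # No usable timestamp: fall back to comparing the content itself.
    return any(getattr(post, field, None) != postdata.get(field)
               for field in fields)


# A stand-in for a stored Post object.
stored = SimpleNamespace(title='Hello', content='Body',
                         date_modified=datetime(2022, 1, 1))

same = entry_changed(stored, {'title': 'Hello', 'content': 'Body',
                              'date_modified': datetime(2022, 1, 1)})
newer = entry_changed(stored, {'title': 'Hello', 'content': 'Body',
                               'date_modified': datetime(2022, 1, 2)})
edited = entry_changed(stored, {'title': 'Hello', 'content': 'New body',
                                'date_modified': None})
```

A feed that rewrites timestamps on every request would defeat the first check, which is the sort of case where overriding the hook makes sense.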
There are a few calls to other methods on the object. This serves to keep the functions pretty short as well as allowing alternate implementations to be used. The model-creation/fetching methods (createPost and getPostsForGuids) also need to be provided to persist and retrieve objects. These should always be pretty small - here they are for the models from last time:
    def createPost(self, **kw):
        post = Post(**kw)
        post.save()
        return post

    def getPostsForGuids(self, feed, guids):
        for guid in guids:
            posts = Post.select((Post.c.feed_id == feed.feed_id) &
                                (Post.c.guid == guid))
            if not len(posts):
                continue
            yield posts[0]
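process_feed calls getExistingPostsForEntries, and process_entry then looks posts up by guid, so presumably that method turns the getPostsForGuids results into a dictionary keyed by guid. A minimal sketch of that glue, written as a standalone function: the name existing_posts_for_entries and the way an entry's guid is derived (its id, falling back to its link) are assumptions.

```python
from types import SimpleNamespace


def existing_posts_for_entries(get_posts_for_guids, feed, entries):
    """Return a {guid: post} dict for the entries that are already stored."""
    # Assumption: an entry's guid is its 'id', falling back to its 'link'.
    guids = [entry.get('id') or entry.get('link') for entry in entries]
    guids = [guid for guid in guids if guid]
    return {post.guid: post for post in get_posts_for_guids(feed, guids)}


# A fake lookup standing in for the database-backed getPostsForGuids.
stored = {'guid-1': SimpleNamespace(guid='guid-1', title='Old post')}


def fake_get_posts_for_guids(feed, guids):
    for guid in guids:
        if guid in stored:
            yield stored[guid]


posts = existing_posts_for_entries(
    fake_get_posts_for_guids, None,
    [{'id': 'guid-1'}, {'link': 'http://example.com/new'}])
```

Only entries that already exist end up in the dictionary, which is what lets process_entry decide between updating and creating with a simple membership test.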
In my next post, I'll have a download of the code in a usable form (still trying to figure out more elegant ways to ...), and explore adding some extra features like tags and searching using furtherProcessEntry.
Feedparser is indeed quite neat, but I do have a problem or two with it. It would be really nice if it were a bit more lazy about parsing the feeds. Instead of just parsing the entire feed and returning a UserDict, it'd be nice if it would return a generator for at least entries. That way it could incrementally parse the item entries in a post, so that you only need to handle and allocate memory for entries that you don't already have in your database. This is especially nice for when you have only a fixed (small) amount of memory you can use.
Why do you opt for options=None in the __init__ signature of the Aggregator class? Isn't this a great time to use **kwargs?
Would save you some lines of code and look a bit more elegant (at least to my eyes).
Thanks for the comments, Alexander. You're quite right on the options thing - I'm not really sure how I managed to miss using kwargs...
Yeah, everyone makes silly mistakes, that's why we should all share our code so we can fix them instead of hiding them behind shrink wrap :)
I notice you're doing a join with sqlalchemy by giving the join parameters (Post.c.feed_id == Feed.c.feed_id). You should check out one of two things:
join(Klass1.table, Klass2.table).select() or select(and_(Klass1.join_to('backref_to_klass2'), Klass1.c.name == 'fred'))
The latter is really useful if you have a has_and_belongs_to_many() relationship since you don't have to know the names for the intermediary table that is used to do that relationship.
I'm just spamming you all night tonight :/
Anyway, really excited to see what comes of this project; I've recently worked on an aggregator myself (with a very different purpose though) so would like to see a different take on it.
Thanks again Alexander. In this case I'm just using the value in feed.feed_id, but I still learnt something from what you said about join_to. I can see a few places now where I can use it to make my life a lot simpler...
Another interesting article - thanks a lot. But since you're on the topic of RSS feeds, did you know that your feed doesn't look right in Google Reader? For whatever reason, it displays the HTML source instead of the rendered output. Perhaps the warnings from http://feedvalidator.org might give a clue?
I'm only pointing this out because I enjoy the blog. Keep up the good work :-)
Simon, I've improved the Atom feed, and it now passes the Feed Validator [feedvalidator.org]. Thanks for the notice and comment.
...and it looks great in Google Reader too. Thanks a lot!