In my previous aggregator post, I set up Feed and Post models to capture the core information about these items.  We can now store the information from the aggregation process in a persistent location, and this can be used by some external program to view the aggregation of content.  Now we just need to get the information from somewhere...

The core aggregation logic is quite simple:

  1. We'll fetch the feed.
  2. We'll parse it.
  3. We'll save various bits of feed information to the database.
  4. For each entry in the feed, either create or update the information in the database.

There are a few optimisations we can do there for various levels of winnage.  For example, a pretty big win is that there's no point parsing the feed if we can be sure the feed hasn't changed.  An even bigger win, given the rate of change of individual feeds on the Internet, is that there's no point fetching the full feed if it hasn't changed - using ETag and If-Modified-Since saves us not only unnecessary work but also unnecessary traffic (important in South Africa), and makes us a good Internet citizen.  A small win is that we don't need to update the database if the entry in the feed hasn't changed.
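
As a rough illustration of what that conditional fetch amounts to at the HTTP level, here's a small standard-library sketch.  The function names (conditional_headers, feed_unchanged) are made up for this example and aren't part of the aggregator; the feed parser does the equivalent for us when given an etag and modified time.

```python
from datetime import datetime, timezone
from email.utils import format_datetime


def conditional_headers(etag=None, last_modified=None):
    """Build the request headers for a conditional GET.

    `etag` is the ETag string from the previous response and
    `last_modified` a timezone-aware UTC datetime; either may be None.
    """
    headers = {}
    if etag:
        headers['If-None-Match'] = etag
    if last_modified:
        # HTTP dates are always expressed in GMT.
        headers['If-Modified-Since'] = format_datetime(last_modified, usegmt=True)
    return headers


def feed_unchanged(status):
    """A 304 (Not Modified) response has no body, so there is nothing to parse."""
    return status == 304


headers = conditional_headers(
    etag='"abc123"',
    last_modified=datetime(2007, 3, 1, 12, 0, 0, tzinfo=timezone.utc),
)
```

If the server answers with a 304, we skip the download, the parse and the database work in one go.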


The Universal Feed Parser is about the best feed parser out there.  It has literally thousands of unit tests to ensure it handles the widely varying brokenness of feeds out there, and it provides a really simple API for handling various syndication formats, without hiding the extra information that more comprehensive syndication formats may provide.

It'll both fetch and parse the feed, so for now it does the first two steps.

The goal is that the aggregator itself will take care of the tedious part of the aggregation process - the bit that does the parsing and standard aggregator logic in terms of when to update feeds and posts and when to exit out early.  The actual storage of the posts and feed will differ from project to project, but that core logic is mostly unchanged.  I decided to use plain old inheritance for this (Bryn, of course, suggested generic functions...).
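
To make the inheritance idea concrete, here's a toy sketch of the pattern: shared logic in a base class, with a hook that a project-specific subclass overrides.  BaseAggregator, RedirectAwareAggregator, FakeFeed and the `active` attribute are stand-ins invented for this example, and the hook takes a plain href rather than the full parsed data to keep it short.

```python
class BaseAggregator(object):
    """Toy stand-in for the aggregator: shared logic plus one hook."""

    def process_status(self, feed, status, href):
        # The shared logic calls the hook, then decides whether to carry on.
        self.handleStatus(feed, status, href)
        return status < 400 and status != 304

    def handleStatus(self, feed, status, href):
        # Default hook does nothing; subclasses override it.
        pass


class RedirectAwareAggregator(BaseAggregator):
    """Project-specific subclass: follow permanent redirects, retire dead feeds."""

    def handleStatus(self, feed, status, href):
        if status == 301:
            feed.feed_url = href
        elif status == 410:
            feed.active = False


class FakeFeed(object):
    """Minimal feed object standing in for a real model."""

    def __init__(self, feed_url):
        self.feed_url = feed_url
        self.active = True


feed = FakeFeed('http://example.com/old.xml')
aggregator = RedirectAwareAggregator()
should_continue = aggregator.process_status(
    feed, 301, 'http://example.com/new.xml')
```

The base class never needs to know how a project wants to react to a moved or dead feed; it just calls the hook at the right moment.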

I've mostly used a genericised version of the collector parts from FeedJack (a Django-based aggregator) and some ideas from Planet (Planet Venus, to be precise), so thanks to Gustavo Picón, Scott James Remnant, Jeff Waugh, Sam Ruby, and others that contributed to those that I have stolen^Wborrowed from.

First, set up some options on initialisation - mostly setting up logging so that the calling code can get the sort of logging they'd like out of the application:

import logging
import sys
from datetime import datetime

import feedparser


class Aggregator(object):
    def __init__(self, options=None):
        if not options:
            options = {}
        self.options = options
        self.user_agent = options.get('user_agent', 'aggregate2db/0.1')
        self.log = options.get('log', None)
        self.verbose = options.get('verbose', False)
        if not self.log:
            self.log = logging.getLogger("Aggregator/%d" % (hash(self),))
            self.log.propagate = False
            if self.verbose:
                self.log.setLevel(logging.DEBUG)
            else:
                self.log.setLevel(logging.WARNING)
            self.log.addHandler(logging.StreamHandler(sys.stderr))

Then, the feed processing method is the (current) entry-point:

    # Process a feed, updating the feed information and processing the
    # entries in the feed.
    #
    # Returns boolean True or False on whether the feed was updated.
    def process_feed(self, feed):
        self.log.info('Processing feed: %s', feed.feed_url)
        self.log.debug('Feed %s last checked at: %s',
            feed.feed_url, feed.last_checked)
        feed.last_checked = datetime.now()

        feedparser_options = {
            'agent': self.user_agent,
            'etag': feed.etag,
            # A feed that has never been fetched has no last_modified yet.
            'modified': (feed.last_modified.timetuple()
                         if feed.last_modified else None),
        }

        parsed_data = feedparser.parse(feed.feed_url, **feedparser_options)

        if 'status' in parsed_data:
            self.log.info('Feed %s status: %s', feed.feed_url, parsed_data.status)

            # Hook for people to handle 301 (permanently moved) or 410
            # (gone) response codes more effectively than just logging
            # about it.
            self.handleStatus(feed, parsed_data.status, parsed_data)

            if parsed_data.status == 301:
                self.log.warning('Feed %s permanently (301) moved to %s',
                    feed.feed_url, parsed_data.href)

            if parsed_data.status == 302:
                self.log.debug('Feed %s temporarily (302) moved to %s',
                    feed.feed_url, parsed_data.href)

            if parsed_data.status == 304:
                self.log.debug('Feed %s unchanged', feed.feed_url)
                return False

            if parsed_data.status == 410:
                self.log.error('Feed %s has gone away (410)', feed.feed_url)
                return False

            if parsed_data.status >= 400:
                self.log.warning('Feed %s has error status %s',
                    feed.feed_url, parsed_data.status)
                return False

        # Update etag so that we don't have to download anything if we don't need to
        feed.etag = parsed_data.get('etag', '')

        feed.title = parsed_data.feed.get('title', '')
        feed.description = parsed_data.feed.get('tagline', '')
        feed.link = parsed_data.feed.get('link', '')

        self.log.debug('Feed title: %s', feed.title)
        self.log.debug('Feed description: %s', feed.description)
        self.log.debug('Feed link: %s', feed.link)

        # Get all the existing posts mentioned out of the database -
        # doing this in one go could be quite a bit more efficient than
        # doing it one at a time.
        posts = self.getExistingPostsForEntries(feed, parsed_data.entries)

        # Keep track of whether we've had any updates - that affects
        # whether we should forcibly update last_modified if the feed
        # doesn't provide any last_modified data.
        updated = False
        for entry in parsed_data.entries:
            try:
                entry_updated = self.process_entry(feed, entry, posts, parsed_data)
                updated = updated or entry_updated
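            except Exception:
                # The entry may have no link, so look it up safely, and
                # log the traceback so failures can be diagnosed.
                self.log.warning("Entry %s could not be processed",
                    entry.get('link'), exc_info=True)
                # TODO: Hard to debug errors in process_entry if we just
                # swallow this error - need to have option to reraise
                # when in development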

        # Figure out from the data when the feed was last modified,
        # potentially just using the feed last modified time, but maybe
        # checking individual entries in the list, or just making up a
        # time.
        last_modified = self.determineFeedLastModified(feed, parsed_data, updated)

        if last_modified:
            feed.last_modified = last_modified

        feed.save()

        return updated
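
The determineFeedLastModified hook isn't shown here, but the comments describe the idea: prefer the feed's own timestamp, then the newest entry timestamp, then make one up if something changed.  Here's a minimal standalone sketch of that ordering.  It assumes the parsed data behaves like a dict with 'modified_parsed' and 'updated_parsed' time tuples; those key names are assumptions for this example and may not match what your parser version provides.

```python
from datetime import datetime


def determine_feed_last_modified(parsed_data, updated, now=None):
    """Pick a last-modified time for a feed.

    Order of preference: the feed's own timestamp, the newest entry
    timestamp, and finally the current time if something was updated.
    Returns None when nothing suggests the feed changed.
    """
    feed_time = parsed_data.get('modified_parsed')
    if feed_time:
        return datetime(*feed_time[:6])

    entry_times = [
        entry['updated_parsed']
        for entry in parsed_data.get('entries', [])
        if entry.get('updated_parsed')
    ]
    if entry_times:
        # Time tuples compare field by field, so max() is the newest.
        return datetime(*max(entry_times)[:6])

    if updated:
        return now or datetime.now()
    return None


parsed = {
    'entries': [
        {'updated_parsed': (2007, 3, 1, 12, 0, 0, 3, 60, 0)},
        {'updated_parsed': (2007, 3, 2, 8, 30, 0, 4, 61, 0)},
    ],
}
last_modified = determine_feed_last_modified(parsed, updated=True)
```

Returning None when there is nothing to go on fits the `if last_modified:` check in process_feed, which then leaves the stored value alone.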

Then, for each entry found in the feed processing, the process_entry method is called:

    # Process a feed entry, creating or updating the entry information
    #
    # Returns boolean True or False on whether the entry was created/updated.
    def process_entry(self, feed, entry, posts, parsed_data):
        # This maps the entry data into a dictionary with keys the same
        # name as the attributes of a post object.
        postdata = self.get_entry_data(entry, parsed_data, feed)

        self.log.info('Considering entry: %s', postdata['link'])
        self.log.debug('%s - title: %s', postdata['link'],
            postdata['title'])
        self.log.debug('%s - guid: %s', postdata['link'],
            postdata['guid'])

        # If the post already exists, look it up from the pre-populated
        # posts using the guid.
        if postdata['guid'] in posts:
            post = posts[postdata['guid']]

            # entryChanged is a hook for potential customisation, as
            # the default method may not always be the most accurate for
            # a particular set of feeds.  Generally, will check modified
            # time, if it exists, or compare the actual content.
            if not self.entryChanged(post, postdata, parsed_data):
                self.log.info("Entry %s exists, and is unchanged",
                    postdata['link'])
                return False

            self.log.info("Entry %s exists, but is changed, updating",
                postdata['link'])

            if not postdata['date_modified']:
                postdata['date_modified'] = post.date_modified

            for k, v in postdata.items():
                setattr(post, k, v)

        else:
            self.log.info("Entry %s is a new entry", postdata['link'])

            # Determine when this post was actually created - which, if
            # it is not explicitly in the entry, requires checking the
            # feed and/or headers or just putting the current time if
            # there's no better indicator
            postdata['date_modified'] = self.determineEntryLastModified(feed,
                postdata, entry, parsed_data)

            post = self.createPost(**postdata)

        post.save()

        # Hook for further processing of the entry - tags, updating
        # search data, &c.
        self.furtherProcessEntry(post, entry, parsed_data, feed, posts)

        # Entry was changed
        return True
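
The default entryChanged isn't listed in this post.  As a sketch of the behaviour described in the comment above (check the modified time if it exists, otherwise compare the content), something along these lines would do.  StoredPost and the 'content' field are assumptions made for the example, and the real hook also receives the parsed data.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for a stored Post object.
StoredPost = namedtuple('StoredPost', 'title content date_modified')


def entry_changed(post, postdata):
    """Return True when the fetched entry differs from the stored post."""
    new_modified = postdata.get('date_modified')
    if new_modified and post.date_modified:
        # Both sides carry a timestamp, so trust it.
        return new_modified > post.date_modified
    # No usable timestamp: fall back to comparing the content itself.
    return (postdata.get('title') != post.title or
            postdata.get('content') != post.content)


post = StoredPost('Hello', 'First draft', datetime(2007, 3, 1))
newer = {'title': 'Hello', 'content': 'Second draft',
         'date_modified': datetime(2007, 3, 2)}
changed = entry_changed(post, newer)
```

Feeds that never bump their modified time when the content changes are exactly the case where you'd want to override this with a content-only comparison.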

There are a few calls to other methods on the object.  This serves to keep the functions pretty short as well as allowing alternate implementations to be used.  The model-creation/fetching methods (createPost and getPostsForGuids) also need to be provided to persist and retrieve objects.  These should always be pretty small - here they are for the models from last time:

    def createPost(self, **kw):
        post = Post(**kw)
        post.save()
        return post

    def getPostsForGuids(self, feed, guids):
        for guid in guids:
            posts = Post.select((Post.c.feed_id == feed.feed_id) &
                (Post.c.guid == guid))
            if not len(posts):
                continue
            yield posts[0]
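
process_entry expects the existing posts as a dictionary keyed by guid, which is what getExistingPostsForEntries hands back.  That method isn't shown here, so the following is only a sketch of how it might sit on top of a fetcher like getPostsForGuids.  The function name, FakePost, and the rule for picking a guid (the entry's 'id', falling back to its link) are all assumptions for the example.

```python
def existing_posts_for_entries(fetch_posts, feed, entries):
    """Return a dict mapping guid -> stored post for the given entries.

    `fetch_posts(feed, guids)` is any callable yielding stored posts,
    in the spirit of getPostsForGuids.
    """
    # Assumption: an entry's guid is its 'id', falling back to its link.
    guids = [entry.get('id') or entry.get('link') for entry in entries]
    guids = [guid for guid in guids if guid]
    return dict((post.guid, post) for post in fetch_posts(feed, guids))


class FakePost(object):
    """Minimal stored post standing in for the real model."""

    def __init__(self, guid):
        self.guid = guid


stored = {'a': FakePost('a'), 'c': FakePost('c')}


def fake_fetch(feed, guids):
    # Pretend database lookup: yield only the posts we already have.
    for guid in guids:
        if guid in stored:
            yield stored[guid]


entries = [{'id': 'a'}, {'link': 'b'}, {'id': 'c', 'link': 'x'}]
posts = existing_posts_for_entries(fake_fetch, None, entries)
```

Because the fetcher is passed all the guids at once, a storage layer that can do the lookup in a single query is free to do so without the calling code changing.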

In my next post, I'll have a download of the code in a usable form (still trying to figure out more elegant ways to ...), and explore adding some extra features like tags and searching using furtherProcessEntry.