August 7th 2011

Building an RSS feed processor with Redis

Redis is great for a lot of things, such as acting as a cache, storing your app’s configuration, and so on. In this post, I am going to use Redis' atomic and blocking facilities to build a multi-step RSS feed processor. Along the way, some of the topics I hope to touch upon are: queue prioritization, synchronization between processes, using Redis to gracefully shut down processes, and a few race conditions to watch out for.

Step 1: Getting a basic RSS feed processor up and running

This is what the RSS feed processor is going to try to do:

  • 1: It’s going to have a list of feed URLs that it hits periodically to check for new content.
  • 2: When it finds a new feed, it’s going to process it. For now, let’s just assume processing means storing it in Redis somewhere.

Getting step 1 running is fairly simple. I’m going to be using the Ruby stdlib’s rss library for parsing the feeds. In terms of organizing my Redis data structures, I’ll use a list called feeds_to_fetch from which I’ll pop URLs to determine which feed needs to be fetched next. I’ll also use another list called entries_needing_processing into which I’ll stick RSS entries that need to be processed. So, given that short intro, here is how a first pass at building this thing looks:

require 'rss'
require 'open-uri'
require 'redis'

URLS = %W{http://www.npr.org/rss/rss.php?id=1001
          http://www.npr.org/rss/rss.php?id=100}

redis = Redis.new

# Seed the work queue with the feed URLs
URLS.each { |url| redis.rpush "feeds_to_fetch", url }

loop do
  queue, payload = redis.blpop "feeds_to_fetch", "entries_needing_processing", 0
  if queue == "feeds_to_fetch"
    # Fetch feeds
    feed_url = payload
    puts "fetching feed: #{feed_url}"
    content = open(feed_url) { |s| s.read }
    rss = RSS::Parser.parse content, false
    rss.items.each do |entry|
      redis.rpush "entries_needing_processing", Marshal.dump([feed_url, entry])
    end
    redis.rpush "feeds_to_fetch", feed_url
  else
    # Process entries
    feed_url, entry = Marshal.load payload
    puts "processing entry: #{entry.link}"
    entry_id = redis.incr "entries_processed"
    redis.hmset "entry|#{entry_id}", "url", entry.link, "title", entry.title,
                "published", entry.pubDate, "description", entry.description
    redis.sadd "entry_ids", entry_id
  end
end

So this code looks simple enough. There are a bunch of URLs that are dumped onto the feeds_to_fetch list. We then loop infinitely, doing a BLPOP over multiple lists (feeds_to_fetch and entries_needing_processing). From the BLPOP we branch into two code paths:

  1. If we just popped something from the feeds_to_fetch list, we know the value we just popped is an RSS feed URL and we proceed with fetching its feed. Once we are done fetching the feed, we push the entries onto the entries_needing_processing list.
  2. On the other hand, if we popped something from the entries_needing_processing list, we know it’s an RSS entry and we go ahead and process it, i.e. store it in Redis, for now (a quick peek at what that stored data looks like follows this list).
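
For reference, “processing” an entry in this first pass just means writing a hash per entry and adding its id to a set. Assuming the same key names as the script above and a local redis-server, the stored data can be inspected with a quick sketch like this (not part of the processor itself):

require 'redis'

redis = Redis.new

# Each processed entry lives in a hash keyed "entry|<id>", and its id is
# recorded in the "entry_ids" set (key names match the script above).
redis.smembers("entry_ids").first(3).each do |entry_id|
  puts redis.hgetall("entry|#{entry_id}").inspect
end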

While this code looks like it should work, when we run it we see that all we are doing is fetching feed URLs and not processing any of our entries. Here is a sample output from my run:

fetching feed: http://www.npr.org/rss/rss.php?id=1008
fetching feed: http://www.npr.org/rss/rss.php?id=100
fetching feed: http://www.npr.org/rss/rss.php?id=1001
fetching feed: http://www.npr.org/rss/rss.php?id=1006

As you can see, all we are doing is fetching feeds with no processing of entries. Welcome to the first race condition!

Prioritization of Queues in BLPOP

To reiterate, the behavior we are seeing is that we are constantly fetching feeds and never processing any entries. Why is this happening? In order to understand why, we need to pay closer attention to the following line:

  queue, payload = redis.blpop "feeds_to_fetch", "entries_needing_processing", 0

What we are doing here is telling Redis to pop an item off either the feeds_to_fetch or the entries_needing_processing list, in that order (the trailing 0 argument just means block forever). To further clarify, we are telling Redis: try giving me something from the feeds_to_fetch list; if you don’t find anything there, then try giving me something from the entries_needing_processing list. So, by specifying it first, we are giving the feeds_to_fetch list higher priority than the entries_needing_processing list. Combine this with the fact that, whenever we pop an item off the feeds_to_fetch list, we push it right back, and it means we are never going to get around to popping anything off the entries_needing_processing list. How do we fix this?
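
To make that ordering concrete, here is a quick illustration (assuming the redis gem and a local redis-server; the values are made up) of BLPOP always answering from the first non-empty list it is given:

require 'redis'

redis = Redis.new

# Put one item on each list.
redis.rpush "feeds_to_fetch", "http://example.com/feed"
redis.rpush "entries_needing_processing", "some-serialized-entry"

# BLPOP checks the lists in the order they are given and pops from the
# first non-empty one, so feeds_to_fetch wins as long as it has items.
queue, payload = redis.blpop "feeds_to_fetch", "entries_needing_processing", 0
puts queue    # => "feeds_to_fetch"
puts payload  # => "http://example.com/feed"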

One solution is to not push the feed_url back onto the feeds_to_fetch list immediately, but rather to wait until we have processed all of its entries and then stick it back on. While this would work, it makes our code a little more complicated: we now need to keep track of when all the entries of a given feed_url have been processed (a rough sketch of that bookkeeping appears after the next snippet). A simpler, far more elegant solution is to just switch the priorities of the lists in the BLPOP to:

  queue, payload = redis.blpop "entries_needing_processing", "feeds_to_fetch", 0

By putting entries_needing_processing first in the BLPOP’s order of priority, we guarantee that we will process all entries while still fetching all feeds. The big reason this works is that, unlike a popped feed URL, a popped entry is never pushed back onto the entries_needing_processing list, so the entries can never crowd themselves out.
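
For contrast, here is a rough sketch of the bookkeeping the first (rejected) approach would need. The pending|#{feed_url} counter key is hypothetical and not part of the script above; these fragments would slot into the two branches of the worker loop:

# In the feed-fetching branch: count how many entries are outstanding
# instead of re-queueing the feed right away.
rss.items.each do |entry|
  redis.incr "pending|#{feed_url}"
  redis.rpush "entries_needing_processing", Marshal.dump([feed_url, entry])
end

# In the entry-processing branch, after storing the entry: only once the
# last outstanding entry for this feed is done does the feed go back on.
remaining = redis.decr "pending|#{feed_url}"
redis.rpush "feeds_to_fetch", feed_url if remaining == 0

Even this sketch has its own wrinkle: a fast worker could finish an entry before the fetching loop has pushed (and counted) the rest, re-queueing the feed too early. That is exactly the kind of extra care the simple priority switch lets us avoid.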

Parallelizing the feed fetch

So now we have a basic feed-fetching script in place, and we can squeeze some performance out of it. One of the lowest-hanging fruits is to increase the number of processes pulling work from our Redis lists. With that in mind, a preliminary attempt looks as follows:

require 'rss'
require 'open-uri'
require 'redis'

NUMBER_OF_WORKERS = 4
NUMBER_OF_WORKERS.times do
  Process.fork do
    # Each worker opens its own connection rather than sharing one across forks.
    redis = Redis.new
    loop do
      queue, payload = redis.blpop "entries_needing_processing", "feeds_to_fetch", 0
      if queue == "feeds_to_fetch"
        # Fetch feeds
        feed_url = payload
        puts "fetching feed: #{feed_url}"
        content = open(feed_url) { |s| s.read }
        rss = RSS::Parser.parse content, false
        rss.items.each do |entry|
          redis.rpush "entries_needing_processing", Marshal.dump([feed_url, entry])
        end
        redis.rpush "feeds_to_fetch", feed_url
      else
        # Process entries
        feed_url, entry = Marshal.load payload
        puts "processing entry: #{entry.link}"
        entry_id = redis.incr "entries_processed"
        redis.hmset "entry|#{entry_id}", "url", entry.link, "title", entry.title,
                    "published", entry.pubDate, "description", entry.description
        redis.sadd "entry_ids", entry_id
      end
    end
  end
end

Process.waitall

This code works, but we have no way of gracefully bringing down our workers.

Graceful shutdown of workers

The next task is to have some way of shutting down all the worker processes and the parent process after our script starts running. We could use signals for this, but having the parent process tell the children to shut down is something Redis is much better suited for.

NUMBER_OF_WORKERS = 4
NUMBER_OF_WORKERS.times do
  Process.fork do
    redis = Redis.new
    loop do
      queue, payload = redis.blpop "message_from_master", "entries_needing_processing",
                                   "feeds_to_fetch", 0
      exit(0) if queue == "message_from_master" && payload == "DIE!"
      # rest of worker code
    end
  end
end

# The parent needs its own connection to push shutdown messages.
redis = Redis.new

`echo #{Process.pid} > /tmp/feed_processor.pid`
puts "Parent process wrote PID to /tmp/feed_processor.pid"

trap('QUIT') do
  NUMBER_OF_WORKERS.times do
    redis.lpush "message_from_master", "DIE!"
  end
end

Process.waitall

The main point of interest in this change is that message_from_master is now the first list in the order of priority. This way, when a worker is done processing either a feed or an entry and is ready to fetch its next task from Redis, a message from the master (aka the parent process) takes highest priority. In the parent process itself, we write out the PID to a file in /tmp and trap the QUIT signal. In the signal handler, the parent just pushes a DIE! message onto the message_from_master queue, once for each worker process it has spawned. And since BLPOP is atomic, we are guaranteed that no child will pull out more than one of those DIE! messages, so every worker gets to see one. Race conditions, goodbye! To see this in action and send the QUIT signal to the parent process, we just need to run:

kill -s QUIT `cat /tmp/feed_processor.pid`

Protecting from the vagaries of the internet

We are getting really close to being done here. One more thing we can do is harden the script a teeny bit by wrapping the feed-fetching portion in a begin/ensure block:

if queue == "feeds_to_fetch"
  # Fetch feeds
  feed_url = payload
  begin
    puts "fetching feed: #{feed_url}"
    content = open(feed_url) { |s| s.read }
    rss = RSS::Parser.parse content, false
    rss.items.each do |entry|
      redis.rpush "entries_needing_processing", Marshal.dump([feed_url,entry])
    end
  ensure
    redis.rpush "feeds_to_fetch", feed_url
  end
else
  # process entries as before
end

Conclusion

While this is by no means a bullet-proof script, it is an interesting exercise. It makes obvious just how big a role Redis can play in an app where it’s your primary or only datastore. Stuff like synchronization, mutexes, resource contention, and prioritized allocation of resources is all taken care of for you by Redis. The full Gist is up. Enjoy!
