Event sourcing on Rails with RabbitMQ
A year ago I wrote about one of our dashboard applications and how we solved performance issues using Faye delayed messaging (now available as a gem!).
From the very beginning we started with a Service-Oriented Architecture, which turned out to be the best decision we ever made. Each application in the suite (there are 8 of them at the time of writing) is responsible for only one part of the business process. Then there is a dashboard that shows the user the most important information from all other apps.
Unfortunately, this is not everything. One of the most valuable features of this project is its ability to import/export data between applications. What’s more, it requires both write and read interfaces between applications: instead of a star topology we have to deal with a mesh.
The concurrent growth of features and users in the ecosystem has led to a number of performance issues. All the connections between applications were made using HTTP APIs in an “ask for X” manner and the suite started to become very sluggish.
We went through a thought process to redesign the architecture and reduce the dependencies, but since the core functionality of each application is separate and well-defined, there really wasn’t much that we could do to improve it. What’s more, we found that in the future it could be worthwhile to extract some functionality into new separate applications.
The obvious answer was this:
Let’s just toss in HTTP cache and it’ll be fine
It wasn’t.
The biggest problem we had with HTTP cache is max-age.
There are two basic use cases in the system:
- User performs some action, goes to another application and wants to see updated data
- User goes to her dashboard and wants to see the current status of data (which might not have changed for days)
It is impossible to find a perfect number of seconds for how long the resource should be cached. If the time duration is too low there will be too many HTTP requests even if the data hasn’t changed. If it is too high we will get loads of emails from users informing us that the system is not working since they will be presented with outdated information.
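To put rough numbers on that trade-off, here is a back-of-the-envelope sketch. The candidate max-age values are illustrative assumptions, not measurements from our system, and the model assumes a single client that refetches a resource as soon as its cached copy expires.

```ruby
SECONDS_PER_DAY = 24 * 60 * 60

# Upper bound on requests per day that reach the producer for one cached
# resource when the client refetches as soon as its copy expires.
def max_requests_per_day(max_age)
  SECONDS_PER_DAY / max_age
end

# Worst case: the data changes right after it was cached, so the user may
# look at a stale copy for up to max_age seconds.
def worst_case_staleness(max_age)
  max_age
end

[5, 60, 3600].each do |max_age|
  puts format("max-age=%-5d requests/day<=%-6d stale for up to %ds",
              max_age, max_requests_per_day(max_age), worst_case_staleness(max_age))
end
```

Lowering max-age multiplies the request count even for data that has not changed in days, while raising it stretches the window in which a user sees outdated information.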
The whole point of caching is to improve performance, not spoil the experience.
So what can we do?
The answer is: Reverse the architecture
Message passing to the rescue
Object-Oriented Programming principles encourage the Tell, Don’t Ask pattern. If we take it to the higher, architectural level, the solution presents itself.
Instead of the consumer asking the producer for a resource each time, let’s make the producer tell the consumer about changes.
Enough storytelling; it’s time to get our hands dirty and replace old-school HTTP APIs with awesome RabbitMQ messaging.
What is RabbitMQ?
RabbitMQ is an Open Source message broker / queueing system written in Erlang implementing AMQP. Their tag line states:
Messaging that just works
and I’ve found this to really be true.
To get an overview of what can be achieved using RabbitMQ, see the example topology diagrams.
We will be using a Pub-Sub topology with multiple fanout exchanges and queues.
Setting up RabbitMQ
This part depends on your operating system. There are a number of guides on the RabbitMQ website.
If you happen to use Mac OS X and have Homebrew installed, all you need to do is run
brew install rabbitmq
and then start it with
/usr/local/opt/rabbitmq/sbin/rabbitmq-server
After starting rabbitmq-server you can access the Admin UI at http://localhost:15672. The default user is guest with password guest. The Admin UI is extremely useful when working with and debugging RabbitMQ. We’ll explore some of this in the following sections.
Simple architecture: Dashboard for a blog
Let’s imagine two applications: a typical Blog application with posts and a Dashboard application that displays 5 recently created posts. Instead of building an HTTP API in Blog so that Dashboard could ask for recent posts we will make Blog tell about each new post.
In the diagram above there is:
- Blog - a typical rails app backed by SQL database
- P - RabbitMQ Producer
- X - RabbitMQ Exchange
- Queue - RabbitMQ Queue
- C - RabbitMQ Consumer
- Dashboard - rails app backed by Redis
After a post is created in Blog application it will go to Producer which will then send a message to Exchange. Exchange will put the message into a Queue. Then the Consumer, connected to this Queue, will grab the message and update Dashboard’s Redis-based cache.
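The flow can be modelled in a few lines of plain Ruby before touching a real broker. This is only a toy, in-memory sketch: the ToyQueue and ToyFanoutExchange classes are made up for illustration and mimic the routing semantics, not how RabbitMQ works internally.

```ruby
# A toy, in-memory model of a fanout exchange and its queues.
class ToyQueue
  attr_reader :name

  def initialize(name)
    @name = name
    @messages = []
  end

  def enqueue(message)
    @messages << message
  end

  # A consumer takes the oldest message first (FIFO); nil when empty.
  def pop
    @messages.shift
  end

  def size
    @messages.size
  end
end

class ToyFanoutExchange
  def initialize(name)
    @name = name
    @queues = []
  end

  def bind(queue)
    @queues << queue
  end

  # Fanout copies the message to every bound queue and returns how many
  # queues received it. With no bindings the message is simply dropped.
  def publish(message)
    @queues.each { |queue| queue.enqueue(message) }
    @queues.size
  end
end

exchange = ToyFanoutExchange.new("blog.posts")
exchange.publish('{"title":"lost"}')   # no queue bound yet: dropped

queue = ToyQueue.new("dashboard.posts")
exchange.bind(queue)
exchange.publish('{"title":"Hello"}')  # lands in dashboard.posts

# The consumer can be away while messages pile up, then drain the queue.
while (message = queue.pop)
  puts "consumed #{message}"
end
```

The producer only ever talks to the exchange. Which queues exist, and who consumes from them, is decided by bindings, so new consumers can be added without changing the producer.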
This may sound a little complicated, but thanks to great ruby libraries you will notice that there is not too much work to do.
Blog post Publisher
First we need to create a basic scaffold for posts
rails new blog
cd blog
bundle
rails generate scaffold post title:string body:text
rake db:migrate
rails server
The interface for managing posts should now be available at http://localhost:3000/posts
Now we need to create a RabbitMQ Producer. Let’s call it Publisher. We will be using bunny - “A dead easy to use RabbitMQ Ruby client”.
Let’s put that into Gemfile
# blog/Gemfile
gem "bunny"
and run bundle install.
Now it’s time to implement our Publisher.
# blog/app/services/publisher.rb
class Publisher
  # In order to publish a message we need an exchange name.
  # Note that RabbitMQ does not care about the payload -
  # we will be using JSON-encoded strings
  def self.publish(exchange, message = {})
    # grab the fanout exchange
    x = channel.fanout("blog.#{exchange}")
    # and simply publish message
    x.publish(message.to_json)
  end

  def self.channel
    @channel ||= connection.create_channel
  end

  # We are using default settings here
  # The `Bunny.new(...)` is a place to
  # put any specific RabbitMQ settings
  # like host or port
  def self.connection
    @connection ||= Bunny.new.tap do |c|
      c.start
    end
  end
end
Now we need to call Publisher.publish every time a new Post is created:
# blog/app/controllers/posts_controller.rb
class PostsController < ApplicationController
  # ...
  def create
    @post = Post.new(post_params)
    if @post.save
      # Publish post data
      Publisher.publish("posts", @post.attributes)
      redirect_to @post, notice: 'Post was successfully created.'
    else
      render :new
    end
  end
  # ...
end
That’s it!
Remember to restart rails server before continuing.
You can now create a new post, then go to RabbitMQ Admin UI, select “Exchanges”, then blog.posts and you should see something like this:
When you scroll down the page you’ll notice that there is no binding for this exchange.
This basically means that messages sent to this exchange are not going anywhere.
Now it’s time to set up a queue between the exchange and the Dashboard application, and the consumer that will update the local cache.
Note on durability
The RabbitMQ Pub/Sub tutorial uses on-demand, randomly named queues created when a client connects to the server. This is good for some use cases, but not ours. If the Dashboard application goes down (for any reason), the temporary queue is deleted and messages sent from Blog will never reach Dashboard. That’s why we need a static, durable queue that will hold messages while the Dashboard consumer is disconnected and deliver every message after it reconnects.
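A durable queue covers the case where the consumer disconnects. If you also want messages to survive a restart of the broker itself, the exchange has to be durable and the messages have to be published as persistent. Below is a minimal sketch of what that could look like with bunny. The publish_durably method is a hypothetical helper, not part of this project, and the channel argument is assumed to be a Bunny::Channel.

```ruby
require "json"

# Hypothetical variant of the publishing step. `channel` is expected to be a
# Bunny::Channel (for example `Bunny.new.tap(&:start).create_channel`).
def publish_durably(channel, exchange, message = {})
  # durable: true keeps the exchange, and its bindings to durable queues,
  # across a broker restart
  x = channel.fanout("blog.#{exchange}", durable: true)
  # persistent: true marks the message as persistent (delivery mode 2),
  # so a durable queue can keep it across a broker restart
  x.publish(message.to_json, persistent: true, content_type: "application/json")
end
```

One caveat: RabbitMQ refuses to redeclare an existing exchange with different attributes, so every place that declares the same exchange would need to pass the same durable: true option, and an exchange that was already created as non-durable has to be deleted first.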
Dashboard Consumer
If you are familiar with tools like Sidekiq or Resque this part will feel like home.
There is another great RabbitMQ ruby library made especially for processing messages that come from queues. It’s called sneakers and was created by @jondot. (You should definitely check out his blog!)
Let’s start by creating a new rails app
rails new dashboard
cd dashboard
add some gems
# dashboard/Gemfile
gem 'redis-rails'
gem 'redis-namespace'
gem 'sneakers'
and run bundle install.
Both redis and sneakers require some setup
Redis setup
# dashboard/config/initializers/redis.rb
$redis = Redis::Namespace.new("dashboard:#{Rails.env}", redis: Redis.new)
Sneakers setup
# dashboard/Rakefile
# load sneakers tasks
require 'sneakers/tasks'
Rails.application.load_tasks
# dashboard/config/initializers/sneakers.rb
Sneakers.configure({})
Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy
Recent Posts service
Since we are not using ActiveRecord, we need some place to put functionality related to recent posts. Let’s make a service called RecentPosts.
# app/services/recent_posts.rb
class RecentPosts
  KEY = "recent_posts" # redis key
  STORE_LIMIT = 5      # how many posts should be kept

  # Get list of recent posts from redis
  # Since redis stores list items as strings
  # we need to parse each list item as JSON
  def self.list(limit = STORE_LIMIT)
    $redis.lrange(KEY, 0, limit - 1).map do |raw_post|
      JSON.parse(raw_post).with_indifferent_access
    end
  end

  # Push new post to list and trim its size
  # to limit required storage space
  # `raw_post` is already a JSON string
  # so there is no need to encode it as JSON
  def self.push(raw_post)
    $redis.lpush(KEY, raw_post)
    $redis.ltrim(KEY, 0, STORE_LIMIT - 1)
  end
end
Dashboard view
Dashboard application needs to have some view so we can see if it works correctly.
# dashboard/app/controllers/home_controller.rb
class HomeController < ApplicationController
  def index
    @posts = RecentPosts.list
  end
end

# dashboard/app/views/home/index.html.erb
<h2>Recently updated posts</h2>
<table>
  <thead>
    <tr>
      <th>Title</th>
    </tr>
  </thead>
  <tbody>
    <% @posts.each do |post| %>
      <tr>
        <td><%= post[:title] %></td>
      </tr>
    <% end %>
  </tbody>
</table>

# dashboard/config/routes.rb
Rails.application.routes.draw do
  root to: "home#index"
end
Worker
Finally, the sneakers worker. You’ll probably notice that it looks very similar to sidekiq workers.
# dashboard/app/workers/posts_worker.rb
class PostsWorker
  include Sneakers::Worker
  # This worker will connect to "dashboard.posts" queue
  # env is set to nil since by default the actual queue name would be
  # "dashboard.posts_development"
  from_queue "dashboard.posts", env: nil

  # work method receives message payload in raw format
  # in our case it is JSON encoded string
  # which we can pass to RecentPosts service without
  # changes
  def work(raw_post)
    RecentPosts.push(raw_post)
    ack! # we need to let queue know that message was received
  end
end
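The worker above trusts every payload it receives. If a malformed message ever reached the queue, it would be cached as-is and break JSON parsing in RecentPosts.list. A small guard could look like the sketch below. The classify_post_message helper is hypothetical, and requiring a string title is an assumption made for this example.

```ruby
require "json"

# Hypothetical helper: decide what to do with a raw message before caching it.
# Returns :ack for a usable post and :reject for anything malformed.
def classify_post_message(raw_post)
  post = JSON.parse(raw_post)
  # Assumption for this sketch: a usable post is a JSON object with a string title
  return :reject unless post.is_a?(Hash) && post["title"].is_a?(String)

  :ack
rescue JSON::ParserError
  # Not valid JSON at all
  :reject
end

puts classify_post_message('{"title":"Hello"}')
puts classify_post_message("not json")
```

Inside the worker, work could call reject! instead of ack! (both are provided by Sneakers::Worker) whenever the helper returns :reject, and push to RecentPosts only otherwise.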
That’s it, the Dashboard app is ready!
In order to start workers run:
WORKERS=PostsWorker rake sneakers:run
In RabbitMQ Admin UI you can see that the dashboard.posts queue was created - navigate to “Queues” -> dashboard.posts.
Now, if you create a new post in Blog app the message will go to blog.posts exchange but the dashboard.posts queue will still be empty. Why? We need to set up a binding between exchange and queue.
Putting it all together
We need to tell blog.posts exchange to send incoming messages to dashboard.posts queue. This could be done in RabbitMQ Admin UI, but it’s much better to have this binding as declarative configuration that can be automatically executed (e.g. during deploy).
We will use the same bunny library as we used for publishing messages in Blog application.
# config/Rakefile
namespace :rabbitmq do
  desc "Setup routing"
  task :setup do
    require "bunny"

    conn = Bunny.new
    conn.start
    ch = conn.create_channel

    # get or create exchange
    x = ch.fanout("blog.posts")
    # get or create queue (note the durable setting)
    queue = ch.queue("dashboard.posts", durable: true)
    # bind queue to exchange
    queue.bind("blog.posts")

    conn.close
  end
end
Now all we need to do is run:
rake rabbitmq:setup
Let’s inspect blog.posts exchange bindings in RabbitMQ Admin UI.
Now every Post created will be published as a message that goes to blog.posts exchange, is routed to dashboard.posts queue, is taken by PostsWorker, and is put into redis by RecentPosts service.
Complex architectures
The previous example shows how you can connect two applications. In the real world, however, the connections are much more complicated.
Consider the following diagram
Here are some differences from the previous example:
- Blog now publishes many messages to many exchanges
- Dashboard consumes messages from many queues
- Admin - another application that is both a producer and consumer
- Exchange ↔ Queue binding is much more complicated - it can be either one to many (blog.posts) or many to one (*.page_views)
- Admin is a consumer backed by SQL instead of redis
Let’s go through some of those points
Cache storage
Choosing the appropriate storage for our cache is a very broad topic. I’m going to stick with just a few examples that will give you an overview.
Recent posts
On our Dashboard, we are only interested in the last five posts. This is a perfect use case for redis list. We atomically push new items to the front of the list using LPUSH, we always fetch the first few items using LRANGE, and we can easily limit the storage using LTRIM. There is no need to do any sorting or filtering. Redis gives us everything that we need and nothing more.
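To see what the LPUSH plus LTRIM combination does to the list, here is a plain-Ruby model that uses an Array in place of the redis list. The push_capped method is made up for this illustration; only the limit of five comes from RecentPosts.

```ruby
# Plain-Ruby model of LPUSH + LTRIM, with an Array standing in for the
# redis list. STORE_LIMIT matches the constant used in RecentPosts.
STORE_LIMIT = 5

def push_capped(list, item, limit = STORE_LIMIT)
  list.unshift(item)       # LPUSH: the newest item goes to the front
  list.slice!(limit..-1)   # LTRIM 0 limit-1: drop everything past the cap
  list
end

recent = []
(1..7).each { |n| push_capped(recent, "post #{n}") }
p recent # newest first, never more than STORE_LIMIT items
```

The list is always ordered newest first and never grows past the cap, which is why the Dashboard needs no sorting and no cleanup job.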
Page views statistics
Another good use case for redis backend is gathering page views statistics. Using redis INCR, INCRBY or HINCRBY it is easy to atomically increment counters without having to worry about race conditions. You can see an example of this at PageViews service.
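As a rough idea of how little code a HINCRBY-based counter needs, here is a minimal sketch. The PageViewCounter class and the "page_views" key are hypothetical and are not the PageViews service from our repository. The redis argument is assumed to be any client that responds to hincrby and hgetall the way redis-rb does.

```ruby
# Hypothetical page view counter built on a single redis hash.
# `redis` is any client that responds to hincrby/hgetall as redis-rb does.
class PageViewCounter
  KEY = "page_views" # assumed key name, for illustration only

  def initialize(redis)
    @redis = redis
  end

  # HINCRBY creates the field at 0 if it is missing and increments it
  # atomically, so concurrent workers cannot lose updates.
  def track(path, by = 1)
    @redis.hincrby(KEY, path, by)
  end

  # HGETALL returns string values, so convert them back to integers.
  def totals
    @redis.hgetall(KEY).transform_values(&:to_i)
  end
end
```

In the Dashboard this could be constructed as PageViewCounter.new($redis) and called from a worker that consumes the page views queue.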
Structured cache with filtering and sorting
Of course redis is not a silver bullet. There might be a case when you need the ability to filter or sort cached records. In such a case, we can create a regular SQL database model and store required information in it. See an example of storing blog posts in Admin app using Blog::Post model and Blog::PostsWorker
Exchange ↔ Queue bindings
Complex architectures will require more complex bindings. Fortunately, this can be solved in the same way as the previous example.
All we need to do is modify the setup task:
# config/Rakefile
namespace :rabbitmq do
  desc "Setup routing"
  task :setup do
    require "bunny"

    conn = Bunny.new
    conn.start
    ch = conn.create_channel

    # connect one exchange to multiple queues
    x = ch.fanout("blog.posts")
    ch.queue("dashboard.posts", durable: true).bind("blog.posts")
    ch.queue("admin.posts", durable: true).bind("blog.posts")

    # connect multiple exchanges to the same queue
    x = ch.fanout("admin.page_views")
    ch.queue("dashboard.page_views", durable: true).bind("admin.page_views")
    x = ch.fanout("blog.page_views")
    ch.queue("dashboard.page_views", durable: true).bind("blog.page_views")

    conn.close
  end
end
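As the number of bindings grows, the task above turns into a long list of near-identical lines. One option is to describe the routing as data and loop over it. This is only a sketch: the BINDINGS constant and the setup_bindings method are hypothetical names, and channel is assumed to be a Bunny::Channel.

```ruby
# Hypothetical routing table: exchange name => queues that should receive
# its messages. The names are the ones used in the setup task above.
BINDINGS = {
  "blog.posts"       => %w[dashboard.posts admin.posts],
  "admin.page_views" => %w[dashboard.page_views],
  "blog.page_views"  => %w[dashboard.page_views]
}.freeze

# `channel` is expected to be a Bunny::Channel.
def setup_bindings(channel, bindings = BINDINGS)
  bindings.each do |exchange_name, queue_names|
    # get or create the fanout exchange
    exchange = channel.fanout(exchange_name)
    queue_names.each do |queue_name|
      # get or create the durable queue and bind it to the exchange
      channel.queue(queue_name, durable: true).bind(exchange)
    end
  end
end
```

The rake task would then shrink to opening a connection, calling setup_bindings(conn.create_channel) and closing the connection, and adding a new application becomes a one-line change to the table.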
Final notes
RabbitMQ has enormous potential for dealing with a complex Service-Oriented system. Using durable queues allows us to maintain data consistency even if one part of the system breaks. For Dashboard-like applications, users can still access cached data even if the underlying application goes down.
On-change message passing gives a significant performance boost when compared to continuous API calls. It is a win in terms of speed, durability and availability. Theoretically we give up some temporary consistency and real-time data updates. On most occasions, however, we’ve found that when a user moves between applications the cache is already updated and the recent data is already available.
A working example of complex architecture is available at our GitHub.