Queues and Workers
In the last post, I briefly talked about the architecture of the project I’m currently leading. We have a clear read/write separation in the architecture, and for the past few sprints we have been pushing more and more work onto the write path, which has made our write pipeline a bit heavy. Our challenge is to quickly process the huge volume of tags flowing through the write pipeline. Just to give you an idea of the numbers:
The business process involves doing an “all clothing” stock count every Monday between 06:00 and 09:00. For our UK deployment, we are aiming for 600 stores, each holding an inventory of roughly 80k garments. To get good accuracy from RFID tags, all inventory must be counted at least twice. The usual process is that a group of people count the backroom and shop floor in parallel, then swap and count again. So the math looks like this:
80K * 600 = 48 million; 48 million * 2 = 96 million tags
These need to be processed within 3 hours, which equates to roughly 10,000 commands/tags per second hitting our backend service (96 million / 10,800 seconds is about 8,900, rounded up).
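For anyone who wants to play with the numbers, here is the same back-of-the-envelope calculation as a tiny Python snippet. The constant names are mine and purely illustrative; the values are the ones quoted above.

```python
# Capacity estimate for the Monday stock count, using the figures from the post.
GARMENTS_PER_STORE = 80_000
STORES = 600
COUNTS_PER_GARMENT = 2          # every garment is counted at least twice
WINDOW_SECONDS = 3 * 60 * 60    # 06:00 to 09:00

total_tags = GARMENTS_PER_STORE * STORES * COUNTS_PER_GARMENT
required_rate = total_tags / WINDOW_SECONDS

print(f"{total_tags:,} tags in {WINDOW_SECONDS:,} s = {required_rate:,.0f} tags/s")
```

Changing the store count or the counting window shows quickly how sensitive the required rate is to either.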
We have already spent quite a bit of time optimizing the inventory pipeline, and the 99th percentile latency is below 100ms, which is reasonably good considering we are using NHibernate and Oracle and calling a bunch of backend services. There is more juice we can extract from the inventory pipeline, but realistically, to process all this load we need to scale out the system. We kind of knew this from day one, so we designed the system so that commands are pretty much queueable after some simple invariant checks.
We came up with a very simple scale-out model: run multiple workers behind a set of queues. We started by re-hosting our domain in a worker process (a simple console application; the plan is to use NSSM in production). This simple model works great: workers compete for the commands, and a single worker reads a command under peek-lock and runs the unit of work (UOW). If the UOW cannot be committed, the message is simply retried on another worker, and in most cases the transient failure (a small race condition :-)) is resolved on a subsequent retry.
With this simple model, we were able to get a throughput of over 2000 tags/second using 16 workers on a single beefy machine.
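To make the competing-workers idea concrete, here is a minimal in-process sketch in Python. It is not our production code: an in-memory `queue.Queue` stands in for the broker, putting a failed command back on the queue stands in for releasing a peek-lock, and `TransientError`, `MAX_ATTEMPTS` and `run_workers` are names I made up for the illustration.

```python
import queue
import threading

MAX_ATTEMPTS = 5  # hypothetical retry limit before a command is parked


class TransientError(Exception):
    """Raised by a handler when the unit of work could not be committed."""


def run_workers(commands, handle, worker_count=4):
    """Let worker_count threads compete for commands; retry transient failures."""
    work = queue.Queue()
    for command in commands:
        work.put((command, 1))          # (payload, attempt number)

    done, dead = [], []
    lock = threading.Lock()

    def worker():
        while True:
            item = work.get()
            if item is None:            # shutdown signal
                work.task_done()
                return
            command, attempt = item
            try:
                handle(command, attempt)            # run the unit of work
                with lock:
                    done.append(command)
            except TransientError:
                if attempt < MAX_ATTEMPTS:
                    # Make the command visible again so any worker can retry it.
                    work.put((command, attempt + 1))
                else:
                    with lock:
                        dead.append(command)
            finally:
                work.task_done()

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    work.join()                         # wait until every command is settled
    for _ in threads:
        work.put(None)
    for t in threads:
        t.join()
    return done, dead
```

A handler that fails on its first attempt and succeeds on the second ends up in `done` after one retry; a handler that always fails ends up in `dead` once `MAX_ATTEMPTS` is reached. With a real broker the retry would come from not acknowledging the message rather than from re-publishing it.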
There is a huge number of duplicates in our scenario, so our next step was to detect and remove duplicates before they hit our workers. Ideally the messaging system would do this for us. We use Tibco EMS, but unfortunately EMS doesn’t have any built-in de-duping functionality.
We also use Redis as the read store in our architecture, so we decided to build the de-duping (on publish) functionality in Redis using simple Get/Set operations. The results were awesome: we can de-dup a batch of 50 tags in 0.3ms.
This one change significantly reduced the problem size for us, because every tag shows up at least twice in a stock count (at least 100% duplicates) and there is no way to avoid that on the client/sender side. Efficiently de-duping them on the server means our workers only have to process half of the load, roughly 5,000 tags/sec.
Another interesting pattern we have seen involves large UOWs, which become very inefficient when processed synchronously as a single UOW. In these situations, a worker simply breaks the larger UOW into ‘N’ smaller UOWs, which are queued and then processed in parallel. The downside is that the coding becomes a bit tedious, as we are reading a message from the queue, breaking it down into smaller messages and writing them back to the queue. It’s not perfect, but it gives us a nice way to break up and parallelize large UOWs (and we have plenty of these).
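The de-dup-on-publish step can be sketched roughly like this in Python. This is an illustration under assumptions, not our actual implementation: `InMemoryStore` is a stand-in that exposes only `get` and `set` so the snippet runs without a Redis server, and the `seen:<count>:<tag>` key scheme and the `dedupe_batch` name are invented for the example.

```python
class InMemoryStore:
    """Stand-in for a key/value store, exposing only the two calls used below."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)      # None when the key is absent

    def set(self, key, value):
        self._data[key] = value
        return True


def dedupe_batch(store, count_id, tags):
    """Return only the tags not seen before in this stock count, in order."""
    fresh = []
    for tag in tags:
        key = f"seen:{count_id}:{tag}"  # hypothetical key scheme
        if store.get(key) is None:      # first sighting of this tag
            store.set(key, "1")
            fresh.append(tag)
    return fresh


store = InMemoryStore()
first = dedupe_batch(store, "monday", ["A1", "B2", "A1"])   # A1 repeated in batch
second = dedupe_batch(store, "monday", ["B2", "C3"])        # B2 already published
```

Note that a separate get followed by a set is not atomic, so two publishers racing on the same tag could both let it through. That is only acceptable if processing a tag twice is harmless downstream; Redis also offers `SET` with the `NX` option for an atomic check-and-set if that matters.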
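Here is a small Python sketch of that read, split and re-queue pattern. The message shape (`id` and `items`), the `CHUNK_SIZE` threshold and the function names are all assumptions made for the illustration, and a single-threaded `drain` loop stands in for the pool of workers.

```python
import queue

CHUNK_SIZE = 100  # hypothetical threshold above which a UOW gets split


def split(items, size):
    """Cut a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def handle(message, work_queue, process):
    """Process a small UOW directly; break a large one into child messages."""
    items = message["items"]
    if len(items) <= CHUNK_SIZE:
        process(items)                  # small enough to run as one UOW
        return
    for n, chunk in enumerate(split(items, CHUNK_SIZE)):
        # Write each smaller UOW back to the queue for any worker to pick up.
        work_queue.put({"id": f'{message["id"]}.{n}', "items": chunk})


def drain(work_queue, process):
    """Single-threaded stand-in for workers pulling from the queue."""
    while not work_queue.empty():
        handle(work_queue.get(), work_queue, process)


q = queue.Queue()
q.put({"id": "uow-1", "items": list(range(250))})
sizes = []
drain(q, lambda items: sizes.append(len(items)))
```

In this run the 250-item message is replaced by three child messages, which are then processed as ordinary small UOWs. The tedious part mentioned above is everything this sketch leaves out, such as knowing when all the children of a parent have finished.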