June 13, 2014

Fast Storage with RocksDB: C++, Thrift & RocksDB

In November last year, I started developing an infrastructure that would allow Wingify to collect, store, search and retrieve high-volume data. The idea was to collect all the URLs on which Wingify’s homegrown CDN would serve JS content. Based on Wingify’s traffic at the time, we were looking to collect some 10k URLs per second across the four major geographic regions where Wingify runs its servers.

In the beginning I tried MySQL, Redis, Riak, CouchDB, MongoDB and ElasticSearch, but none of them could keep up with that rate of high-speed writes. I also wanted the system to respond very quickly: under 40 ms between internal servers on the private network. This post talks about how I was able to build such a system using C++11, RocksDB and Thrift.

First, let me start by sharing the use cases of such a system in VWO. The following screenshot shows a feature where users can enter a URL to check whether VWO Smart Code is installed on it.

VWO Smart Code checker

The following screenshot shows another feature where users can see a list of URLs matching a wildcard pattern, regex pattern, string rule, etc. while creating a campaign.

VWO URL Matching Helper

I reviewed several open-source databases but none of them fit Wingify’s requirements except Cassandra. In a clustered deployment, however, reads from Cassandra were too slow, and they degraded further as the data size grew. After understanding how Cassandra works under the hood, in particular its log-structured storage similar to LevelDB, I started playing with open-source embeddable databases that use a similar approach, such as LevelDB and Kyoto Cabinet. Around that time I found RocksDB, an embeddable persistent key-value store library built on LevelDB. It was open-sourced by Facebook and had a fairly active developer community, so I started playing with it. I read the project wiki, wrote some working code and joined their Facebook group to ask questions about prefix lookup. The community was helpful, especially Igor and Siying, who gave me enough hints about prefix lookup, custom prefix extractors and bloom filters to help me write something that actually worked in production for the first time. Explaining the technology and jargon is out of scope for this post, but I would encourage readers to read about LevelDB and the RocksDB wiki.

RocksDB FB Group
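The prefix lookup mentioned above relies on RocksDB keeping keys in sorted order: an iterator seeks to the first key carrying a given prefix and scans forward while keys still match, while a prefix extractor plus bloom filter lets it skip files that cannot contain the prefix. A minimal Python sketch of the concept over a plain sorted list (not the RocksDB API; the `"prefix:url"` key layout is an assumption for illustration):

```python
import bisect

def prefix_scan(sorted_keys, prefix):
    """Emulate a RocksDB-style prefix seek over a sorted key space."""
    # Seek: binary-search to the first key >= prefix.
    i = bisect.bisect_left(sorted_keys, prefix)
    # Scan: yield keys while they still carry the prefix.
    while i < len(sorted_keys) and sorted_keys[i].startswith(prefix):
        yield sorted_keys[i]
        i += 1

keys = sorted(["42:http://a.com/x", "42:http://a.com/y", "7:http://b.com/"])
print(list(prefix_scan(keys, "42:")))  # → ['42:http://a.com/x', '42:http://a.com/y']
```

In RocksDB the equivalent is configuring a fixed-size prefix extractor and a bloom filter in the options, then iterating with Seek() from the prefix.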

For capturing the URLs at a peak velocity of up to 10k serves/s, I reused Wingify’s distributed queue based infrastructure. For storage, search and retrieval of URLs I wrote a custom datastore service called HarvestDB using C++, RocksDB and Thrift. Thrift provides the RPC mechanism for exposing this system as a distributed service accessible by various backend sub-systems, which communicate with the HarvestDB server through client libraries generated by the Thrift compiler.

The HarvestDB service implements five remote procedures: ping, get, put, search and purge. The following Thrift IDL describes this service.

namespace cpp harvestdb
namespace go harvestdb
namespace py harvestdb
namespace php HarvestDB

struct Url {
    1: required i64    timestamp;
    2: required string url;
    3: required string version;
}

typedef list<Url> UrlList

struct UrlResult {
    1: required i32          prefix;
    2: required i32          found;
    3: required i32          total;
    4: required list<string> urls;
}

service HarvestDB {
    bool ping(),
    Url get(1:i32 prefix, 2:string url),
    bool put(1:i32 prefix, 2:Url url),
    UrlResult search(1:i32 prefix,
                     2:string includeRegex,
                     3:string excludeRegex,
                     4:i32 size,
                     5:i32 timeout),
    bool purge(1:i32 prefix, 2:i64 timestamp)
}

Clients use ping to check HarvestDB server connectivity before executing other procedures. RabbitMQ consumers consume collected URLs and put them into HarvestDB. The PHP-based application backend uses a custom Thrift-based client library to get (read) and search URLs. A Python program runs as a periodic cron job and uses the purge procedure to drop old entries based on timestamp, which makes sure the service won’t exhaust its storage resources. The system has been in production for more than five months now and can handle a (benchmarked) workload of up to 24k writes/second while consuming less than 500 MB of RAM. Future work will be on replication, sharding and fault tolerance of this service. The following diagram illustrates this architecture.

Overall architecture
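The purge cron can be sketched as follows. This is a hypothetical reconstruction: `client` stands in for a Thrift-generated HarvestDB client exposing purge(prefix, timestamp), and the 30-day retention window is an assumed value, not the production setting.

```python
import time

RETENTION_SECONDS = 30 * 24 * 3600  # assumed 30-day retention window

def purge_old_entries(client, prefixes, now=None):
    """Call the HarvestDB purge procedure for each prefix with a cutoff
    timestamp; entries older than the cutoff are dropped server-side."""
    cutoff = int(now if now is not None else time.time()) - RETENTION_SECONDS
    return {prefix: client.purge(prefix, cutoff) for prefix in prefixes}
```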

Discussion on Hacker News

April 22, 2014

Moving to a Bigger Disk: Online LVM disk migration

This post describes a painless disk migration strategy for moving your partitions to a larger disk. My ThinkPad has a 120G SSD which I wanted to clone to a 480G SSD for my desktop, so I could migrate my existing setup without having to reinstall Linux and the tons of packages on it, and deal with their custom configurations. I use LVM on all my systems, which makes the cloning and migration very simple. This post assumes a simple partitioning scheme where you have at least one primary partition for /boot and another for / (the second one could be an extended partition with LVM partitions).

First of all, back up your important data, keys and whatnot. Attach both disks to a computer (a desktop in my case). Next, boot Linux from your source disk in single-user or recovery mode; in my case that was the OCZ 120G SSD. Identify the destination partition using fdisk -l.

Alright, let’s copy the data bit by bit using dd. I pipe through pv for a ready-made progress display; others use Ctrl+T or signals (such as SIGUSR1) to track the copied bytes.

    $ dd if=/dev/sda bs=4M | pv | dd of=/dev/sdb bs=4M

After this is successful, run sync to flush the disk buffers and reboot to the destination disk, which in my case was the 480G SSD.

Next, boot to the destination disk (ideally after detaching the source disk). Run fdisk -l to list the partitions; depending on how the source disk was partitioned, you may have to adapt the solution this post describes. In my case there were two partitions: a primary /dev/sda1 for /boot and an extended /dev/sda2 which held one main LVM partition, /dev/sda5. We now simply need to alter the partition table so the partitions can occupy the free space, then resize the physical and logical volumes, and finally resize the file systems.

Now we’ll delete the partition table entries and extend the boundaries. Don’t worry: doing the following does not actually wipe your data, it only rewrites the partition table entries (but do be careful about what you’re doing):

    $ fdisk /dev/sda # note this is the new disk

    Command (m for help): p

    Disk /dev/sda: 480.1 GB, 480103981056 bytes
    255 heads, 63 sectors/track, 58369 cylinders, total 937703088 sectors
    Units = sectors of 1 * 512 = 512 bytes
    Sector size (logical/physical): 512 bytes / 4096 bytes
    I/O size (minimum/optimal): 4096 bytes / 4096 bytes
    Disk identifier: 0x000ea999

      Device Boot      Start         End      Blocks   Id  System
    /dev/sda1   *        2048      499711      248832   83  Linux
    /dev/sda2          499712   937703087   468601688    5  Extended
    /dev/sda5          501760   937703087   468600664   8e  Linux LVM

    Command (m for help): d
    Partition number (1-5): 2

    Command (m for help): n
    Partition type:
      p   primary (1 primary, 0 extended, 3 free)
      e   extended
    Select (default p): e
    Partition number (1-4, default 2):
    Using default value 2
    First sector (499712-937703087, default 499712):
    Using default value 499712
    Last sector, +sectors or +size{K,M,G} (499712-937703087, default 937703087):
    Using default value 937703087

    Command (m for help): n
    Partition type:
      p   primary (1 primary, 1 extended, 2 free)
      l   logical (numbered from 5)
    Select (default p): l
    Adding logical partition 5
    First sector (501760-937703087, default 501760):
    Using default value 501760
    Last sector, +sectors or +size{K,M,G} (501760-937703087, default 937703087):
    Using default value 937703087

    Command (m for help): t
    Partition number (1-5): 5
    Hex code (type L to list codes): 8e
    Changed system type of partition 5 to 8e (Linux LVM)

    Command (m for help): p

    Disk /dev/sda: 480.1 GB, 480103981056 bytes
    255 heads, 63 sectors/track, 58369 cylinders, total 937703088 sectors
    Units = sectors of 1 * 512 = 512 bytes
    Sector size (logical/physical): 512 bytes / 4096 bytes
    I/O size (minimum/optimal): 4096 bytes / 4096 bytes
    Disk identifier: 0x000ea999

      Device Boot      Start         End      Blocks   Id  System
    /dev/sda1   *        2048      499711      248832   83  Linux
    /dev/sda2          499712   937703087   468601688    5  Extended
    /dev/sda5          501760   937703087   468600664   8e  Linux LVM

    Command (m for help): w

Finally, resize the physical volume, the logical volume and the filesystem, after which we’re done:

    $ pvdisplay
    $ pvresize /dev/sda5
    $ lvdisplay
    $ lvresize -l+100%FREE /dev/volume-group-name/root
    $ resize2fs /dev/volume-group-name/root
    $ lvdisplay # verify LVM partition size
    $ df -h # verify partition size

January 30, 2014

CTF3 Experience: Capture The Flag III

Unlike Stripe’s previous CTFs, this year’s CTF was announced to be about distributed systems. That sounded fun, so I participated in CTF3. I was able to get to the final level all by myself, which was fun; in this post I’ll try to share my experience. I’ve pushed my code to GitHub.


Level0 was a fizzbuzz-style problem which was fun to implement. The stub implementation was in Ruby, but to clear this level I wrote a solution in C++. The problem was to highlight, i.e. wrap in <> markup, every word of a given file that appears in a provided dictionary file. The optimal solution runs in O(n), which many people implemented.
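The O(n) approach is to load the dictionary into a hash set and test each word of the input in constant time. A Python sketch of the idea (my actual solution was C++, and the real level had stricter tokenization rules):

```python
def highlight(text, dictionary):
    """Wrap every word that appears in the dictionary with <> markers."""
    words = set(dictionary)  # O(1) membership tests
    return " ".join("<%s>" % w if w in words else w for w in text.split())

print(highlight("the quick brown fox", {"quick", "fox"}))  # → the <quick> brown <fox>
```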

Level1 was an interesting problem around digital currencies such as Bitcoin. Thanks to this problem I finally understood concepts such as distributed ledgers, proof of work and blockchains. The task was to create a commit giving yourself one gitcoin (in a LEDGER.txt file) and to commit it in such a way that the commit SHA would be less than a provided difficulty target. The trick was to compute the SHA-1 of a static string with a varying nonce (or counter), which constituted the proof of work. I pulled it off first with a Python-based miner and again with a faster, more efficient Go-based miner. People used CUDA and GPU-based implementations, which gave them an edge in the round matches.
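The mining loop boils down to hashing a header with an incrementing nonce until the digest satisfies the difficulty. A Python sketch of that proof-of-work loop (the real miner hashed Git commit objects, which adds bookkeeping around the commit format; treating the difficulty as a required hex prefix is a simplification):

```python
import hashlib

def mine(header, difficulty_prefix):
    """Find a nonce whose SHA-1 digest starts with the difficulty prefix."""
    nonce = 0
    while True:
        digest = hashlib.sha1(("%s%d" % (header, nonce)).encode()).hexdigest()
        if digest.startswith(difficulty_prefix):
            return nonce, digest
        nonce += 1

nonce, digest = mine("gitcoin", "00")  # roughly 1 in 256 hashes succeeds
```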

Level2 was an interesting problem around DDoS: we had to stop an ongoing DDoS attack by rate limiting API calls while letting legitimate API calls pass through to the backend server. Using a rate-limiting algorithm this was fairly simple to implement and qualify. While I would love to know how the winners of this level implemented their solutions, I did not invest a lot of time in it. It was fun to write JavaScript on the server side, since it’s actually the first time I’ve ever written JavaScript that did something :) I also liked the testing framework which was used to simulate the network and do the scoring.
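A rate limiter for this can be as simple as a fixed-window counter per client. A Python sketch (my solution was server-side JavaScript; the limit and window values here are arbitrary examples, and production systems often prefer sliding windows or token buckets):

```python
import time
from collections import defaultdict

class FixedWindowLimiter:
    """Allow at most `limit` requests per `window` seconds per client."""

    def __init__(self, limit, window):
        self.limit, self.window = limit, window
        self.counts = defaultdict(int)

    def allow(self, client, now=None):
        now = time.time() if now is None else now
        # Bucket requests by (client, window number) and count them.
        key = (client, int(now // self.window))
        self.counts[key] += 1
        return self.counts[key] <= self.limit

limiter = FixedWindowLimiter(limit=100, window=1)  # e.g. 100 req/s per client
```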

Level3 was a serious problem of distributed information retrieval which we were supposed to implement in Scala. Scala itself was difficult to work with; nonetheless, my naive implementation sharded the file-walk traversal, read the files and created reverse indexes mapping words to line numbers in a given file. Using this multimap and a map of filenames, for a given query I was able to reply with the files and line numbers containing a word that matched the query. While I was able to qualify this level, I invested a lot of time (the entire weekend) trying to come up with a competent solution, which I couldn’t. The JVM gave me more problems than my own algorithms. I should probably invest some time in understanding the theory and learning from other people’s solutions.
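The core structure here is an inverted index from word to (file, line number) postings. A simplified, single-node Python sketch of the indexing and query side (my actual solution was in Scala and sharded the traversal across nodes):

```python
from collections import defaultdict

def build_index(files):
    """files: filename -> list of lines. Returns word -> {(filename, lineno)}."""
    index = defaultdict(set)
    for name, lines in files.items():
        for lineno, line in enumerate(lines, start=1):
            for word in line.split():
                index[word].add((name, lineno))
    return index

def search(index, word):
    """Return sorted (filename, lineno) postings for an exact word match."""
    return sorted(index.get(word, set()))
```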

Level4 was the most challenging. The problem was to implement a distributed, highly available SQL server, with SQLite used as the underlying data store. Like a lot of folks I used goraft for consensus, since I found some hints and the Go stub code looked a lot like raftd, an example implementation built on the raft library. I really liked the Octopus simulator, which could simulate a flaky network over Unix sockets. This was a nice hack; whoever architected this problem must be really awesome, kudos! I spent the whole night trying to come up with an optimized solution but failed to do it in time. So I’ll explore competent solutions as soon as the champions start posting theirs, and perhaps study the Raft and Paxos papers.

Overall my experience was good; I liked the competition and the problems. It was also an opportunity to learn new things and identify areas where my skills could improve.

Thanks to the fine folks at Stripe, gdb et al., for this awesome CTF. Hopefully in the next CTF the server infrastructure will scale better under load, and the problems will be clearer, with better READMEs that state any data/filepath assumptions (like the data and listening paths in level4).

September 2, 2013

Scaling with Queues: distributed systems at Wingify

Our home-grown, geo-distributed CDN allows us to deliver dynamic JavaScript content with the minimum latency possible. We use the same architecture for data acquisition as well. Over the years we’ve made a lot of changes to our backend; this post talks about some scaling and reliability aspects and our recent work on building a fast and reliable data acquisition system using message queues, which has been in production for about three months now. I’ll start by giving some background on our previous architecture.

Web beacons are widely used for data acquisition. The idea is to have a webpage send us data using an HTTP request to which the server replies with some valid object; there are many ways to do this. To keep the size of the returned object small, we return a tiny 1x1-pixel GIF image for every HTTP request, and our geo-distributed architecture along with our managed Anycast DNS service helps us do this with very low latencies; we aim for less than 40 ms. When an HTTP request hits one of our data acquisition servers, OpenResty handles it and our Lua-based code processes the request in the same process thread. OpenResty is an Nginx distribution which, among many things, bundles LuaJIT, allowing us to write URL handlers in Lua that run within the web server. Our Lua code does some quick checks and transformations and writes the data to a Redis server, which is used as a fast in-memory data sink. The data stored in Redis is later moved, processed and stored in our database servers.

Previous Architecture

This was the architecture when I joined Wingify a couple of months ago. Things were going smoothly, but the problem was that we were not quite sure about data accuracy and scalability. We used Redis as a fast in-memory data sink from which our custom-written PHP-based queue infrastructure would read; our backend would process the data and write it to our database servers. The PHP code was not scalable, and after about a week of hacking and exploring options we found a few bottlenecks and decided to redo the backend queue infrastructure.

We explored many options and decided to use RabbitMQ. We wrote a few proof-of-concept backend programs in Go, Python and PHP and did a lot of testing, benchmarking and real-world load testing.

Ankit, Sparsh and I discussed how we should move forward and we finally decided to explore two models in which we would replace the home-grown PHP queue system with RabbitMQ. In the first model, we wrote directly to RabbitMQ from the Lua code. In the second model, we wrote a transport agent which moved data from Redis to RabbitMQ. And we wrote RabbitMQ consumers in both cases.

There was no lua-resty library for RabbitMQ, so I wrote one using the cosocket APIs which could publish messages to a RabbitMQ broker over the STOMP protocol. The library, lua-resty-rabbitmqstomp, was opensourced for the hacker community.

Later, I rewrote our Lua handler code using this library and ran a loader.io load test against a small 1G DigitalOcean instance for both models. The first model failed due to very low throughput: for us, the STOMP protocol and the slow RabbitMQ STOMP adapter were the performance bottlenecks. RabbitMQ was not as fast as Redis, so we decided to keep Redis in front and pursue the second model. For our requirements, we wrote a proof-of-concept Redis-to-RabbitMQ transport agent called agentredrabbit, to leverage Redis as a fast in-memory sink and RabbitMQ as a reliable broker. The POC worked well in terms of performance, throughput, scalability and failover, and within the next few weeks we were able to build a production-grade queue-based pipeline for our data acquisition system.
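The heart of such a transport agent is a drain loop that pops messages from a Redis list and publishes them to RabbitMQ in chunks, only counting a chunk as moved once the publish succeeds. A Python sketch with the I/O abstracted behind callables (agentredrabbit itself is written in Go and additionally handles reconnects, multiple queues and failover; the chunk size is an arbitrary example):

```python
def drain(pop, publish, chunk_size=100):
    """Move messages from a queue-like source to a broker in chunks.

    pop() returns the next message or None when the source is empty;
    publish(chunk) must raise on failure so the chunk can be retried
    (or requeued) instead of being silently lost.
    """
    moved = 0
    while True:
        chunk = []
        while len(chunk) < chunk_size:
            msg = pop()
            if msg is None:
                break
            chunk.append(msg)
        if not chunk:
            return moved
        publish(chunk)
        moved += len(chunk)
```

In the real agent, pop would wrap a Redis list operation and publish would be a RabbitMQ publish with confirmation, so a broker failure leaves the chunk available for retry.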

For about a month, we ran the new pipeline in production against the existing one, to A/B test our backend :) To do that we modified our Lua code to write to two different Redis lists, the original list was consumed by the existing pipeline, the other was consumed by the new RabbitMQ based pipeline. The consumer would process and write data to a new database. This allowed us to compare realtime data from the two pipelines. During this period we tweaked our implementation a lot, rewrote the producers and consumers thrice and had two major phases of refactoring.

A/B testing of existing and new architecture

Based on the load-test results on a 1G DigitalOcean instance (as for the first model) and on the real-time A/B comparison with the existing pipeline, we migrated to the new RabbitMQ-based pipeline. Other issues of HA, redundancy and failover were addressed in this migration as well. The new architecture ensures no single point of failure and has mechanisms to recover from failures and faults.

Queue (RabbitMQ) based architecture in production

We’ve opensourced agentredrabbit, which can be used as a general-purpose, fast and reliable transport agent for moving data in chunks from Redis lists to RabbitMQ, with some assumptions and queue-name conventions. The flow diagram below hints at how it works; check out the README for details.

Flow diagram of "agentredrabbit"

Discussion on Hacker News

June 23, 2013

Hello Again Arch: Installing Arch Linux

At this point I’m tired of using OS X and I have this feeling that I’m losing my grip on my system. I no longer know why some processes are running, what they are for, and whether I should allow them to run.

For quite some time I had been thinking of migrating back to some Linux distro; Ubuntu was out of the question, Debian’s release process is too slow for me, and Fedora/CentOS are not much fun either. So I decided to check out Arch Linux one more time, because it’s been gaining a lot of traction lately and a few of my hardcore friends have already taken refuge under Arch and one of i3/xmonad/awesome.

From my last memories, I liked its minimalistic approach and the fact that no process runs unless I want it to. Getting the installation media was easy. Esoteric and technical as it looks, the installation process is quite simple:

  • Partition and format the disks
  • Mount the partitions, generate the filesystem table
  • Install the Arch Linux base system
  • Chroot to configure locale and time, install GRUB etc., then reboot and done!

Before wiping my MBA and installing Arch, I’ll evaluate it for the next few weeks in a VirtualBox VM. To keep it simple, I’ll have only two partitions: one for / and one for swap.

# Partition disks

# Format partitions
mkfs -t ext4 /dev/sda1
mkswap /dev/sda2

# Mount partitions

mount /dev/sda1 /mnt
swapon /dev/sda2

# Fix mirror list
vi /etc/pacman.d/mirrorlist

# Install base system
pacstrap /mnt base base-devel

# Generate filesystem table (we are not chrooted yet, so write to /mnt)
genfstab -p /mnt >> /mnt/etc/fstab

# Chroot, config system
arch-chroot /mnt

# Change root password

# Select en/US locale
vi /etc/locale.gen

# Configure timezone
ln -s /usr/share/zoneinfo/Asia/Kolkata /etc/localtime

# Set hostname
echo myawesomehostname > /etc/hostname

# Install bootloader
pacman -S grub-bios

grub-install /dev/sda
mkinitcpio -p linux  # the initramfs preset is named "linux"
grub-mkconfig -o /boot/grub/grub.cfg

# Exit out of chrooted env

# Cleanup reboot
umount /mnt
swapoff /dev/sda2

After rebooting into the installed system, I enabled a bunch of services, like dhcpcd so it autoconfigures the network/IP for me, edited the pacman conf file, updated/upgraded using pacman (Arch’s package manager), configured sound, the X server and a bunch of kernel modules, and installed i3, because all the good desktop environments are so messed up and I like tiling window managers.

# Configure network, edit stuff...
systemctl enable dhcpcd

# Add user
visudo  # Allow %wheel
useradd -m -g users -G storage,power,wheel -s /bin/bash bhaisaab

# Pacman
vi /etc/pacman.conf
# Update
pacman -Syy
# Upgrade
pacman -Su

# Sound
pacman -S alsa-utils
amixer sset Master unmute  # or unmute interactively in alsamixer
speaker-test -c2

# X
pacman -S xorg-server xorg-server-utils xorg-xinit

# VirtualBox drivers
pacman -S virtualbox-guest-utils
modprobe -a vboxguest vboxsf vboxvideo
vi /etc/modules-load.d/virtualbox.conf  # files here must end in .conf

# X twm, clock, term
pacman -S xorg-twm xorg-xclock xterm

# i3
pacman -S i3
echo "exec i3" >> ~/.xinitrc

# Startx

This is what I got for now ;)

If you’re a long-time Arch/i3 user, share your experiences so far and your i3 config file; it’s always better to fork someone’s dotfiles than to write them from scratch :)
