Preview of Storm: The Hadoop of Realtime Processing

We have been doing realtime processing for a long time at BackType. We've recently developed a new system for doing realtime processing called Storm to replace our old system of queues and workers. Storm is a distributed, reliable, and fault-tolerant stream processing system. Its use cases are so broad that we consider it to be a fundamental new primitive for data processing. That's why we call it the Hadoop of realtime: it does for realtime processing what Hadoop does for batch processing. We are planning to open-source Storm sometime in the next few months.

Storm

Like the systems of most people doing realtime processing, our old system was a complicated graph of queues and workers. Worker processes would take messages off a queue and then update a database and/or fire off new messages to other queues.

There was a lot of pain in doing realtime processing this way. We found that we spent most of our time worrying about where to send messages, where to receive messages from, deploying workers, and deploying queues. Worst of all, the system wasn't fault tolerant: we had to make sure our queues and workers stayed up.

Storm solves these issues completely. It abstracts the message passing away, automatically parallelizes the stream computation on a cluster of machines, and lets you focus on your realtime processing logic. Even more interesting, Storm enables a whole new range of applications we didn't anticipate when we initially designed it.

Properties of Storm

Here are the key properties of Storm:

1. Simple programming model: Just like how MapReduce dramatically lowers the complexity for doing parallel batch processing, Storm's programming model dramatically lowers the complexity for doing realtime processing. 

2. Runs any programming language: Even though Storm runs on the JVM (and is written in Clojure), you can use any programming language on top of Storm. We've added support for Ruby and Python, and support can easily be added for any language -- all you need to do is code a ~100 line library which implements a simple communication protocol with Storm.

3. Fault-tolerant: To launch a processing topology on Storm, all you have to do is provide a jar containing all your code. Storm then distributes that jar, assigns workers across the cluster to execute the topology, monitors the topology, and automatically reassigns workers that go down.

4. Horizontally scalable: All computations are done in parallel. To scale a realtime computation, all you have to do is add more machines and Storm takes care of the rest.

5. Reliable: Storm guarantees that each message will be fully processed at least once. Messages will be processed exactly once as long as there are no errors.

6. Fast: Storm is built with speed in mind. ZeroMQ is used for the underlying message passing, and care has been taken so that messages are processed extremely quickly.

Use cases for Storm

There are three broad use cases for Storm:

1. Stream processing: This is the traditional realtime processing use case: process messages and update a variety of databases.

2. Continuous computation: Storm can be used to do a continuous computation and stream out the results as they're computed. For example, we used Storm the other day to compute trending users on Twitter off of the Twitter firehose. Every second, Storm streams out the 50 users with the most retweets in the last few minutes with perfect accuracy. We stream this information directly into a webpage which visualizes and animates the trending users in realtime.

3. Distributed RPC: Distributed RPC is perhaps the most unexpected and most compelling use case for Storm. There are a lot of queries that are both hard to precompute and too intense to compute on the fly on a single machine. Traditionally, you have to do an approximation of some sort to lower the cost of a query like this. Storm gives the capability to parallelize an intense query so that you can compute it in realtime.

An example of a query that is only possible with distributed RPC is "reach": computing the number of unique people exposed to a URL on Twitter. To compute reach, you need to get all the people who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the number of uniques. It's an intense computation that potentially involves thousands of database calls and tens of millions of follower records. It can take minutes or worse to compute on a single machine. With Storm, you can do every step of the reach computation in parallel and compute reach for any URL in seconds (and less than a second for most URLs).
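To make the four steps of the reach computation concrete, here is a minimal single-machine sketch in Python. The dictionaries `TWEETERS_BY_URL` and `FOLLOWERS_BY_USER` and the functions `get_followers` and `reach` are hypothetical stand-ins for the real database calls. A thread pool stands in for the parallelism that Storm would provide across a cluster; none of this is Storm's API.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the databases a real system would query.
TWEETERS_BY_URL = {
    "http://example.com/a": ["alice", "bob"],
}
FOLLOWERS_BY_USER = {
    "alice": ["carol", "dave"],
    "bob": ["dave", "erin"],
}


def get_followers(user):
    """Look up the followers of one user (one 'database call')."""
    return FOLLOWERS_BY_USER.get(user, [])


def reach(url, max_workers=4):
    """Count the unique people exposed to a URL."""
    # Step 1: get everyone who tweeted the URL.
    tweeters = TWEETERS_BY_URL.get(url, [])
    # Step 2: fetch each tweeter's followers in parallel.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        follower_lists = list(pool.map(get_followers, tweeters))
    # Step 3: unique the set of followers.
    unique_followers = set()
    for followers in follower_lists:
        unique_followers.update(followers)
    # Step 4: count the uniques.
    return len(unique_followers)


print(reach("http://example.com/a"))  # dave is counted once, so this prints 3
```

The follower lookups in step 2 are independent of each other, which is what makes the query a good fit for parallel execution.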

The idea behind distributed RPC is that you run a processing topology on Storm that implements the RPC function and waits for RPC invocations. An RPC invocation is a message containing the parameters of the RPC request and information of where Storm should send the results. The topology picks up messages, computes the RPC call in parallel, and returns the results to the return address.
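The request/response flow described above can be sketched with in-process queues. This is only an illustration of the message shape, not Storm's interface: the names `invocations`, `reply_boxes`, `rpc_function`, and `topology_worker` are hypothetical, the `request_id` field is an assumption added so a caller can match replies to requests, and a single thread stands in for a parallel topology.

```python
import queue
import threading

# Hypothetical registry mapping a return address to the place results go.
reply_boxes = {"client-1": queue.Queue()}
# Hypothetical queue on which the topology waits for RPC invocations.
invocations = queue.Queue()


def rpc_function(args):
    """The function the topology implements; here, a sum of squares."""
    return sum(x * x for x in args)


def topology_worker():
    """Wait for invocations, compute, and reply to the return address."""
    while True:
        message = invocations.get()
        if message is None:  # shutdown signal used only by this sketch
            break
        result = rpc_function(message["args"])
        reply_boxes[message["return_address"]].put(
            {"request_id": message["request_id"], "result": result}
        )


worker = threading.Thread(target=topology_worker)
worker.start()

# An invocation carries the parameters and where to send the results.
invocations.put({"request_id": 1, "args": [1, 2, 3], "return_address": "client-1"})
reply = reply_boxes["client-1"].get(timeout=5)

invocations.put(None)
worker.join()
print(reply)
```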

Summary

Storm is already enormously useful for us at BackType. It reduces a ton of complexity in our realtime processing and lets us do things we couldn't do before. We look forward to the day we open source it.

You should follow the BackType tech team on Twitter here.

Why Yieldbot chose Cascalog over Pig for Hadoop processing

This is a guest post by Soren Macbeth, Chief Data Hacker at Yieldbot. Yieldbot captures and organizes the realtime intent existing in web publishers and makes it available to advertisers so they can match offers and ads at the exact moment consumers are most open to receiving relevant marketing. Previously, Soren was co-founder of StockTwits, an open, community-powered idea and information service for investors. Soren is active on Twitter at @sorenmacbeth.

At Yieldbot, we do a ton of batch processing of analytics data on Hadoop. As a small startup, speed is of the utmost importance, especially when it comes to iteration of our data processing codebase. Due to our speed requirement, we initially selected Pig for writing our MapReduce jobs. After a few months of getting our hands dirty with Pig, we decided to make the switch over to Cascalog. We have been extremely happy with that decision.

Why we initially selected Pig

The fact that Pig uses a custom scripting language, called Pig Latin, makes it a very attractive choice for rapid development. The built-in shell, called Grunt, provides for interactive development and debugging which is hugely useful for iterative development of algorithms. Another important factor was that Pig was included with Cloudera's Hadoop distribution, which we use. Finally, Pig is in use in production at many of the leading companies such as LinkedIn and Twitter.

Problems with Pig

As we began implementing our algorithms in Pig, we encountered a variety of issues which weren't immediately obvious. Most of them impacted the speed of development. The primary issues were centered around Pig Latin. Designing a programming language from scratch is a very difficult task. Pig Latin does many things very well, especially different types of joins, filtering, and grouping. However, once you move beyond the basics that Pig Latin covers, you find yourself writing Java code. A lot of Java code. Loading custom data, storing custom data, transforming data, and filtering data all involve writing code in Java, packaging it up as a jar, and loading it in your Pig Latin scripts. This jumping back and forth between editing, compiling, and packaging your Java code and running your Pig scripts leads to long development cycles. Debugging becomes especially challenging, as bugs can occur in Java land and/or in Pig Latin land. Testing also suffers from this split of Java and Pig Latin. Java certainly has many different unit testing packages, but until the most recent version of Pig, no such facility existed for Pig Latin scripts.

Enter Cascalog

Faced with these issues and after a brief period of experimentation, we decided to make the leap and migrate all of our data processing over to Cascalog. There was a bit of a steep learning curve as none of us were very familiar with Clojure. However, because Clojure is a very interesting, general-purpose language, we felt comfortable investing time to learn it. Now, with several months of using Cascalog and Clojure in production, we could not be happier with our decision.

Speed

What Cascalog gives us is the ability to implement and iterate on our data processing tasks with extreme speed. True iterative development is possible via the magic of the Clojure REPL and its Java interop. Rather than having to constantly jump back and forth between Pig Latin land and Java land, everything can be accessed, written, run, and tested without leaving the REPL. Because Cascalog code is just Clojure code, you immediately gain all the benefits of Clojure when doing your data processing. Another benefit is the easy deployment to a production cluster. Clojure tools such as Leiningen make it easy to create an 'uberjar' with everything bundled in, including Clojure itself, which makes running on your Hadoop cluster a snap. A final benefit is that writing unit tests becomes a simple matter. Cascalog and Clojure come with great built-in support for writing unit tests.

Conclusion

If you are new to Clojure as we were, the learning curve to start using Cascalog is definitely steep. However, the investment in time learning Clojure and Cascalog has paid many dividends for us at Yieldbot. As a very small team, the speed at which we can write and iterate on our code is of the utmost importance. I know the folks at BackType share this sentiment and one needs to look no further than Cascalog for evidence. The bottom line is that Cascalog allows us to run faster than Pig did. Lastly, we'd like to thank the entire BackType team for releasing Cascalog into the wild.

The dark side of Hadoop

At BackType, we are heavy users of Hadoop. We use it to run computations on our 30TB datastore of social data. We've even open-sourced some significant projects that are built on top of Hadoop.

Unfortunately, Hadoop has problems. It's sloppily implemented and requires all sorts of arcane knowledge to operate it. We would be the first to try out a replacement for Hadoop if a viable alternative existed. In this post, we'll look at some of the darker aspects of Hadoop.

Critical configuration poorly documented

There's a configuration property for Hadoop called "dfs.datanode.max.xcievers". By default, it's set to 256. It turns out if you don't raise this value to be significantly higher (e.g., 5000), your cluster blows up with all sorts of weird errors like EOFExceptions.

As you can imagine, tracking this one down is quite an adventure. Even though this property is so critical, it's not documented anywhere. And on top of that, the property isn't even spelled correctly. It's supposed to be spelled "xceiver" (which is how it's spelled in the source code). See this post for more details.

Terrible with memory usage

We used to have problems with Hadoop running out of memory in various contexts. Sometimes our tasks would randomly die with out of memory exceptions, and sometimes reducers would error during the shuffle phase with "Cannot allocate memory" errors. These errors didn't make a lot of sense to us, since the memory we allocated for the TaskTrackers, DataNodes, and tasks was well under the total memory for each machine.

We traced the problem to a sloppy implementation detail of Hadoop. It turns out that Hadoop sometimes shells out to do various things with the local filesystem. When you shell out in Java, the process gets forked. Forking a process causes the child process to reserve the same amount of memory for itself as the parent process is using (to fully understand what's happening, you need to learn about memory overcommit and the copy-on-write semantics of forking in Linux). This means that a Hadoop process which was using 1GB will temporarily "use" 2GB when it shells out.

The solution to these memory problems is to allocate a healthy amount of swap space for each machine to protect you from these memory glitches. We couldn't believe how much more stable everything became when we added swap space to our worker machines.

Zombies

Hadoop is terrible at process management. It sometimes fails to kill processes that it launches, leaving "zombie tasks" throughout the cluster. These zombie tasks soak up memory, causing out of memory errors for real tasks. We've seen clusters get completely overrun by zombies and become useless.

The problem is that Hadoop puts the burden of a task exiting on the task itself, rather than the TaskTracker that launched it. So if a task enters a zombie state for one reason or another, there's nothing that will clean it up. This is a terrible design. The burden should be on the TaskTracker to supervise the processes it launches and kill them when necessary. Hadoop should not trust user code in tasks to behave properly. This ensures that if I mess up my code, my job may have problems but at least I won't damage the cluster.
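As an illustration of the supervision model argued for above, here is a minimal sketch in which the parent process owns the lifetime of the child. It is not how Hadoop's TaskTracker works. The function `run_supervised` is hypothetical, and the fixed deadline is an assumed policy; a real supervisor might use heartbeats or progress reports instead.

```python
import subprocess
import sys


def run_supervised(command, timeout_seconds):
    """Run a child process and kill it if it outlives its deadline.

    Returns the child's exit code, or None if the supervisor had to kill it.
    """
    child = subprocess.Popen(command)
    try:
        return child.wait(timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        # The supervisor, not the task, is responsible for cleanup.
        child.kill()
        child.wait()  # reap the process so it does not linger
        return None


# A well-behaved task exits on its own; a hung task gets killed.
ok = run_supervised([sys.executable, "-c", "pass"], timeout_seconds=30)
hung = run_supervised(
    [sys.executable, "-c", "import time; time.sleep(60)"], timeout_seconds=1
)
print(ok, hung)
```

Because the supervisor never relies on the child to exit cleanly, buggy task code can fail its own job without leaving a stray process behind.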

A task turning into a zombie seems to be related to encountering an out of memory error, as we've noticed far fewer zombies since fixing Hadoop's other memory issues by adding swap space. We just wish that Hadoop were designed correctly so that zombies were not even possible.

Conclusion

Hadoop continuously finds new and creative ways to frustrate us. Making Hadoop easy to deploy, use, and operate should be the #1 priority for the developers of Hadoop. Hadoop could be an amazing project. Right now though, it's just plain sloppy.

 

Introducing ElephantDB: a distributed database specialized in exporting data from Hadoop

ElephantDB is a database that specializes in exporting key/value data from Hadoop. We have been running it in production at BackType for over half a year now and are excited to be open-sourcing it. In this post, I'll introduce ElephantDB, show how to use it, and then compare it to other databases out there. ElephantDB is hosted on GitHub here.

Unlike most other databases, ElephantDB disassociates the creation of a database index from the serving of that index. ElephantDB is composed of two components. The first is a library that is used in a MapReduce job to create an indexed key/value dataset that is stored on a distributed filesystem. The second component, ElephantDB server, is a daemon that downloads a subset of a dataset and serves it in a read-only, random-access fashion. A group of ElephantDB servers working together to serve a full dataset is called a ring. Both the creation and serving of a dataset are done in a fully distributed fashion.

99.9% of the complexity of distributed databases comes from supporting random writes. Since ElephantDB doesn't support random writes, it is extremely simple -- only a few thousand lines of code. Once a server loads up its subset of data, it does very little. This leads to ElephantDB being rock-solid in production, since there are almost no moving parts.

Why did we create ElephantDB?

We rely heavily on batch computation via Hadoop at BackType. We have over 25TB of social data (tweets, blog comments, etc.) stored on a distributed filesystem, and we use Hadoop to compute views of that data that we serve in our product. For example, one view is influence scores for everyone on Twitter. Another view is tweet counts for each url. We needed a database that could serve these various views to our product.

We looked into using Voldemort, but it lacked the incremental capabilities we wanted. We then realized that a database that doesn't support random writes is really simple and would be easy to write ourselves. We also thought that a database focused on only handling the batch exporting workflow could be made extremely simple to configure and operate. ElephantDB only took a few weeks to create and get into production.

High level overview

Before we dive into ElephantDB, let's get some terminology out of the way. An ElephantDB "domain" is analogous to a "table" from relational databases -- a single, related set of data. An ElephantDB domain is composed of a fixed number of "shards" of a "Local Persistence". A "shard" is a subset of a domain, and a "Local Persistence" is a regular key/value database that reads and writes to disk. ElephantDB comes bundled with a "Local Persistence" implementation for Berkeley DB Java Edition. An ElephantDB "ring" is a group of servers that work together to serve a set of ElephantDB domains. Each server serves a subset of the data, and each server knows what data the other servers have.

The following picture illustrates how an ElephantDB domain is created. Key/value pairs are sharded and indexed in a MapReduce job and stored on a distributed filesystem:

Overview1

To serve the data, you point an ElephantDB ring at the location where the domain is stored on the distributed filesystem. Each ElephantDB server will then download a subset of the shards and serve them, as illustrated in the following picture:

Overview2

As you can see, the distributed filesystem is used as a distribution point for an ElephantDB domain.

Creating a domain of data via MapReduce

Let's look at how ElephantDB works in more detail. Let's start with how ElephantDB creates or updates an ElephantDB domain.

A domain is created or updated via a MapReduce job and stored on the distributed filesystem. Here's what a domain looks like on the distributed filesystem:

Edb-domain

There can be multiple versions of a domain. Any given version of a domain is immutable, and every time a domain is updated, a new version of the domain is created. In this picture, each numeric directory is a different version of the domain. A domain also contains a metadata file, domain-spec.yaml, that indicates what Local Persistence this domain is composed of and how many shards are in this domain. The contents of the domain-spec.yaml for this domain are:

If we dive into one of these versions, we see the following directory structure. Each of these folders is a shard for this version of the domain:

Edb-domain-version

Diving into one of these shards, we can see the Berkeley DB files for the shard:

Edb-shard

When given a set of key/value pairs to turn into an ElephantDB domain, ElephantDB performs the following steps in a MapReduce job to create a version for the domain:

  1. Select a shard for each key using consistent hashing (take the hash of the key and mod it by the number of shards)
  2. Group all the key/value pairs by their shard number
  3. In the reduce task, open a local persistence locally and stream all the key/value pairs for the shard into it.
  4. At the end of the reduce task, copy the files for the local persistence to the distributed filesystem.
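The first three steps above can be sketched in a few lines of Python. This is an illustration only, not ElephantDB's code. The function names are hypothetical, MD5 is an assumed choice of hash function, and a plain dict stands in for the Local Persistence. Step 4, copying the files to the distributed filesystem, is left out.

```python
import hashlib
from collections import defaultdict


def shard_for_key(key, num_shards):
    """Step 1: hash the key bytes and mod by the number of shards."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def group_by_shard(pairs, num_shards):
    """Step 2: group key/value pairs by their shard number."""
    shards = defaultdict(list)
    for key, value in pairs:
        shards[shard_for_key(key, num_shards)].append((key, value))
    return dict(shards)


def build_shard(pairs):
    """Step 3: stream one shard's pairs into a local store.

    A plain dict stands in for the Local Persistence here.
    """
    store = {}
    for key, value in pairs:
        store[key] = value
    return store


pairs = [(b"alice", b"1"), (b"bob", b"2"), (b"carol", b"3")]
grouped = group_by_shard(pairs, num_shards=4)
domain_version = {shard: build_shard(p) for shard, p in grouped.items()}
```

Because the shard for a key depends only on the key and the shard count, a reader can later recompute which shard holds any key without consulting an index.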

At no point in the creation of a domain is an ElephantDB server involved. The creation of a domain happens independently from serving the domain.

The code to create an ElephantDB domain is really simple. Here's an example using Cascading:

First, we create an ElephantDBTap and configure it with how many shards we want and what LocalPersistence to use. At the end of our Cascading flow, we pipe the data through an ElephantTailAssembly which shards and groups the data to prepare it for the output tap.

ElephantDB is even easier to use with Cascalog:

"elephant-tap" from elephantdb-cascalog abstracts away the sharding/grouping process completely so that you can write key/value pairs to it just like you would write them to a regular file-based tap.  

Serving ElephantDB domains with ElephantDB server

Now let's take a look at how ElephantDB server works. A group of ElephantDB servers work together to serve a set of domains. Each ElephantDB server will serve a subset of each domain. The first thing you have to do is configure what domains the ring should be serving. The configuration is stored on the distributed filesystem and looks something like this:

This configuration is as simple as it gets. You just have to list all the hosts in the ring, the replication factor, what domains to serve, and what port each server should use.

ElephantDB server decides what shards each host should serve by running a deterministic algorithm that assigns hosts to shards. Since every ElephantDB server gets the same input, they all reach the same conclusion and know where all the data is located across the ring.
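To show why a deterministic assignment needs no coordination, here is a small sketch. The round-robin scheme and the function `assign_shards` are assumptions made for illustration; the source does not say which algorithm ElephantDB actually uses. The host names are hypothetical.

```python
def assign_shards(hosts, num_shards, replication):
    """Map each shard number to the list of hosts that serve it.

    Assumes replication <= len(hosts), so each shard's hosts are distinct.
    """
    ordered = sorted(hosts)  # same order on every server, whatever the input order
    return {
        shard: [ordered[(shard + r) % len(ordered)] for r in range(replication)]
        for shard in range(num_shards)
    }


assignment = assign_shards(["edb3", "edb1", "edb2"], num_shards=4, replication=2)
print(assignment)
```

Any server that runs this function on the same host list, shard count, and replication factor computes the same map, so each one knows where to route a request without asking its peers.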

Each ElephantDB server also has a simple local configuration containing where to cache shards locally as well as machine-specific configuration for the local persistences. See the example conf in the project for an example of this.

ElephantDB server has a Thrift interface, so you can access it from any language. The interface contains methods for doing gets, multi-gets, and getting the status as to whether domains have been loaded from the distributed filesystem yet. Here's an example of doing a get to ElephantDB from Clojure:

When you do a get to an ElephantDB server, it will route your request to the server that has the data for that key. This happens transparently.

When ElephantDB starts up, it asynchronously downloads shards for each domain from the distributed filesystem to local disk. You can use the status methods on its Thrift interface to know when it's fully loaded.

Currently, updating an ElephantDB server with new data requires taking downtime on the server. We are working on adding hot-swapping so that the server does this automatically in the background without taking downtime. 

Additional Features

Incremental updates

ElephantDB supports doing incremental updates of a domain. The steps ElephantDB performs to do an incremental update are just a little bit different than the steps ElephantDB uses to create a brand new domain:

  1. Select a shard for each key using consistent hashing (just like before)
  2. Group all the key/value pairs by their shard number (just like before)
  3. Download the most recent version of the shard from the distributed filesystem
  4. Execute updating logic for each new key/value pair
  5. Upload the updated shard to the distributed filesystem as a new version

The updating logic is completely pluggable. By default, it just does a replace of whatever key/value pair was already in the local persistence. At BackType, we do all sorts of custom logic in the updater, from incrementing counts to merges of the old value with the new value.
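A pluggable updater can be pictured as a function from the old value and the new value to the value to store. The sketch below is not ElephantDB's updater interface. The names `replace_updater`, `count_updater`, and `update_shard` are hypothetical, and a dict stands in for a shard's Local Persistence.

```python
def replace_updater(old_value, new_value):
    """Default behaviour described above: the new value wins."""
    return new_value


def count_updater(old_value, new_value):
    """Custom logic: treat values as counts and add them."""
    return (old_value or 0) + new_value


def update_shard(shard, new_pairs, updater=replace_updater):
    """Apply the updater to each new pair and return a new shard version."""
    updated = dict(shard)  # the previous version stays untouched
    for key, value in new_pairs:
        updated[key] = updater(updated.get(key), value)
    return updated


v1 = {"url-a": 10, "url-b": 3}
v2 = update_shard(v1, [("url-a", 5), ("url-c", 1)], updater=count_updater)
print(v2)  # {'url-a': 15, 'url-b': 3, 'url-c': 1}
```

Copying the shard before updating mirrors the rule that a version of a domain is immutable and every update produces a new version.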

All the incrementalization happens on the MapReduce side. Incrementally updating shards in this manner is a dramatic improvement over re-indexing the entire domain from scratch. 

Note that ElephantDB server updates itself by downloading all the new shards from scratch -- it's not able to download only the "new stuff". This typically isn't so bad as copying is quite fast, but it's certainly an area for improvement.

ElephantDB as an input source

Those ElephantDB domains sitting on the distributed filesystem can be used as an input source to your jobs. This is really useful. This lets you do analytics on your data without touching the servers that are serving the data to your application.

Furthermore, with this feature ElephantDB is useful even without ElephantDB server. ElephantDB can be used as an indexed key/value file format on top of Hadoop. There are lots and lots of applications for this.

Here's an example of using ElephantDB as an input source with Cascading. As you can see, the same tap that's used to create ElephantDB domains can be used as a source. The tap will emit key/value pairs into the processing flow.

Comparison to other database technologies

There are a lot of distributed databases out there. Most of these databases, like Cassandra and Riak, aren't comparable as they are read/write databases that create the index in the same place they serve the index. The most comparable database is Voldemort, which has a read-only mode with similarities to ElephantDB (Voldemort can also be used as a Dynamo-style read/write distributed database). Here's a tutorial from LinkedIn on how to use Voldemort's exporting features. Below is a comparison of Voldemort's read-only mode to ElephantDB. I'm sure I'm missing important aspects of Voldemort below. If so, let me know and I'll update the comparison.

Voldemort advantages:

  1. Better performance: Voldemort has had a lot more optimization work done to it than ElephantDB, so it will have better performance. This is an area where ElephantDB may "borrow" from Voldemort (in particular, we'd like to try out Krati as an alternative to Berkeley DB JE).
  2. Supports hot swapping: Voldemort databases can do live swaps of new versions of a domain in the background. This is functionality we're currently adding to ElephantDB so this difference will not last long.

ElephantDB advantages:

  1. Simpler to configure: Since ElephantDB is a more specialized database, it benefits from being easier to configure. Whereas with Voldemort you need to manually assign hosts to partitions, ElephantDB just figures out the host-to-shard mapping on its own.
  2. Support for incremental updates to a domain.
  3. Support for reading ElephantDB domains stored on the distributed filesystem from MapReduce.
  4. Very small codebase: ElephantDB is only a few thousand lines of code. This makes it easy to understand and extend.

The future

ElephantDB has been a very successful project at BackType. Future improvements we plan or would like to make to ElephantDB include the following:

  1. Performance optimization: In particular, we'd like to try out Krati as an alternative to Berkeley DB JE.
  2. Hot swapping: Currently, to update a domain without taking downtime, you'd need to do a rolling update of multiple ElephantDB rings. To simplify this workflow, we'd instead like for a live ElephantDB ring to download new shards and swap them in the background.
  3. Richer data model: We'd like to explore adding a richer data model to ElephantDB beyond key/value.

Otherwise, we want to keep ElephantDB simple. We believe that a simple database is more reliable and makes life easier for everyone.

Conclusion

We spend almost no time on operational or maintenance issues with ElephantDB at BackType. It just works. As a small startup, it is important for us to eliminate complexity from our systems so that we can focus on customer problems. ElephantDB removes all the friction from serving batch-computed views of our 25TB dataset into our application. Its incremental capabilities let us keep those views updated with minimal cost, and it wears multiple hats by letting us use an ElephantDB domain as an input source to our jobs.

If you're interested in using ElephantDB, be sure to join the user group and follow the project on GitHub.

 

Secrets of BackType's Data Engineers

We've been very busy lately at BackType. We moved into a new office, welcomed two new members to our team and have been furiously iterating on our products and technology (occasionally stopping to enjoy some beers and emerging programming languages). On the tech side, we're working hard on a new stream processing framework to help our customers make sense of social media in real-time. Right now we process more than 100 million messages a day, both in batch and real-time, but our stream processing is somewhat hand-coded. Nathan has been leading the charge on our new framework (code-named Thunderlog) that will generalize the real-time work-flow much like Hadoop does for batch work-flows.

A few weeks ago we sat down with ReadWriteWeb's Pete Warden to talk big data, stream processing and BackType's data pipeline. We had a blast geeking out and now he has published a very flattering article about our technology on RWW. Thanks Pete! Check out the article. If what we're up to sounds cool, get in touch: we're looking for badass product and data hackers (http://www.backtype.com/jobs).

Welcome Jason & Christopher!

One of the best things about startups is that you get to choose who you work with. We have a very rigorous recruiting process because we're obsessed with finding top talent to join the team. Today, we'd like to welcome two new members of the BackType team as interns. They will be working with us for the next four months and contributing posts to the blog.

Christopher Bertels is studying computer science and philosophy at the University of Osnabrück (Germany). His main interests lie in the advancement of Free Software, programming language implementations and virtual machine technology. He is the creator of Fancy, a dynamic, pure object-oriented programming language inspired by Smalltalk, Ruby and Erlang. He presented it at the O'Reilly Open Source Convention last year. After seeing us on Hacker News, Christopher sent us an e-mail at 4am his time; little did he know we would respond with a coding challenge ten minutes later. He passed the challenge and the rest of our interview process with flying colors.

We met Jason Jackson while interviewing candidates at the University of Waterloo; of over 100 applicants, he was the strongest and most driven to join a startup. After his flight arrived on Monday morning, he came directly to the office with luggage in hand to start working. He studies software engineering at UW, and has previous work experience at Google as well as another start-up in the Bay Area. His main interests include entrepreneurship, innovation, aerospace technology, Argentine tango and psychology.

Co-ops and interns do the same thing everyone else on the team does: build incredible products and technology that solve high priority problems for our customers. Click here if you'd like to learn more about becoming an intern at BackType. We're hiring for full-time positions as well.

Patching the Cloudera EC2 Boot Scripts for Spot Instances

Cloudera’s Distribution for Hadoop includes a very useful set of scripts for launching Hadoop clusters on EC2. They take a lot of the pain out of getting a cluster up and running. Unfortunately, they do not support spot instances, which allow you to launch a cluster at significantly lower prices (typically 1/3 the on demand rates).

Below is a simple patch that adds spot instance support to those scripts. After applying it, you just need to add a spot_price to your clusters.cfg file and the scripts will attempt to launch instances using the spot API. The patch could be improved by allowing users to launch the masters as regular instances, but we have not run into issues so far.

 

Custom Cassandra Comparators Using Protocol Buffers

At BackType we serve a lot of our analytics data from Cassandra. We love the write performance, flexible schema, horizontal scalability and range queries that it allows.

The out-of-the-box ordering implementations (ASCII, UTF-8, Long, and UUID) get you a long way, but to do a complex column type, such as one made up of a timestamp and an auto-increment ID, you're on your own.

The bad way

The hacky solution to create a custom column would be to pack two ints into a LongType:

import struct

struct.pack('!II', int1, int2)

This is a really bad idea. We're just smashing the ints together and there's nothing to protect us against careless mistakes. Also, by packing two ints into the LongType we've achieved the custom sorting we sought but in an inflexible and non-obvious way.
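A short sketch shows both why the packing trick sorts correctly and why it is fragile. The helper `pack_column_name` and the sample values are hypothetical. The example only exercises Python's `struct` module and does not talk to Cassandra.

```python
import struct


def pack_column_name(timestamp, some_id):
    """Pack two unsigned 32-bit ints into 8 big-endian bytes."""
    return struct.pack("!II", timestamp, some_id)


a = pack_column_name(1300000000, 7)
b = pack_column_name(1300000000, 8)
c = pack_column_name(1300000001, 0)

# Read back as one unsigned 64-bit integer, the packed value is
# (timestamp << 32) | some_id, so names sort by timestamp, then by ID.
as_long = struct.unpack("!Q", a)[0]
assert as_long == (1300000000 << 32) | 7
assert a < b < c

# Nothing stops a caller from swapping the arguments: the result is
# still 8 valid bytes, but it now sorts by the wrong field.
swapped = pack_column_name(7, 1300000000)
assert len(swapped) == 8
assert swapped < a
```

The ordering is a side effect of the byte layout and is not stated anywhere in the schema, which is the inflexibility the paragraph above complains about.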

The good way

A proper solution should do a few things:

  1. Work for structured data and throw an error when we make a mistake
  2. Give us direct control over how the sorting is implemented
  3. Be language and platform neutral

We decided to go with Google Protocol Buffers:

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages – Java, C++, or Python.

Being small, fast, simple, and usable from both Python and Java makes them perfect for reading and writing our custom columns. We simply define a "message" that represents our structure and then serialize and deserialize it when generating and parsing column names.

Here's an example .proto file for our new column type:

message MyCustomColumn {
  required uint64 timestamp = 1;
  required uint64 some_id = 2;
}

Next we generate the Python and Java libraries and write a quick custom comparator for Cassandra to use when inserting into our column family:

package backtype.cassandra;

import backtype.cassandraext.Cassandraext;
import org.apache.cassandra.db.marshal.AbstractType;
import com.google.protobuf.InvalidProtocolBufferException;

public class MyCustomComparator extends AbstractType {
    @Override
    public String getString(byte[] bytes) {
        if (bytes.length == 0) return "";
        try {
            Cassandraext.MyCustomColumn c = Cassandraext.MyCustomColumn.parseFrom(bytes);
            return c.toString();
        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }
    
    public int compare(byte[] o1, byte[] o2) {
        if (o1.length == 0) {
            return o2.length == 0 ? 0 : -1;
        }
        if (o2.length == 0) {
            return 1;
        }
        try {
            Cassandraext.MyCustomColumn c1 = Cassandraext.MyCustomColumn.parseFrom(o1);
            Cassandraext.MyCustomColumn c2 = Cassandraext.MyCustomColumn.parseFrom(o2);
            if (c1.getTimestamp() < c2.getTimestamp()) {
                return -1;
            } else if (c1.getTimestamp() > c2.getTimestamp()) {
                return 1;
            } else {
                if (c1.getSomeId() < c2.getSomeId()) {
                    return -1;
                } else if (c1.getSomeId() > c2.getSomeId()) {
                    return 1;
                } else {
                    return 0;
                }
            }
        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
            throw new RuntimeException("Could not parse protobuf object", e);
        }
    }
}
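It may help to see why the comparator parses both messages instead of comparing the raw bytes. Protocol Buffers encodes uint64 fields as varints, least-significant group first, so the bytewise order of serialized messages does not follow numeric order. The Python sketch below hand-rolls the wire format for our two fields as a stand-in for the generated classes. The names encode_column, decode_column and compare_columns are hypothetical, and the code only handles this one message shape.

```python
import functools

def encode_varint(n):
    """Encode a non-negative int as a base-128 varint (low 7 bits first)."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def decode_varint(data, pos):
    """Decode a varint starting at pos; return (value, next position)."""
    result, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def encode_column(timestamp, some_id):
    # Tag 0x08 is field 1 (varint), tag 0x10 is field 2 (varint).
    return b'\x08' + encode_varint(timestamp) + b'\x10' + encode_varint(some_id)

def decode_column(data):
    # Sketch only: expects exactly the two fields, in field-number order.
    assert data[0] == 0x08
    timestamp, pos = decode_varint(data, 1)
    assert data[pos] == 0x10
    some_id, pos = decode_varint(data, pos + 1)
    return timestamp, some_id

def compare_columns(o1, o2):
    # Same ordering as the Java comparator: empty names first,
    # then by timestamp, then by ID.
    if not o1:
        return 0 if not o2 else -1
    if not o2:
        return 1
    k1, k2 = decode_column(o1), decode_column(o2)
    return (k1 > k2) - (k1 < k2)

columns = [encode_column(256, 1), encode_column(255, 2),
           encode_column(255, 1), b'']
by_bytes = sorted(columns)
by_fields = sorted(columns, key=functools.cmp_to_key(compare_columns))
```

Sorting by raw bytes puts timestamp 256 ahead of 255, because 256 encodes as 0x80 0x02 and 255 as 0xFF 0x01. Sorting with the parsing comparator gives the intended order.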

Finally, we add our new column to the schema and restart our nodes with the new jars for our comparator and protobuf included in the lib directory:

<ColumnFamily Name="MyColumn" CompareWith="backtype.cassandra.MyCustomComparator"/>

What about Thrift?

We've used Thrift to do this before — it works in the exact same way. Unfortunately, Cassandra is built against an older version of Thrift and if you want things to work you have to make sure you're using the exact same one.

Top 50 GitHub Projects on Twitter

More and more these days, I'm finding that my Twitter friends are exposing me to new and exciting open-source projects on GitHub. This weekend I satisfied my curiosity and looked into the sharing of GitHub projects on Twitter. I started off by plugging github.com into BackType. No surprise: GitHub is linked and discussed a bunch on Twitter, FriendFeed, Reddit and Hacker News.

Next I ran a query on our data-set to find the 50 projects that have been shared the most on Twitter this year [1]:

1. Gordon — An open source Flash™ runtime written in pure JavaScript (1140 users)
2. Murder — Large scale server deploys using BitTorrent and BitTornado (352 users)
3. three.js — Javascript 3D Engine (313 users)
4. Homebrew — The missing package manager for OS X (305 users)
5. FlockDB — A distributed, fault-tolerant graph database. (289 users)
6. HipHop for PHP — Transforms PHP into C++ and compiles it using g++. (259 users)
7. brightkitey — A little Ruby wrapper around Brightkite's API (235 users) [2]
8. helium-css —  javascript tool to scan your site and show unused CSS (228 users)
9. Gizzard — A framework for creating distributed datastores. (201 users)
10. Twurl — OAuth-enabled curl for the Twitter API (192 users)

[1] I queried for the most unique users, not the projects that were tweeted the most times (often people tweet their own content repeatedly).
[2] brightkitey benefited from a bunch of bit.ly URLs being trimmed in tweets to "http://bit.ly/bk", see http://www.backtype.com/page/github.com%2Fholman%2Fbrightkitey/conversations?...

Here's the complete Top 50 GitHub Projects on Twitter. A pretty interesting list, including seven projects from Twitter (Murder, FlockDB, Gizzard, Twurl, twitter-text, Querulous, and Elephant Bird), two from Facebook (HipHop and Three20), and a pair of cool Cassandra-based projects (Twissandra and Lucandra).

The query to find the top projects was written in Cascalog, the Clojure-based query language for Hadoop that we open-sourced last month. I had the job up and running within minutes. What would have been fairly complex to write as a MapReduce job was fewer than 10 lines of Cascalog. Check out these posts for more details on writing Cascalog queries. If you're looking to do some serious data mining then we'd love to hear from you.
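For readers who don't know Cascalog, the ranking logic from footnote [1] can be sketched in plain Python. This is not the Cascalog query we ran: the function name, the (user, project) input shape and the sample data are all assumptions made for illustration.

```python
from collections import defaultdict

def top_projects_by_unique_users(shares, limit=50):
    """Rank projects by the number of distinct users who shared them.

    `shares` is an iterable of (user, project) pairs, one per tweet.
    """
    users_by_project = defaultdict(set)
    for user, project in shares:
        # A set collapses repeated tweets by the same user.
        users_by_project[project].add(user)
    ranked = sorted(users_by_project.items(),
                    key=lambda item: (-len(item[1]), item[0]))
    return [(project, len(users)) for project, users in ranked[:limit]]

# Hypothetical data: alice tweets her own project three times.
shares = [
    ("alice", "example/alpha"),
    ("alice", "example/alpha"),
    ("alice", "example/alpha"),
    ("bob", "example/beta"),
    ("carol", "example/beta"),
]
top = top_projects_by_unique_users(shares)
```

Counting raw tweets would rank example/alpha first with three shares. Counting distinct users ranks example/beta first, which is the behavior we wanted.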

Videos From The May Hadoop Meet-Up

Last week Nathan and I went to the Yahoo! campus for the monthly Hadoop User Group meeting. Nearly 300 developers packed the room for the talks and the socializing that went on well after the last presentation. The two main talks were given by Nathan, who presented and demoed Cascalog (a Clojure-based query language for Hadoop), and Dmitriy, an engineer at Twitter, who walked us through their Hadoop ecosystem. I've posted the videos from both of these talks. Other presentations, from Alan Gates of Yahoo and Tom White of Cloudera, can be found on the Yahoo Developer Blog.

During his presentation, Nathan demoed the "playground" that ships with Cascalog, which is really just a dataset that is served from memory and can be used from the Clojure REPL to test MapReduce queries. Be sure to check out the slides at the end for more about how we're using Cascalog at BackType, including material that wasn't covered during the presentation.

We're hiring brilliant engineers. If you're excited by working with tools like Cascalog to ask really hard questions of TBs of data, contact us.

Follow @BackTypeTech on Twitter!