Instagram

Instagram Engineering

Welcome to the Instagram Engineering Blog, where we share insights on building and scaling our service.

Making it simpler to take more beautiful photos

In our most recent update to the Instagram iOS app we added a simple way to make it easier to take beautiful photos. When you take a photo with the in-app camera, you can now tap the new straighten icon and your photo will correct to be level—it’s that simple. The straightening tool also includes a slider so you can rotate and adjust any photo—including ones imported from your photo library—as much or as little as you’d like.

We had been talking about adding a way to auto straighten photos for a while, even before we started working on video. As a team we had been thinking about the problem, as it is easy to accidentally take a slanted photo when you are using a mobile device, and it was something we had all encountered. We even had a couple of prototypes built a few months back, in which you could manually change the tilt angle of photos by rotating a set of wheels, with different sensitivities. However, none of these prototypes felt as if they fitted in with the simplicity of Instagram.

While brainstorming with other engineers about ways we could include this feature in an Instagram-y way, one of our engineers had the idea that it would be great if we could automatically correct photos taken in the Instagram-app camera using data from the mobile device. After all, most phones have a variety of sensors that allow us to determine the orientation and tilt of the device relative to a specific reference, meaning we can tell the angle of your phone at the time you took the picture. This seemed to us a way of making the feature quick and simple to use, which is something we try to achieve with every feature release.

How does it work?

As I mentioned, most smartphones contain sensors capable of determining the orientation and tilt of a device. When you take a picture in-app and tap on the new straighten icon, we use this sensor to straighten the photo automatically. In order to correct the angle, we rotate the photo by the opposite of the angle in which you held your device. Once the auto correction is applied, you can fine-tune the results using the feature’s UI.

You may notice that while the photo is being rotated, we need to zoom in a little. This is due to the fact that when we rotate the photo, there would be empty regions at the corners if we didn’t zoom in. You can imagine having two square pieces of paper on top of each other:

If you rotate the topmost one, you’ll start seeing the edges of the one underneath as you rotate the one at the top. Now, if the topmost paper is your photo and the bottom one is our app “window” you’ll see that we need to “fill” the edges by actually making the photo larger as you rotate, up to the maximum possible scale, which would be the square root of two (~1.4142). The maximum zoom would occur when the photo is rotated at 45º. In reality, however, if you actually took a photo in a 45º degree angle, it is most likely that you wanted it to be that way, so we only magically straighten photos for up to an angle of 25º

Designing for Simplicity

Once you enter the straightening mode, you will notice how the photo is auto straightened in an animated way; when you tap the button the photo is rotated and zoomed in before your eyes. This animation turned out to be a very important piece of the puzzle as it helps the user understand what is happening to the photo. When we first tried the animation it was almost a “eureka” moment; it made the feature appear magical, while easily explaining all the different pieces at play in the interface.

Once the animation is done, and in case we got the angle wrong, or in case you are simply feeling creative, you can still manually rotate the photo by dragging the wheel at the bottom of the screen. We added in other gestures for people looking to really fine-tune their pictures: if you tap on the edges of the wheel you can rotate by 0.1 degree increments. There are also two other ways to rotate the photo by using common iOS gestures: two finger rotation, or even one finger rotation, for easy interaction with just one hand.

We believe the final result really fits with the overall goal of simplicity and performance, and adds to the Instagram experience: help you take beautiful photos in a simple and delightful way. We are excited to get this feature into the hands of all our users, and listen to their feedback. As with everything we build, we look forward to seeing our community get creative with this new feature.

— Alex Restrepo, iOS at Instagram

Moving Pictures: Capturing Moments with Video

We’re excited to now let Instagram users capture moments using video. This allows even more vivid, artistic, and creative stories to be told from around the world. With the launch, we’re committed to making the video media available to our developer community through a new API media type value: “video”.

Important Keys:

In order to let you concentrate on building great apps and services, we’ve tried to design the video media type similar to what we currently return for image. There are only two keys that have been overloaded and one new key to consider:

  • type
  • images
  • videos

1. type

You’re already familiar with this key which has statically returned “image” as its only value. Now, depending on the API endpoint, you may also see “video” as a second value. You should use this key to branch your code for video content.

2. images

When you detect the type value as “video”, you should be aware that the images key will behave differently than when type value was “image”. More specifically, the images key will refer to the video cover frame that was selected by the content author. Similar to the formats returned for “image” media types, there will be “low_resolution”, “thumbnail”, and “standard_resolution” cover frame formats. A cover frame is required for all videos uploaded through the Instagram app.

3. videos

As the name implies, this key will contain links to the actual video content. To help save you time, we are providing two formats: a 480x480 “low_resolution” format and a 640x640 “standard_resolution” format.  Similar to our policy with photos, you should not download and store the video content to your servers, but should instead source the content from the provided links.

The Video Container:

All video content currently uses the MPEG-4 container. You should simply check the link extension of “.mp4” as we may introduce other containers in the future. Video duration can be obtained by reading the ATOM headers of the MPEG-4 container.

That’s it! You now have all you need to quickly use the videos being produced by our beloved Instagram users. For more details, check out our API docs and example responses to see the full JSON output. We look forward to seeing your video contributions, the marriage with images, and the innovations that only you can do.

Piyush Mangalick, partner engineering

Handling Growth with Postgres: 5 Tips From Instagram

As we’ve scaled Instagram to an ever-growing number of active users, Postgres has continued to be our solid foundation and the canonical data storage for most of the data created by our users. While less than a year ago, we blogged about how we “stored a lot of data” at Instagram at 90 likes per second, we’re now pushing over 10,000 likes per second at peak—and our fundamental storage technology hasn’t changed.

Over the last two and a half years, we’ve picked up a few tips and tools about scaling Postgres that we wanted to share—things we wish we knew when we first launched Instagram. Some of these are Postgres-specific while others are present in other databases as well. For background on how we’ve horizontally partitioned Postgres, check out our Sharding and IDs at Instagram post.

1. Partial Indexes

If you find yourself frequently filtering your queries by a particular characteristic, and that characteristic is present in a minority of your rows, partial indexes may be a big win.

As an example, when searching tags on Instagram, we try to surface tags that are likely to have many photos in them. While we use technologies like ElasticSearch for fancier searches in our application, this is one case where the database was good enough. Let’s see what Postgres does when searching tag names and ordering by number of photos:

EXPLAIN ANALYZE SELECT id from tags WHERE name LIKE 'snow%' ORDER BY media_count DESC LIMIT 10;      
QUERY PLAN   
---------                                                                  
 Limit  (cost=1780.73..1780.75 rows=10 width=32) (actual time=215.211..215.228 rows=10 loops=1)
   ->  Sort  (cost=1780.73..1819.36 rows=15455 width=32) (actual time=215.209..215.215 rows=10 loops=1)
         Sort Key: media_count
         Sort Method:  top-N heapsort  Memory: 25kB
         ->  Index Scan using tags_search on tags_tag  (cost=0.00..1446.75 rows=15455 width=32) (actual time=0.020..162.708 rows=64572 loops=1)
               Index Cond: (((name)::text ~>=~ 'snow'::text) AND ((name)::text ~<~ 'snox'::text))
               Filter: ((name)::text ~~ 'snow%'::text)
 Total runtime: 215.275 ms
(8 rows)

Notice how Postgres had to sort through 15,000 rows to get the right result. Since tags (for example) exhibit a long-tail pattern, we can instead first try a query against tags with over 100 photos; we’ll do:

CREATE INDEX CONCURRENTLY on tags (name text_pattern_ops) WHERE media_count >= 100

Now the query plan looks like:

EXPLAIN ANALYZE SELECT * from tags WHERE name LIKE 'snow%' AND media_count >= 100 ORDER BY media_count DESC LIMIT 10;

QUERY PLAN
 Limit  (cost=224.73..224.75 rows=10 width=32) (actual time=3.088..3.105 rows=10 loops=1)
   ->  Sort  (cost=224.73..225.15 rows=169 width=32) (actual time=3.086..3.090 rows=10 loops=1)
         Sort Key: media_count
         Sort Method:  top-N heapsort  Memory: 25kB
         ->  Index Scan using tags_tag_name_idx on tags_tag  (cost=0.00..221.07 rows=169 width=32) (actual time=0.021..2.360 rows=924 loops=1)
               Index Cond: (((name)::text ~>=~ 'snow'::text) AND ((name)::text ~<~ 'snox'::text))
               Filter: ((name)::text ~~ 'snow%'::text)
 Total runtime: 3.137 ms
(8 rows)

Notice that Postgres only had to visit 169 rows, which was way faster. Postgres’ query planner is pretty good at evaluating constraints too; if you later decided that you wanted to query tags with over 500 photos, since those are a subset of this index, it will still use the right partial index.

2. Functional Indexes

On some of our tables, we need to index strings (for example, 64 character base64 tokens) that are quite long, and creating an index on those strings ends up duplicating a lot of data. For these, Postgres’ functional index feature can be very helpful:

CREATE INDEX CONCURRENTLY on tokens (substr(token), 0, 8)

While there will be multiple rows that match that prefix, having Postgres match those prefixes and then filter down is quick, and the resulting index was 1/10th the size it would have been had we indexed the entire string.

3. pg_reorg For Compaction

Over time, Postgres tables can become fragmented on disk (due to Postgres’ MVCC concurrency model, for example). Also, most of the time, row insertion order does not match the order in which you want rows returned. For example, if you’re often querying for all likes created by one user, it’s helpful to have those likes be contiguous on disk, to minimize disk seeks.

Our solution to this is to use pg_reorg, which does a 3-step process to “compact” a table:

  1. Acquire an exclusive lock on the table
  2. Create a temporary table to accumulate changes, and add a trigger on the original table that replicates any changes to this temp table
  3. Do a CREATE TABLE using a SELECT FROM…ORDER BY, which will create a new table in index order on disk
  4. Sync the changes from the temp table that happened after the SELECT FROM started
  5. Cut over to the new table

There are some details in there around lock acquisition etc, but that’s the general approach. We vetted the tool and tried several test runs before running in production, and we’ve run dozens of reorgs across hundreds of machines without issues.

4. WAL-E for WAL archiving and backups

We use and contribute code to WAL-E, Heroku’s toolkit for continuous archiving of Postgres Write-Ahead Log files. Using WAL-E has simplified our backup and new-replica bootstrap process significantly.

At its core, WAL-E is a program that archives every WAL files generated by your PG server to Amazon’s S3, using Postgres’ archive_command. These WAL files can then be used, in combination with a base backup, to restore a DB to any point since that base backup. The combination of regular base backups and the WAL archiving means we can quickly bootstrap a new read-replica or failover slave, too.

We’ve made our simple wrapper script for monitoring repeated failures to archive a file available on GitHub.

5. Autocommit mode and async mode in psycopg2

Over time, we’ve started using more advanced features in psycopg2, the Python driver for Postgres.

The first is autocommit mode; in this mode, Psycopg2 won’t issue BEGIN/COMMIT for any queries; instead, every query runs in its own single-statement transaction. This is particularly useful for read-only queries where transaction semantics aren’t needed. It’s as easy as doing:

connection.autocommit = True

This lowered chatter between our application servers and DBs significantly, and lowered system CPU as well on the database boxes. Further, since we use PGBouncer for our connection pooling, this change allows connections to be returned to the pool sooner.

More details on how this interacts with Django’s db handling here.

Another useful psycopg2 feature is the ability to register a wait_callback for coroutine support. Using this allows for concurrent querying across multiple connections at once, which is useful for fan-out queries that hit multiple nodes—the socket will wake up and notify when there’s data to be read (we use Python’s select module for handling the wake-ups). This also plays well with cooperative multi-threading libraries like eventlet or gevent; check out psycogreen for an example implementation.

Overall, we’ve been very happy with Postgres’ performance and reliability. If you’re interested in working on one of the world’s largest Postgres installations with a small team of infrastructure hackers, get in touch at infrajobs <at> instagram.com.

You can discuss this post at Hacker News

Mike Krieger, co-founder

redis-faina: a query analysis tool for Redis

As we mentioned in our previous blog, we’re big fans of tools and techniques for introspecting our live traffic to see what exactly is going on. One tool we use frequently is PGFouine, a query traffic analyzer for PostgreSQL. We recently found that we also needed a similar tool for Redis, our swiss-army-knife list/set/etc storage.

We call it redis-faina and we’re making it open source so you can use it to monitor Redis as well (pgfouine = ‘pgweasel’ in French, redis-faina = ‘redisweasel’ in Italian as a tribute to Redis’ Italian heritage).

At its core, redis-faina uses the Redis MONITOR command, which echoes every single command (with arguments) sent to a Redis instance. It parses these entries, and aggregates stats on the most commonly-hit keys, the queries that took up the most amount of time, and the most common key prefixes as well. Using it, we’ve cut the number of requests on one of our systems from 23,000 requests per second to ~11,000, by identifying places where we could batch-fetch information or eliminate extraneous commands.

Usage is simple:

# reading from stdin
redis-cli -p 6490 MONITOR | head -n <NUMBER OF LINES TO ANALYZE> | ./redis-faina.py

# reading a file
redis-cli -p 6490 MONITOR | head -n <...> > /tmp/outfile.txt
./redis-faina.py /tmp/outfile.txt

The output (anonymized below with ‘zzz’s) looks as follows:

Overall Stats
========================================
Lines Processed     117773
Commands/Sec        11483.44

Top Prefixes
========================================
friendlist          69945
followedbycounter   25419
followingcounter    10139
recentcomments      3276
queued              7

Top Keys
========================================
friendlist:zzz:1:2     534
followingcount:zzz     227
friendlist:zxz:1:2     167
friendlist:xzz:1:2     165
friendlist:yzz:1:2     160
friendlist:gzz:1:2     160
friendlist:zdz:1:2     160
friendlist:zpz:1:2     156

Top Commands
========================================
SISMEMBER   59545
HGET        27681
HINCRBY     9413
SMEMBERS    9254
MULTI       3520
EXEC        3520
LPUSH       1620
EXPIRE      1598

Command Time (microsecs)
========================================
Median      78.25
75%         105.0
90%         187.25
99%         411.0

Heaviest Commands (microsecs)
========================================
SISMEMBER   5331651.0
HGET        2618868.0
HINCRBY     961192.5
SMEMBERS    856817.5
MULTI       311339.5
SADD        54900.75
SREM        40771.25
EXEC        28678.5

Slowest Calls
========================================
3490.75     "SMEMBERS" "friendlist:zzz:1:2"
2362.0      "SMEMBERS" "friendlist:xzz:1:3"
2061.0      "SMEMBERS" "friendlist:zpz:1:2"
1961.0      "SMEMBERS" "friendlist:yzz:1:2"
1947.5      "SMEMBERS" "friendlist:zpz:1:2"
1459.0      "SISMEMBER" "friendlist:hzz:1:2" "zzz"
1416.25     "SMEMBERS" "friendlist:zhz:1:2"
1389.75     "SISMEMBER" "friendlist:zzx:1:2" "zzz"

One caveat on timing: MONITOR only shows the time a command completed, not when it started. On a very busy Redis server (like most of ours), this is fine because there’s always a request waiting to execute, but if you’re at a lesser rate of requests, the time taken will not be accurate.

Also, MONITORing calls doesn’t come for free, so we mostly use to sample for a couple hundred thousand lines to get a representative sample.

Want to add more stats and improvements to redis-faina? Fork and send pull requests!

Want to work on analyzing, optimizing and designing systems that handle hundreds of thousands of requests per second across many, many machines? We’re hiring! Drop us a note and tell us a bit about yourself - we’re actively building out our dev & devops team.

Keeping Instagram up with over a million new users in twelve hours

On Tuesday we launched Instagram for Android, and it’s had a fantastic response so far. The last few weeks (on the infrastructure side) have been all about capacity planning and preparation to get everything in place, but on launch day itself the challenge is to find problems quickly, get to the bottom of them, and roll out fixes ASAP. Here are some tools & techniques we used to tackle problems as they arose:

statsd

We love statsd at Instagram. Written by Etsy, it’s a network daemon that aggregates and rolls-up data into Graphite. At its core, it has two types of statistics: counter and timers. We use the counters to track everything from number of signups per second to number of likes, and we use timers to time generation of feeds, how long it takes to follow users, and any other major action.

The single biggest reason we love statsd is how quickly stats show up and get updated in Graphite. Stats are basically realtime (in our system, they’re about 10 seconds delayed), which allows us to evaluate system and code changes immediately. Stats can be added at will, so if we discover a new metric to track, we can have it up and running very quickly. You can specify a sample rate, so we sprinkle logging calls throughout the web application at relatively low sample rates, without affecting performance.

Takeaway: having realtime stats that can be added dynamically lets you diagnose and firefight without having to wait to receive new data.

Dogslow

Written by Bitbucket, Dogslow is a piece of Django middleware that will watch your running processes, and if notices any taking longer than N seconds, will snapshot the current process and write the file to disk. We’ve found it’s too intrusive to run all the time, but when trying to identify bottlenecks that may have cropped up, it’s very useful (we’ve added a switch to enable it in our web servers).

We found, halfway through launch day, that processes that were taking over 1.5s to return a response were often stuck in memcached set() and get_many(). Switching over to Munin, which we use to track our machine stats over time, we saw that our memcached boxes were pushing 50k req/s, and though they weren’t maxing out the CPU, they were busy enough to slow down the application servers.

Takeaway: it’s often one piece of the backend infrastructure that becomes a bottleneck, and figuring out the point at which your real, live appservers get stuck can help surface the issue.

Replication & Read-slaves

Two of our main data backends—Redis and PostgreSQL—both support easy replication and read-slaving. When one of our Redis DBs crossed 40k req/s, and started becoming a bottleneck, bringing up another machine, SYNCing to the master, and sending read queries to it took less than 20 minutes. For machines we knew would be busy ahead of time, we’d brought up read-slaves, but in a couple of cases, machines reacted differently under load than we’d projected, and it was useful to split reads off quickly.

For Postgres, we use a combination of Streaming Replication and Amazon EBS Snapshots to bring up a new read-slave quickly. All of our master DBs stream to backup slaves that take frequent EBS snapshots; from these snapshots, we can have a new read-slave up and running, and caught up to the master, in around 20 minutes. Having our machines in an easily scriptable environment like AWS make provisioning and deploying new read-slaves a quick command-line task.

Takeaway: if read capacity is likely to be a concern, bringing up read-slaves ahead of time and getting them in rotation is ideal; if any new read issues crop up, however, know ahead of time what your options are for bringing more read capacity into rotation.

PGFouine

PGFouine is a tool that analyzes PostgreSQL query logs and generates a page of analytics on their impact on your database; sliced by the “heaviest”, or most frequent, or slowest queries. To ease running it, we’ve created a Fabric script that will connect to a database, set it to log every query, wait 30 seconds, then download the file and run a pgfouine analysis on it; it’s available as a gist. PGFouine is our core tool in analyzing database performance and figuring out which queries could use memcached in front of them, which ones are fetching more data than is necessary, etc; as DBs showed signs of stress on launch day, we would run PGFouine, deploy targeted code improvement to relieve hotspots, and then run it again to make sure those changes had the correct effect.

It’s important to know what a “normal” day looks like for your databases, too, for a baseline, so we run PGFouine periodically to gather statistics on non-stressed-out database instances, too.

Takeaway: Database log analysis (especially coupled with a tight iteration loop on optimizing queries and caching what’s needed)

One more thing

Another tool that helped us get through the first day was one we wrote ourselves—node2dm, a node.js server for delivering push notifications to Android’s C2DM service. It’s handled over 5 million push notifications for us so far.

We surveyed the different options for C2DM servers, but didn’t find any open source ones that looked like they were being actively maintained, or fully supported the Google service. We’re open sourcing node2dm today; feel free to fork and pull-request if you have any suggestions for improvements.

Interested?

If all of this is interesting/exciting to you, and you’d like to chat more about working with us, drop us a note; we’d love to hear from you.

You can discuss this post at Hacker News.

Mike Krieger, co-founder

What Powers Instagram: Hundreds of Instances, Dozens of Technologies

One of the questions we always get asked at meet-ups and conversations with other engineers is, “what’s your stack?” We thought it would be fun to give a sense of all the systems that power Instagram, at a high-level; you can look forward to more in-depth descriptions of some of these systems in the future. This is how our system has evolved in the just-over-1-year that we’ve been live, and while there are parts we’re always re-working, this is a glimpse of how a startup with a small engineering team can scale to our 14 million+ users in a little over a year. Our core principles when choosing a system are:

  • Keep it very simple
  • Don’t re-invent the wheel
  • Go with proven and solid technologies when you can

We’ll go from top to bottom:

OS / Hosting

We run Ubuntu Linux 11.04 (“Natty Narwhal”) on Amazon EC2. We’ve found previous versions of Ubuntu had all sorts of unpredictable freezing episodes on EC2 under high traffic, but Natty has been solid. We’ve only got 3 engineers, and our needs are still evolving, so self-hosting isn’t an option we’ve explored too deeply yet, though is something we may revisit in the future given the unparalleled growth in usage.

Load Balancing

Every request to Instagram servers goes through load balancing machines; we used to run 2 nginx machines and DNS Round-Robin between them. The downside of this approach is the time it takes for DNS to update in case one of the machines needs to get decomissioned. Recently, we moved to using Amazon’s Elastic Load Balancer, with 3 NGINX instances behind it that can be swapped in and out (and are automatically taken out of rotation if they fail a health check). We also terminate our SSL at the ELB level, which lessens the CPU load on nginx. We use Amazon’s Route53 for DNS, which they’ve recently added a pretty good GUI tool for in the AWS console.

Application Servers

Next up comes the application servers that handle our requests. We run Django on Amazon High-CPU Extra-Large machines, and as our usage grows we’ve gone from just a few of these machines to over 25 of them (luckily, this is one area that’s easy to horizontally scale as they are stateless). We’ve found that our particular work-load is very CPU-bound rather than memory-bound, so the High-CPU Extra-Large instance type provides the right balance of memory and CPU.

We use http://gunicorn.org/ as our WSGI server; we used to use mod_wsgi and Apache, but found Gunicorn was much easier to configure, and less CPU-intensive. To run commands on many instances at once (like deploying code), we use Fabric, which recently added a useful parallel mode so that deploys take a matter of seconds.

Data storage

Most of our data (users, photo metadata, tags, etc) lives in PostgreSQL; we’ve previously written about how we shard across our different Postgres instances. Our main shard cluster involves 12 Quadruple Extra-Large memory instances (and twelve replicas in a different zone.)

We’ve found that Amazon’s network disk system (EBS) doesn’t support enough disk seeks per second, so having all of our working set in memory is extremely important. To get reasonable IO performance, we set up our EBS drives in a software RAID using mdadm.

As a quick tip, we’ve found that vmtouch is a fantastic tool for managing what data is in memory, especially when failing over from one machine to another where there is no active memory profile already. Here is the script we use to parse the output of a vmtouch run on one machine and print out the corresponding vmtouch command to run on another system to match its current memory status.

All of our PostgreSQL instances run in a master-replica setup using Streaming Replication, and we use EBS snapshotting to take frequent backups of our systems. We use XFS as our file system, which lets us freeze & unfreeze the RAID arrays when snapshotting, in order to guarantee a consistent snapshot (our original inspiration came from ec2-consistent-snapshot. To get streaming replication started, our favorite tool is repmgr by the folks at 2ndQuadrant.

To connect to our databases from our app servers, we made early on that had a huge impact on performance was using Pgbouncer to pool our connections to PostgreSQL. We found Christophe Pettus’s blog to be a great resource for Django, PostgreSQL and Pgbouncer tips.

The photos themselves go straight to Amazon S3, which currently stores several terabytes of photo data for us. We use Amazon CloudFront as our CDN, which helps with image load times from users around the world (like in Japan, our second most-popular country).

We also use Redis extensively; it powers our main feed, our activity feed, our sessions system (here’s our Django session backend), and other related systems. All of Redis’ data needs to fit in memory, so we end up running several Quadruple Extra-Large Memory instances for Redis, too, and occasionally shard across a few Redis instances for any given subsystem. We run Redis in a master-replica setup, and have the replicas constantly saving the DB out to disk, and finally use EBS snapshots to backup those DB dumps (we found that dumping the DB on the master was too taxing). Since Redis allows writes to its replicas, it makes for very easy online failover to a new Redis machine, without requiring any downtime.

For our geo-search API, we used PostgreSQL for many months, but once our Media entries were sharded, moved over to using Apache Solr. It has a simple JSON interface, so as far as our application is concerned, it’s just another API to consume.

Finally, like any modern Web service, we use Memcached for caching, and currently have 6 Memcached instances, which we connect to using pylibmc & libmemcached. Amazon has an Elastic Cache service they’ve recently launched, but it’s not any cheaper than running our instances, so we haven’t pushed ourselves to switch quite yet.

Task Queue & Push Notifications

When a user decides to share out an Instagram photo to Twitter or Facebook, or when we need to notify one of our Real-time subscribers of a new photo posted, we push that task into Gearman, a task queue system originally written at Danga. Doing it asynchronously through the task queue means that media uploads can finish quickly, while the ‘heavy lifting’ can run in the background. We have about 200 workers (all written in Python) consuming the task queue at any given time, split between the services we share to. We also do our feed fan-out in Gearman, so posting is as responsive for a new user as it is for a user with many followers.

For doing push notifications, the most cost-effective solution we found was https://github.com/samuraisam/pyapns, an open-source Twisted service that has handled over a billion push notifications for us, and has been rock-solid.

Monitoring

With 100+ instances, it’s important to keep on top of what’s going on across the board. We use Munin to graph metrics across all of our system, and also alert us if anything is outside of its normal range. We write a lot of custom Munin plugins, building on top of Python-Munin, to graph metrics that aren’t system-level (for example, signups per minute, photos posted per second, etc). We use Pingdom for external monitoring of the service, and PagerDuty for handling notifications and incidents.

For Python error reporting, we use Sentry, an awesome open-source Django app written by the folks at Disqus. At any given time, we can sign-on and see what errors are happening across our system, in real time.

You?

If this description of our systems interests you, or if you’re hopping up and down ready to tell us all the things you’d change in the system, we’d love to hear from you. We’re looking for a DevOps person to join us and help us tame our EC2 instance herd.

Instagram Engineering Challenge: The Unshredder

In our office, we have a pretty amazing paper shredder. Seriously, the thing shreds just about anything. It even has a special slot for credit cards (why anyone would want to regularly shred credit cards is beyond me, but I digress…).

One day, after shredding some paper, I thought to myself: shredding paper is a pretty insecure way of destroying important stuff. I figured, it’s a small set of shreds that are all relatively uniform in width and could be pieced back together algorithmically in a fraction of a second.

So, I sat down and though about what approach I’d use to piece the document back together. It’s unlike a regular puzzle in that all the pieces are exactly the same size, so you can’t rely upon the spatial domain to solve piecing shreds together. However, if you think about it, there’s a pretty simple approach that would allow you to find matches in a different domain. That is, imagine you’re sitting there trying to find a match between two pieces. What are you looking for to decide whether they’re a fit or not?

Anyway, we got really excited about writing a script to take in an image of shreds of paper and piece them back into an original document. It’s an interesting challenge that marries image processing with an interesting algorithmic challenge as well.

THE CHALLENGE

Your challenge, if you choose to accept it, is to write a simple script that takes a shredded image in as input:

and outputs an unshredded and reconstituted image. That is, imagine if you took an image, divided it into an even number of columns and shuffled those columns randomly to produce a shredded image. Then, take that image into the script and output the original image:

We tackled this, and our solution took a few hours plus another few hours for the bonus challenge (more on that later).

THE REWARD

Due to overwhelming response, we’ve run out of our entire stock of tee-shirts! With future challenges we’ll be offering a reward for the first group of people who respond.

GUIDELINES

1) Choose a scripting language of your choice. We chose Python for its relative ease prototyping and availability of the Python Imaging Library (PIL) that allowed us to do the image stuff we wanted to do. You can easily use something like C++ or Ruby for this as well.

2) Produce a script that reads in a shredded image (like the one below) and produces the original image. For this image, you can assume shreds are 32 pixels wide and uniformly spaced across the image horizontally. These shreds are scattered at random and if rearranged, will yield the original image.

Use this image as the source image - it’s 640 pixels wide and 359 pixels high.

3) Your solution should algorithmically unshred the image. This means it should work on arbitrarily shredded images we feed your script that are shredded in the same manner.

4) BONUS CHALLENGE: We went the extra mile and made our script even spiffier by auto-detecting how wide the uniform strips are. Extra bonus points to anyone who works this into their solution. But first, we’d recommend getting your script to work assuming 32 pixel-wide shreds. For this you can assume shreds will never end up next to each other correctly in the source image.

5) The key to this problem is being able to access pixel data in the image. We used Python Imaging Library - PIL (http://www.pythonware.com/products/pil/) which made it very easy to parse. See the PIL tips below. If you’re using Ruby, check out RMagick (http://rmagick.rubyforge.org/) which is a gem that serves the same purpose as PIL. C++ has the boost libraries and included is “GIL” which will help you. If you’re using another language, there are most certainly equivalents of PIL, RMagick, and GIL.

SUBMIT YOUR SOLUTION

We’re no longer offering the tee-shirt reward but if you’re still interested in working with us, please submit your information & a link to your solution here: http://bit.ly/unshredder

PIL TIPS

from PIL import Image
image = Image.open(‘file.jpg’)
data = image.getdata() # This gets pixel data

# Access an arbitrary pixel. Data is stored as a 2d array where rows are
# sequential. Each element in the array is a RGBA tuple (red, green, blue,
# alpha).

x, y = 20, 90
def get_pixel_value(x, y):
   width, height = image.size
   pixel = data[y * width + x]
   return pixel
print get_pixel_value(20, 30)

# Create a new image of the same size as the original
# and copy a region into the new image
NUMBER_OF_COLUMNS = 5
unshredded = Image.new(“RGBA”, image.size)
shred_width = unshredded.size[0]/NUMBER_OF_COLUMNS
shred_number = 1
x1, y1 = shred_width * shred_number, 0
x2, y2 = x1 + shred_width, height
source_region = image.crop(x1, y1, x2, y2)
destination_point = (0, 0)
unshredded.paste(source_region, destination_point)
# Output the new image
unshredded.save(“unshredded.jpg”, “JPEG”)

TIPS

1) Don’t overthink it. Use of really complex algorithms isn’t needed. Our solution WITH the bonus exercise comes in at just over 150 lines of python.

2) Think about how you would quantify whether or not two shreds ‘fit’ together by using pixel data

3) Assume you’re using the source image, or other normal photographs without edge-case patterns.

4) There are edge cases where the script we wrote with our approach will not work because of repeating patterns. This is OK in your script as well. Don’t worry about special cases – focus on making the sample images work that we’ve provided.

4) Bonus Challenge: If you decide you want to auto-detect how many columns there are in an image, you should remember that there are a finite amount of columns that are possible given an image of a certain width if you assume columns are evenly distributed and uniformly sized.

SHREDDER

If you’d like to produce your own sample images, you can use our simple script here to generate some:

from PIL import Image
from random import shuffle

SHREDS = 10
image = Image.open(“sample.png”)
shredded = Image.new(“RGBA”, image.size)
width, height = image.size
shred_width = width/SHREDS
sequence = range(0, SHREDS)
shuffle(sequence)

for i, shred_index in enumerate(sequence):
    shred_x1, shred_y1 = shred_width * shred_index, 0
    shred_x2, shred_y2 = shred_x1 + shred_width, height
    region =image.crop((shred_x1, shred_y1, shred_x2, shred_y2))
    shredded.paste(region, (shred_width * i, 0))

shredded.save(“sample_shredded.png”)

Storing hundreds of millions of simple key-value pairs in Redis

When transitioning systems, sometimes you have to build a little scaffolding. At Instagram, we recently had to do just that: for legacy reasons, we need to keep around a mapping of about 300 million photos back to the user ID that created them, in order to know which shard to query (see more info about our sharding setup). While eventually all clients and API applications will have been updated to pass us the full information, there are still plenty who have old information cached. We needed a solution that would:

  1. Look up keys and return values very quickly
  2. Fit the data in memory, and ideally within one of the EC2 high-memory types (the 17GB or 34GB, rather than the 68GB instance type)
  3. Fit well into our existing infrastructure
  4. Be persistent, so that we wouldn’t have to re-populate it if a server died

One simple solution to this problem would be to simply store them as a bunch of rows in a database, with “Media ID” and “User ID” columns. However, a SQL database seemed like overkill given that these IDs were never updated (only inserted), didn’t need to be transactional, and didn’t have any relations with other tables.

Instead, we turned to Redis, an advanced key-value store that we use extensively here at Instagram (for example, it powers our main feed). Redis is a key-value swiss-army knife; rather than just normal “Set key, get key” mechanics like Memcached, it provides powerful aggregate types like sorted sets and lists. It has a configurable persistence model, where it background saves at a specified interval, and can be run in a master-slave setup. All of our Redis deployments run in master-slave, with the slave set to save to disk about every minute.

At first, we decided to use Redis in the simplest way possible: for each ID, the key would be the media ID, and the value would be the user ID:

SET media:1155315 939
GET media:1155315
> 939

While prototyping this solution, however, we found that Redis needed about 70 MB to store 1,000,000 keys this way. Extrapolating to the 300,000,000 we would eventually need, it was looking to be around 21GB worth of data—already bigger than the 17GB instance type on Amazon EC2.

We asked the always-helpful Pieter Noordhuis, one of Redis’ core developers, for input, and he suggested we use Redis hashes. Hashes in Redis are dictionaries that are can be encoded in memory very efficiently; the Redis setting ‘hash-zipmap-max-entries’ configures the maximum number of entries a hash can have while still being encoded efficiently. We found this setting was best around 1000; any higher and the HSET commands would cause noticeable CPU activity. For more details, you can check out the zipmap source file.

To take advantage of the hash type, we bucket all our Media IDs into buckets of 1000 (we just take the ID, divide by 1000 and discard the remainder). That determines which key we fall into; next, within the hash that lives at that key, the Media ID is the lookup key *within* the hash, and the user ID is the value. An example, given a Media ID of 1155315, which means it falls into bucket 1155 (1155315 / 1000 = 1155):

HSET "mediabucket:1155" "1155315" "939"
HGET "mediabucket:1155" "1155315"
> "939"

The size difference was pretty striking; with our 1,000,000 key prototype (encoded into 1,000 hashes of 1,000 sub-keys each), Redis only needs 16MB to store the information. Expanding to 300 million keys, the total is just under 5GB—which in fact, even fits in the much cheaper m1.large instance type on Amazon, about 1/3 of the cost of the larger instance we would have needed otherwise. Best of all, lookups in hashes are still O(1), making them very quick.

If you’re interested in trying these combinations out, the script we used to run these tests is available as a Gist on GitHub (we also included Memcached in the script, for comparison—it took about 52MB for the million keys). And if you’re interested in working on these sorts of problems with us, drop us a note, we’re hiring!.


Comments? Questions? Discuss this post at Hacker News.


Mike Krieger, co-founder

Simplifying EC2 SSH Connections

Here at Instagram, we run our infrastructure on Amazon Web Services, running instances on their Elastic Compute Cloud (EC2). Since we’re often spinning up new machines and changing details of our infrastructure, there’s an ever-growing list of machines that we SSH into.

To authenticate with our instances, we use public key authentication (the recommended way of doing SSH log-ins to EC2 machines), but we need to figure out what host we’re connecting to first. A common way of connecting to an EC2 instance is via it’s public hostname (ec2-x-x-x-x.compute-1.amazonaws.com). However, managing a list of these hostnames once there are several of them is tedious. Because of this, we wrote EC2-SSH, a set of Python scripts that help easily connect to EC2 instances, and that we’re open sourcing for the community’s benefit.

EC2-SSH takes advantage of the EC2 instance tagging. If you’re using the web console, when launching a new instance, Amazon prompts you to provide an optional value for the (pre-filled) “Name” tag. Tags can also be edited using the available EC2 command line tools.

Assuming you’ve already tagged all of your instances with names, using EC2-SSH is as easy as using regular SSH with hostnames— because, behind the scenes, that’s all it’s really doing. Better illustrated with an example: Let’s assume you have an instance tagged with the name “nginx3”; using EC2-SSH you could connect to the instance by typing `ec2-ssh nginx3` into your terminal.

EC2-SSH first calls the Amazon EC2 web service, resolving the tag name (in this case “nginx3”) to the public DNS address. It then substitutes out the tag name with the hostname and sends it, along with any other arguments and parameters, to `/usr/bin/ssh`.

The `ec2-ssh` script is small shell script that calls another Python script, `ec2-host`, that eventually calls `/usr/bin/ssh`. Let’s detail out the process in depth:

The Python script `ec2-host` is distributed in the EC2-SSH Python package and can be used unaccompanied— you might find it rather useful, in fact. Let’s take a look at it’s usage output:

% ec2-host --help

Usage: ec2-host [-k KEY] [-s SECRET] [NAME]
Prints server host name.

  --help                  display this help and exit
  -k, --aws-key KEY       EC2 Key, defaults to ENV[AWS_ACCESS_KEY_ID]
-s, --aws-secret SECRET EC2 Secret, defaults to ENV[AWS_SECRET_ACCESS_KEY]

By default, with no arguments, `ec2-host` host will return a list of all running EC2 instances and their associated public host names. I often use `ec2-host` this way, combined with grep I use it to filter out and identify a specific instance, or set of instances. Here’s an example:

% ec2-host | grep django
django1    ec2-x-x-x-x.amazonaws.com
django2    ec2-x-x-x-x.amazonaws.com
....

When passing the value of an instance’s “Name” tag as an argument, ec2-host will return the associated public hostname. This is exactly what the `ec2-ssh` shell script does. Here’s an example:

% ec2-host nginx2
ec2-x-x-x-x.compute-1.amazonaws.com

You may be asking how `ec2-host` has access to enumerate over your running EC2 instances, that’s a valid question. Before you start using `ec2-ssh` or `ec2-host` you have to provide your AWS key and secret. You can pass them via command line arguments to `ec2-host` like this:

% ec2-host --aws-key  AKJASKSA1234JDSJ8123 --aws-secret B3JDJRYQ1234QWRHFJ1234AJJDAH1kjd1234

To save time, you can also set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

Putting it all together, if you issue the following command, you’ll be signed in to the instance in your system with the specified name:

% ec2-ssh nginx3

Any arguments after the instance name will be passed into ssh:

% ec2-ssh nginx5 date  
Thu Oct 13 17:25:22 UTC 2011

We’ve made these tools available via PyPy, so you can issue an `easy_install ec2-ssh` or a `pip install ec2-ssh` to install the tools. If you’d like to contribute, you can also fork the code on GitHub, or discuss this post on Hacker News . And if you’re interested in helping us scale our systems, we’d love to hear from you.

Shayne Sweeney, Mobile & Server Engineer

Sharding & IDs at Instagram

With more than 25 photos & 90 likes every second, we store a lot of data here at Instagram. To make sure all of our important data fits into memory and is available quickly for our users, we’ve begun to shard our data—in other words, place the data in many smaller buckets, each holding a part of the data.

Our application servers run Django with PostgreSQL as our back-end database. Our first question after deciding to shard out our data was whether PostgreSQL should remain our primary data-store, or whether we should switch to something else. We evaluated a few different NoSQL solutions, but ultimately decided that the solution that best suited our needs would be to shard our data across a set of PostgreSQL servers.

Before writing data into this set of servers, however, we had to solve the issue of how to assign unique identifiers to each piece of data in the database (for example, each photo posted in our system). The typical solution that works for a single database—just using a database’s natural auto-incrementing primary key feature—no longer works when data is being inserted into many databases at the same time. The rest of this blog post addresses how we tackled this issue.

Before starting out, we listed out what features were essential in our system:

  1. Generated IDs should be sortable by time (so a list of photo IDs, for example, could be sorted without fetching more information about the photos)
  2. IDs should ideally be 64 bits (for smaller indexes, and better storage in systems like Redis)
  3. The system should introduce as few new ‘moving parts’ as possible—a large part of how we’ve been able to scale Instagram with very few engineers is by choosing simple, easy-to-understand solutions that we trust.

Existing solutions

Many existing solutions to the ID generation problem exist; here are a few we considered:

Generate IDs in web application

This approach leaves ID generation entirely up to your application, and not up to the database at all. For example, MongoDB’s ObjectId, which is 12 bytes long and encodes the timestamp as the first component. Another popular approach is to use UUIDs.

Pros:

  1. Each application thread generates IDs independently, minimizing points of failure and contention for ID generation
  2. If you use a timestamp as the first component of the ID, the IDs remain time-sortable

Cons:

  1. Generally requires more storage space (96 bits or higher) to make reasonable uniqueness guarantees
  2. Some UUID types are completely random and have no natural sort

Generate IDs through dedicated service

Ex: Twitter’s Snowflake, a Thrift service that uses Apache ZooKeeper to coordinate nodes and then generates 64-bit unique IDs

Pros:

  1. Snowflake IDs are 64-bits, half the size of a UUID
  2. Can use time as first component and remain sortable
  3. Distributed system that can survive nodes dying

Cons:

  1. Would introduce additional complexity and more ‘moving parts’ (ZooKeeper, Snowflake servers) into our architecture

DB Ticket Servers

Uses the database’s auto-incrementing abilities to enforce uniqueness. Flickr uses this approach, but with two ticket DBs (one on odd numbers, the other on even) to avoid a single point of failure.

Pros:

  1. DBs are well understood and have pretty predictable scaling factors

Cons:

  1. Can eventually become a write bottleneck (though Flickr reports that, even at huge scale, it’s not an issue).
  2. An additional couple of machines (or EC2 instances) to admin
  3. If using a single DB, becomes single point of failure. If using multiple DBs, can no longer guarantee that they are sortable over time.

Of all the approaches above, Twitter’s Snowflake came the closest, but the additional complexity required to run an ID service was a point against it. Instead, we took a conceptually similar approach, but brought it inside PostgreSQL.

Our solution

Our sharded system consists of several thousand ‘logical’ shards that are mapped in code to far fewer physical shards. Using this approach, we can start with just a few database servers, and eventually move to many more, simply by moving a set of logical shards from one database to another, without having to re-bucket any of our data. We used Postgres’ schemas feature to make this easy to script and administrate.

Schemas (not to be confused with the SQL schema of an individual table) are a logical grouping feature in Postgres. Each Postgres DB can h2have several schemas, each of which can contain one or more tables. Table names must only be unique per-schema, not per-DB, and by default Postgres places everything in a schema named ‘public’.

Each ‘logical’ shard is a Postgres schema in our system, and each sharded table (for example, likes on our photos) exists inside each schema.

We’ve delegated ID creation to each table inside each shard, by using PL/PGSQL, Postgres’ internal programming language, and Postgres’ existing auto-increment functionality.

Each of our IDs consists of:

  • 41 bits for time in milliseconds (gives us 41 years of IDs with a custom epoch)
  • 13 bits that represent the logical shard ID
  • 10 bits that represent an auto-incrementing sequence, modulus 1024. This means we can generate 1024 IDs, per shard, per millisecond

Let’s walk through an example: let’s say it’s September 9th, 2011, at 5:00pm and our ‘epoch’ begins on January 1st, 2011. There have been 1387263000 milliseconds since the beginning of our epoch, so to start our ID, we fill the left-most 41 bits with this value with a left-shift:

id = 1387263000 << (64-41)

Next, we take the shard ID for this particular piece of data we’re trying to insert. Let’s say we’re sharding by user ID, and there are 2000 logical shards; if our user ID is 31341, then the shard ID is 31341 % 2000 -> 1341. We fill the next 13 bits with this value:

id |= 1341 << (64-41-13)

Finally, we take whatever the next value of our auto-increment sequence (this sequence is unique to each table in each schema) and fill out the remaining bits. Let’s say we’d generated 5,000 IDs for this table already; our next value is 5,001, which we take and mod by 1024 (so it fits in 10 bits) and include it too:

id |= (5001 % 1024)

We now have our ID, which we can return to the application server using the RETURNING keyword as part of the INSERT.

Here’s the PL/PGSQL that accomplishes all this (for an example schema insta5):

CREATE OR REPLACE FUNCTION insta5.next_id(OUT result bigint) AS $$
DECLARE
    our_epoch bigint := 1314220021721;
    seq_id bigint;
    now_millis bigint;
    shard_id int := 5;
BEGIN
    SELECT nextval('insta5.table_id_seq') %% 1024 INTO seq_id;

    SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
    result := (now_millis - our_epoch) << 23;
    result := result | (shard_id << 10);
    result := result | (seq_id);
END;
$$ LANGUAGE PLPGSQL;

And when creating the table, we do:

CREATE TABLE insta5.our_table (
    "id" bigint NOT NULL DEFAULT insta5.next_id(),
    ...rest of table schema...
)

And that’s it! Primary keys that are unique across our application (and as a bonus, contain the shard ID in them for easier mapping). We’ve been rolling this approach into production and are happy with the results so far. Interested in helping us figure out these problems at scale? We’re hiring!

Mike Krieger, co-founder

Discuss this post on Hacker News.