2016-02-10

https://engineering.linkedin.com/espresso/introducing-espresso-linkedins-hot-new-distributed-document-store
Used by: Member Profile, InMail (LinkedIn's member-to-member messaging system), and portions of the Homepage and mobile applications

Much of LinkedIn requires a primary, strongly consistent, read/write data store that generates a timeline-consistent change capture stream to fulfill nearline and offline processing requirements.
Existing RDBMS solutions are painful for several reasons, including but not limited to:

Schema Evolution: Schema evolution in RDBMSs is accomplished by expensive ALTER TABLE statements and is extremely painful. Altering tables requires manual DBA intervention and blocks application teams from iterating quickly. Fast product iteration is essential for an internet company like LinkedIn.
Provisioning shards: Provisioning new shards requires a lot of manual work and significant application-specific configuration, a burden every application team has to bear.
Data Center Failover: LinkedIn's legacy RDBMSs operated in master/slave mode. Failover from one data center to another required coordination with DBA teams and generally involved at least some downtime.
Cost: Legacy RDBMSs run on expensive hardware and require expensive annual software licenses.
On the other hand, Voldemort is often used by applications that need key-value access but do not need a timeline-consistent change capture stream or the other features an RDBMS provides.

Requirements

Elasticity: The ability to scale clusters horizontally by simply adding nodes.

Consistency:

Support read-after-write as well as eventually consistent reads

Guaranteed secondary index consistency with base data

Transactional updates within a single partition

Distributed: The ability to distribute a single database across multiple nodes in a cluster. The number of partitions as well as the replication factor should be configurable per database.

Fault Tolerant: The system should continue to operate in the presence of individual machine failures with no operator intervention required.

Secondary Indexing: Text and attribute search etc.

Schema Evolution: Schema evolution should be supported with zero downtime and no coordination with DBA teams.

Change Capture Stream: Changes to the primary data store should be forwarded to downstream consumers via a change capture stream.

Bulk Ingest: It should be possible to produce datasets offline (e.g. on HDFS via Hadoop) and import the data for online serving.

Data Model

Espresso provides a hierarchical data model. The hierarchy is database->table->collection->document. Conceptually, databases and tables are exactly the same as in any RDBMS. Let's examine the data model via examples. Database and table schemas are defined in JSON. Document schemas are defined in Avro.

Database

A database is a container for tables. All tables within a database are partitioned identically and share the same physical processing resources. Databases are defined by their schema.

A database schema contains important metadata about a database, including the database name, the version of the schema, the number of partitions (numBuckets) and the partitioning function to use when allocating documents to partitions. It also defines database traffic quotas, e.g., read/write QPS and the volume of data read/written.

{
  "schemaType" : "DBSchema",
  "name" : "MailboxDB",
  "version": 1,
  "doc" : "Espresso Schema for MailboxDB",
  "partitionType" : "hash",
  "numBuckets" : 1024,
  "sla": {"maxReadRequestsPerSecond" : 10000, "maxWriteRequestsPerSecond" : 2000}
}

Table

A table is a container of homogeneously typed documents. A single database can contain multiple tables. Every table schema defines a key-structure which can have multiple parts. The key-structure defines how documents are accessed. Every fully specified key is the primary key for a single document. The leading key in the table schema is also called the partitioning key. This key is used to determine which partition contains a given document. The entire key-space within a partition-key is hierarchical as illustrated in the following example.

{
  "schemaType": "TableSchema",
  "name": "Messages",
  "version": 1,
  "recordType": "MailboxDB/Messages",
  "resourceKeyParts":[
    {"name":"MailboxID", "type":"STRING"},
    {"name":"MessageID", "type":"INT"}
  ]
}

This example table schema defines a 2-part key. MailboxID is the partitioning key and MessageID is the sub-key. Specifying both keys is sufficient to uniquely identify any document within the "Messages" table. These key parts are used to construct the URL of an HTTP request to Espresso: a document URL has the form /<database>/<table>/<partition key>/<sub-key>, so message 1 in mailbox 100 is /MailboxDB/Messages/100/1.
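To make the key structure concrete, here is a minimal Python sketch that turns a table schema's resourceKeyParts into resource paths of the /<database>/<table>/<key>... form used in this document's examples. The build_path and is_document helpers, and the INT type check, are illustrative assumptions, not Espresso's client API.

```python
import json

# The Messages table schema from above, parsed as JSON.
TABLE_SCHEMA = json.loads("""
{
  "schemaType": "TableSchema",
  "name": "Messages",
  "version": 1,
  "recordType": "MailboxDB/Messages",
  "resourceKeyParts": [
    {"name": "MailboxID", "type": "STRING"},
    {"name": "MessageID", "type": "INT"}
  ]
}
""")


def build_path(db, schema, *keys):
    """Hypothetical helper: /<db>/<table>/<key part 1>/.../<key part k>."""
    parts = schema["resourceKeyParts"]
    if not 1 <= len(keys) <= len(parts):
        raise ValueError("need the partition key and at most every key part")
    for part, key in zip(parts, keys):
        # Assumed client-side check: INT key parts must be Python ints.
        if part["type"] == "INT" and not isinstance(key, int):
            raise TypeError(f"{part['name']} must be an INT")
    return "/" + "/".join([db, schema["name"], *(str(k) for k in keys)])


def is_document(schema, *keys):
    """A fully specified key names one document; a key prefix names a collection."""
    return len(keys) == len(schema["resourceKeyParts"])


print(build_path("MailboxDB", TABLE_SCHEMA, "100", 1))  # /MailboxDB/Messages/100/1
print(build_path("MailboxDB", TABLE_SCHEMA, "100"))     # /MailboxDB/Messages/100
```

The second call omits the sub-key, so it names the whole mailbox collection rather than a single message.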

Collections

As mentioned before, the Espresso keyspace is hierarchical. Documents identified by a partially specified key are said to be a collection. For example, the URI /MailboxDB/Messages/100 identifies a collection of records within the same mailbox. Espresso guarantees transactional behavior for updates to multiple resources that share a common partition key if the updates are contained in a single HTTP request.

Documents

A fully specified key uniquely identifies a single document. A document schema is an Avro schema; internally, Espresso stores documents as Avro-serialized binary blobs. The "indexType" attribute indicates that a secondary index should be built on that field.

{
  "schemaType": "DocumentSchema",
  "type": "record",
  "name": "Messages",
  "version": 1,
  "doc": "Espresso schema for simple email message",
  "fields" : [
    {"name":"from", "type":"string", "indexType" : "text"},
    {"name":"subject", "type":"string"},
    {"name":"body", "type":"string", "indexType":"text"}
  ]
}

Architecture

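The overall Espresso architecture consists of routers, storage nodes, cluster management (Apache Helix with ZooKeeper), Databus, the data replicator and the snapshot service. Let us look at each of these components.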



Router

The router is a stateless HTTP proxy and the entry point for all client requests into Espresso. It examines the URL to determine the database, hashes the partition key to determine the partition required to serve the request, and forwards the request to the corresponding storage node. The router has a locally cached routing table that reflects the distribution of partitions among all the storage nodes in the cluster for all databases. This routing table is updated via ZooKeeper whenever the state of the cluster changes. The router supports multi-gets across multiple partitions: it scatter-gathers the requests to multiple storage nodes in parallel and sends the merged response back to the client.

Espresso currently uses hash-based partitioning for all existing databases, except for the internal schema registry database, which can be routed to any node.
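The routing step can be sketched as follows, assuming a hypothetical in-process routing table that maps each partition to its master storage node. Espresso's actual hash function is not described here, so CRC32 modulo numBuckets stands in for it; the node names are made up.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 1024  # "numBuckets" from the MailboxDB schema above


def partition_for(partition_key, num_buckets=NUM_BUCKETS):
    # Stand-in hash (assumption): CRC32 of the key, modulo the bucket count.
    return zlib.crc32(partition_key.encode("utf-8")) % num_buckets


# Hypothetical cached routing table: partition -> master storage node.
# In Espresso this view is refreshed from ZooKeeper on cluster state changes.
ROUTING_TABLE = {p: f"storage-node-{p % 3}" for p in range(NUM_BUCKETS)}


def route(path):
    """path looks like /<db>/<table>/<partitionKey>[/<subKey>...]."""
    partition_key = path.split("/")[3]
    return ROUTING_TABLE[partition_for(partition_key)]


def scatter(paths):
    """Group a multi-get's paths by target node so each node gets one request."""
    by_node = defaultdict(list)
    for path in paths:
        by_node[route(path)].append(path)
    return dict(by_node)
```

Because every document in a collection shares a partition key, all of /MailboxDB/Messages/100/* lands on the same node, which is what keeps single-collection transactions local to one storage node.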

Storage Node

The storage nodes are the fundamental building blocks for scaling out processing and storage. Each storage node hosts a set of partitions, and the routers send requests to storage nodes based on the distribution of partitions. Some of the storage node functions are:

Query Processing

Store and serve primary data as Avro serialized documents.

Host metadata information about each document including checksum, last modified timestamp, schema version, internal flags etc.

Storage Engine - The Storage nodes are designed to have a pluggable storage engine. All current production deployments use MySQL as the storage engine.

The MySQL instances use the InnoDB storage engine.

In addition to storage, MySQL binary logs are used for replication within a cluster and as the feed consumed by the change capture stream (Databus).

Secondary Indexes

Maintain secondary indexes as defined within the document schema. These indexes are updated synchronously with any update to the base data. This ensures that the indexes are always in sync and never return stale copies of the data for query-after-write use cases.

Indexes can be stored at different granularities, e.g., per partition or per collection.

Handling State Transitions - Helix generates state transitions whenever partitions are reassigned. This can happen for a variety of reasons, e.g., master failure or scheduled maintenance. This is discussed in more detail in the fault tolerance section.

Local Transactional Support - The storage nodes are responsible for enforcing transactionality. This is why cross-collection transactions are not allowed: different collections may reside in different partitions, which in turn may reside on different storage nodes.

Replication Commit Log - The Storage Nodes are also responsible for providing an ordered commit log of all transactions which can be consumed by databus and by slave copies of partitions within a cluster.

Utility functions like consistency checking and data validation.

Scheduled Backups - Take periodic backups of data being hosted locally.
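Two of the storage node duties listed above, synchronous secondary index maintenance and single-collection transactions, can be illustrated with a toy in-memory partition. This is a sketch under simplifying assumptions (whitespace tokenization, a Python set per index term, no durability); Espresso itself uses MySQL/InnoDB and pluggable index implementations.

```python
from collections import defaultdict

# Fields carrying "indexType": "text" in the Messages document schema above.
TEXT_INDEXED_FIELDS = ("from", "body")


def tokens(value):
    # Naive tokenizer (assumption): lowercase, split on whitespace.
    return str(value).lower().split()


class PartitionStore:
    """Toy single-partition store: base documents plus a text index kept in sync."""

    def __init__(self):
        self.docs = {}                 # (MailboxID, MessageID) -> document dict
        self.index = defaultdict(set)  # (field, token) -> keys containing the token

    def _reindex(self, key, old, new):
        for field in TEXT_INDEXED_FIELDS:
            for tok in tokens((old or {}).get(field, "")):
                self.index[(field, tok)].discard(key)
            for tok in tokens(new.get(field, "")):
                self.index[(field, tok)].add(key)

    def commit(self, writes):
        """Apply {(MailboxID, MessageID): doc} as one transaction."""
        # Validate everything first so a rejected transaction changes nothing.
        if len({mailbox for mailbox, _ in writes}) != 1:
            raise ValueError("a transaction must stay within a single collection")
        if not all(isinstance(doc, dict) for doc in writes.values()):
            raise TypeError("documents must be dicts")
        for key, doc in writes.items():
            # Base data and index change together, so a query issued after
            # commit() returns never observes a stale index entry.
            self._reindex(key, self.docs.get(key), doc)
            self.docs[key] = doc

    def query(self, mailbox, field, term):
        hits = self.index.get((field, term.lower()), set())
        return sorted(key for key in hits if key[0] == mailbox)
```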

Cluster Management

Cluster management in Espresso is done using Apache Helix. Given a state model definition along with some constraints, Helix computes an "IdealState", which is an ideal distribution of database partitions within a cluster. Helix compares the generated IdealState with the current state of the cluster, called the "ExternalView", and issues the state transitions required to move from the current ExternalView to the IdealState. Since every storage node is registered with Helix, the system is able to detect and fix failures rapidly.

The Espresso state model has MASTER and SLAVE states, with the following constraints for each partition:

Every partition must have exactly one master and can have up to 'n' (configurable) slaves.

Partitions are distributed evenly across all storage nodes.

No two replicas of the same partition may be present on the same node.

Upon master failure, one of the slaves must be promoted to master.
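As an illustration of the IdealState/ExternalView loop, the sketch below computes a round-robin assignment that satisfies the constraints listed above and then diffs it against a current view. The function names and the round-robin strategy are assumptions; Helix has its own rebalancers, and in a master/slave model it would route a move such as OFFLINE to MASTER through intermediate states, which this sketch does not model.

```python
def ideal_state(partitions, nodes, replicas):
    """Assign each partition one MASTER and (replicas - 1) SLAVEs on distinct nodes.

    Round-robin placement spreads masters evenly and never puts two replicas
    of the same partition on one node (requires replicas <= len(nodes)).
    """
    if replicas > len(nodes):
        raise ValueError("not enough nodes for distinct replicas")
    state = {}
    for p in partitions:
        owners = [nodes[(p + i) % len(nodes)] for i in range(replicas)]
        state[p] = {owners[0]: "MASTER", **{n: "SLAVE" for n in owners[1:]}}
    return state


def transitions(external_view, ideal):
    """List (partition, node, from_state, to_state) moves from ExternalView to IdealState."""
    moves = []
    for p in sorted(set(external_view) | set(ideal)):
        current, target = external_view.get(p, {}), ideal.get(p, {})
        for node in sorted(set(current) | set(target)):
            src = current.get(node, "OFFLINE")
            dst = target.get(node, "OFFLINE")
            if src != dst:
                moves.append((p, node, src, dst))
    return moves
```

For example, removing a failed node from the view and recomputing the ideal state over the survivors yields transitions that promote surviving slaves wherever the failed node was master.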



Databus

Espresso uses Databus as its change capture mechanism to transport source transactions in commit order. Databus achieves high throughput and low replication latency. It consumes transaction logs from Espresso to provide a change stream. For more information, please read the Databus blog post.



Databus is used for several purposes by Espresso:

Deliver events to downstream consumers, e.g., search indexes and caches.

Espresso multi-datacenter replication: each locally originated write is forwarded to remote data centers.

Data Replicator

The data replicator is a service that forwards commits between geo-replicated Espresso clusters. It is essentially a Databus consumer that consumes events for each database partition within a cluster. This service batches events per partition to improve throughput across high-latency links between data centers.

The Data Replicator service contains a clustered set of stateless instances managed by Helix. The Databus consumers periodically checkpoint their replication progress in ZooKeeper. The checkpoints survive node failures, service restarts etc.

The service itself is fault tolerant. Each node is responsible for replicating the set of partitions assigned to it by Helix. Upon node failure, the partitions assigned to the failed node are redistributed evenly among the remaining nodes. When a node starts processing a new partition assignment, it replays transactions starting from the most recent checkpoint the failed node stored in ZooKeeper. In Helix terms, the service uses an Online-Offline state model.
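A minimal sketch of the replicator's per-partition loop, assuming an in-memory dict in place of ZooKeeper and plain integer SCNs: events after the stored checkpoint are sent in batches, and the checkpoint advances only after a batch is sent. The names (CheckpointStore, replicate_partition, send_batch) are hypothetical.

```python
class CheckpointStore:
    """Stand-in for ZooKeeper (assumption): partition -> last replicated SCN."""

    def __init__(self):
        self._data = {}

    def get(self, partition):
        return self._data.get(partition, 0)

    def put(self, partition, scn):
        self._data[partition] = scn


def replicate_partition(partition, events, checkpoints, send_batch, batch_size=3):
    """Forward events with SCN > checkpoint to the remote cluster in batches.

    events: list of (scn, payload) in commit order.
    send_batch: callable(partition, batch) that ships one batch across the WAN.
    """
    start = checkpoints.get(partition)
    pending = [e for e in events if e[0] > start]
    for i in range(0, len(pending), batch_size):
        batch = pending[i:i + batch_size]
        send_batch(partition, batch)
        # Checkpoint only after the send succeeds; a replacement node resumes here.
        checkpoints.put(partition, batch[-1][0])
```

Writing the checkpoint after the send means a replacement node may resend a batch that was shipped just before the failure, so the receiving side should tolerate duplicates. That at-least-once behavior is a property of this sketch, not a documented Espresso guarantee.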

Snapshot Service

For a majority of Espresso users, automatic ETL to HDFS is a strong requirement. As a result, we need a mechanism to publish all the Espresso data to HDFS with minimal impact to the serving cluster. Any solution also has to integrate well with LinkedIn's existing data pipelines. The snapshot service has been written to address these requirements.

Espresso takes periodic backups of its data, which are written to shared storage accessible to all Espresso nodes within a data center. Metadata about recent backups is also written to ZooKeeper. The snapshot service sets a watch on the znodes where this metadata is published. Upon notification of a new backup, the snapshot service restores the backup locally and produces a separate Avro file for each table in a database. This data is then available for a "puller" service to load into HDFS. This also has the added benefit of verifying our backup images as soon as they are generated.

In addition to HDFS ETL, the snapshot service produces several other derived views of the data. One example is the Databus bootstrap file. Databus retains only a fixed-size buffer of events in memory, so a consumer that is new, or that has been down for an extended period of time, cannot catch up from Databus alone because it has missed events. The Avro files produced by this service can be used to bootstrap such a consumer, i.e., to provide it with an accurate view of all the data as of some time 'X', after which it can resume consuming the change stream.
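Why a long-absent consumer needs a bootstrap file can be shown with a toy fixed-size event buffer. This sketch assumes consecutive integer SCNs and a hypothetical read_since method that returns None when the events the consumer needs have already been evicted.

```python
from collections import deque


class EventBuffer:
    """Toy relay buffer that keeps only the most recent `capacity` events."""

    def __init__(self, capacity):
        self.events = deque(maxlen=capacity)  # oldest events fall off automatically

    def append(self, scn, payload):
        self.events.append((scn, payload))

    def read_since(self, scn):
        """Return events after `scn`, or None if the consumer must bootstrap."""
        # The consumer next needs scn + 1; if that was evicted, there is a gap.
        if self.events and scn + 1 < self.events[0][0]:
            return None
        return [e for e in self.events if e[0] > scn]


buf = EventBuffer(capacity=3)
for scn in range(1, 6):
    buf.append(scn, f"e{scn}")        # buffer now holds SCNs 3, 4, 5
print([s for s, _ in buf.read_since(2)])  # [3, 4, 5]
print(buf.read_since(1))                  # None: SCN 2 is gone, bootstrap first
```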

The snapshot service is itself a distributed system with the same Online-Offline state model as the data replicator.

API

Espresso provides a simple, easy-to-use, HTTP-based RESTful API.

HTTP PUT is used to insert data into a table. The following creates a document with the content location "/MailboxDB/Messages/100/1":

PUT /MailboxDB/Messages/100/1
<Avro Serialized Document>

If a document already exists at the specified URL, it is overwritten in its entirety.

PUT can also be used to perform a transactional insert of multiple documents into a collection ("Mailbox" in our example). Each document is represented as a part in a multipart MIME message:

PUT /MailboxDB/*/100
-- Part 1 --
Content-Location: /MailboxDB/Messages/100/1
<Avro Serialized Document>
-- Part 2 --
Content-Location: /MailboxDB/Messages/100/2
<Avro Serialized Document>

If any document fails to persist, or a conditional failure occurs (described later), the transaction is rolled back.
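Below is a sketch of how a client might assemble the multipart body for a transactional PUT like the one above, using Python's standard email package. The multipart/mixed subtype, the application/octet-stream part type and base64 transfer encoding are assumptions for illustration; the exact MIME details Espresso expects are not specified here, and no request is actually sent.

```python
from email import message_from_bytes
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart


def build_multipart_put(documents):
    """documents: list of (content_location, avro_bytes) -> MIME entity bytes."""
    msg = MIMEMultipart("mixed")  # assumed subtype
    for location, payload in documents:
        part = MIMEApplication(payload)  # application/octet-stream, base64-encoded
        part["Content-Location"] = location
        msg.attach(part)
    # Includes the top-level Content-Type header carrying the part boundary.
    return msg.as_bytes()


def parse_parts(raw):
    """Inverse view, roughly what a server would see: (location, bytes) per part."""
    msg = message_from_bytes(raw)
    return [(p["Content-Location"], p.get_payload(decode=True)) for p in msg.get_payload()]
```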

HTTP POST is used for partial updates to existing documents, and for inserting documents into a collection with an auto-generated (autoincrement) trailing key, as described below. As with PUT, POST is transactional.

If the last key part in a table schema is numeric and is annotated with the "autoincrement" attribute, that key part can be omitted when using POST to insert a document into the corresponding collection. Espresso will generate the key part at the time of the POST, and return the URL of the inserted document in a Content-Location response header.
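A toy model of autoincrement key generation, assuming a per-collection counter that starts at 1 (the source does not say how Espresso allocates these values). The AutoIncrementTable class is hypothetical; post() returns the URL Espresso would place in the Content-Location response header.

```python
from collections import defaultdict


class AutoIncrementTable:
    """Toy table whose trailing INT key part is generated on POST."""

    def __init__(self, db, table):
        self.prefix = f"/{db}/{table}"
        self.docs = {}
        # Assumption: one counter per collection, starting at 1.
        self.next_id = defaultdict(lambda: 1)

    def post(self, collection_key, document):
        """Insert without a trailing key; return the new document's URL."""
        message_id = self.next_id[collection_key]
        self.next_id[collection_key] += 1
        location = f"{self.prefix}/{collection_key}/{message_id}"
        self.docs[location] = document
        return location  # sent back as the Content-Location response header


t = AutoIncrementTable("MailboxDB", "Messages")
print(t.post("100", b"<avro>"))  # /MailboxDB/Messages/100/1
print(t.post("100", b"<avro>"))  # /MailboxDB/Messages/100/2
```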

Document deletion is performed with an HTTP DELETE method applied to the document's URL.

HTTP GET is used for retrieval of documents. It can retrieve single documents, entire collections or even documents spanning multiple collections. If multiple documents are returned from a single HTTP GET operation, they are encoded as parts of a multi-part MIME message. In case of multi-gets spanning different collections, the actual documents may reside on different storage nodes.

# Retrieve a specific message
GET /MailboxDB/Messages/100/1
HTTP 200 OK
<Avro Serialized Content>

# Retrieve an entire collection (Mailbox in our example)
GET /MailboxDB/Messages/100
-- Part 1 --
Content-Location: /MailboxDB/Messages/100/1
<Avro Serialized Content>
-- Part 2 --
Content-Location: /MailboxDB/Messages/100/2
<Avro Serialized Content>
-- Part n --

# Collection pagination
GET /MailboxDB/Messages/100?start=5&count=5

# Multi-Get. Fetch only the specified documents
GET /MailboxDB/Messages/100/(1, 2)

# Multi-Get. Fetch multiple collections
GET /MailboxDB/Messages/(100, 101)

# Secondary index queries are also modeled as GETs
GET /MailboxDB/Messages/100?query="from:adi@LinkedIn.com"
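The multi-get syntax can be expanded into individual resource paths and grouped by partition key, which is roughly what a router needs before it scatters requests. This parser is a hypothetical sketch that assumes at most one "(a, b, ...)" list per path segment and no query string.

```python
import re


def expand_multiget(path):
    """Expand '(a, b, ...)' segments into the cross product of concrete paths."""
    expanded = [[]]
    for seg in path.strip("/").split("/"):
        m = re.fullmatch(r"\((.*)\)", seg)
        options = [s.strip() for s in m.group(1).split(",")] if m else [seg]
        expanded = [prefix + [opt] for prefix in expanded for opt in options]
    return ["/" + "/".join(parts) for parts in expanded]


def group_by_partition_key(paths):
    """Paths sharing a partition key (third segment) go to the same partition."""
    groups = {}
    for p in paths:
        groups.setdefault(p.split("/")[3], []).append(p)
    return groups


print(expand_multiget("/MailboxDB/Messages/100/(1, 2)"))
# ['/MailboxDB/Messages/100/1', '/MailboxDB/Messages/100/2']
```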

Espresso also supports HTTP-compliant conditional operations based on the Last-Modified and ETag headers. This allows clients to implement read-modify-write operations and to retain cached copies of documents.
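A sketch of optimistic read-modify-write with ETags against a toy in-memory store. Using a per-document version counter as the ETag, and returning HTTP 412 (Precondition Failed) on a mismatched If-Match, are assumptions in the spirit of standard HTTP conditional requests, not a description of Espresso internals.

```python
class ConditionalStore:
    """Toy document store supporting If-Match conditional writes."""

    def __init__(self):
        self.docs = {}  # url -> (body, version)

    def get(self, url):
        body, version = self.docs[url]
        return body, f'"{version}"'  # (body, ETag)

    def put(self, url, body, if_match=None):
        current = self.docs.get(url)
        etag = f'"{current[1]}"' if current else None
        if if_match is not None and if_match != etag:
            return 412  # Precondition Failed: the document changed since it was read
        self.docs[url] = (body, current[1] + 1 if current else 1)
        return 200


def read_modify_write(store, url, update, retries=3):
    """Re-read and retry when another writer wins the race."""
    for _ in range(retries):
        body, etag = store.get(url)
        if store.put(url, update(body), if_match=etag) == 200:
            return True
    return False
```

A version counter avoids the ABA problem a content hash would have, where a document changed and then changed back would wrongly match an old ETag.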

System Change Numbers

Espresso uses an internal clock to order events. Timeline ordering is extremely important for inter-cluster replication, Databus, etc. Each successful mutation (insert, update or delete) is assigned a 64-bit system change number (SCN). This number is monotonically increasing and is maintained separately for each partition. An SCN has two parts: a generation (higher 32 bits) and a sequence (lower 32 bits). Every committed transaction increments the sequence by 1. Every mastership transition increments the generation by 1 and resets the sequence. Events committed as part of the same transaction share the same SCN. The SCN is currently generated by MySQL when the transaction is committed.

Consider this example. Let N1 and N2 be 2 nodes hosting partition 0 (P0) as MASTER and SLAVE respectively. To begin with, the SCN for P0 is (1, 1) i.e., generation 1 and sequence 1. If node N1 receives 3 writes, the SCN will be (1, 4). At this point, assume that N1 undergoes a failure and N2 assumes mastership. N2 sees that the current SCN is (1, 4) and changes the SCN to (2, 1). This process is repeated whenever mastership for a partition is reassigned from one node to another. SCNs must be managed per partition rather than per MySQL instance because partitions are not necessarily tied to a specific MySQL instance. During an expansion, partitions will move between nodes.
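The SCN layout and the failover example above can be checked with a few lines of bit arithmetic. The helper names are hypothetical; following the example, a mastership change sets the sequence to 1.

```python
def make_scn(generation, sequence):
    """Pack a 64-bit SCN: generation in the high 32 bits, sequence in the low 32."""
    if not (0 <= generation < 2**32 and 0 <= sequence < 2**32):
        raise ValueError("generation and sequence must each fit in 32 bits")
    return (generation << 32) | sequence


def split_scn(scn):
    return scn >> 32, scn & 0xFFFFFFFF


def next_write(scn):
    gen, seq = split_scn(scn)
    return make_scn(gen, seq + 1)


def new_master(scn):
    gen, _ = split_scn(scn)
    return make_scn(gen + 1, 1)  # per the example: (1, 4) -> (2, 1)


scn = make_scn(1, 1)
for _ in range(3):          # N1 receives 3 writes
    scn = next_write(scn)
print(split_scn(scn), scn)  # (1, 4) 4294967300
after = new_master(scn)     # N2 takes over mastership
print(split_scn(after), after, after > scn)  # (2, 1) 8589934593 True
```

Because the generation occupies the high 32 bits, (2, 1) compares greater than every SCN from generation 1, which keeps the per-partition timeline monotonic across failovers.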

Schema Management

Espresso schemas are stored in ZooKeeper, and this component is shared among all services in Espresso. The schema registry is itself modeled as an Espresso database, so the database, table and document schemas for our example MailboxDB are fetched with ordinary GET requests, in which the trailing key is the schema version.

Schemas are inserted and deleted the same way as documents in any other database. A PUT on a schema causes it to be persisted to ZooKeeper and propagated to all the storage nodes. Deleting a table or database schema removes it from ZooKeeper and causes all the storage nodes to drop the corresponding data. Deleting schemas can be explicitly disabled via configuration; LinkedIn disables schema deletion in all production environments as a precaution against inadvertent removal of data sets used to serve member requests.

Espresso Document schemas can be evolved according to Avro schema evolution rules, along with some Espresso specific restrictions. Backward incompatible schema changes are permitted but discouraged.

For example, suppose we want to add a "senderName" field to the Messages document schema as version 2. All new records are serialized using the latest schema version unless another version is specified per request.

Since the new "senderName" field is optional, existing documents encoded with version 1 of the schema can be promoted to version 2.
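Promotion of a version 1 document can be sketched with a simplified form of Avro's schema resolution: fields present in the old record are kept, and fields missing from it take the reader schema's default. Modeling the optional senderName as a ["null", "string"] union with a null default is an assumption about what version 2 looks like; the real Avro library also handles type promotion, aliases and nested records.

```python
MESSAGES_V1 = {"name": "Messages", "version": 1, "fields": [
    {"name": "from", "type": "string"},
    {"name": "subject", "type": "string"},
    {"name": "body", "type": "string"},
]}

# Hypothetical version 2: optional senderName as a null-union with a null default.
MESSAGES_V2 = {"name": "Messages", "version": 2, "fields": MESSAGES_V1["fields"] + [
    {"name": "senderName", "type": ["null", "string"], "default": None},
]}


def promote(record, reader_schema):
    """Resolve a decoded record against a newer reader schema (simplified rules)."""
    result = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            result[name] = field["default"]
        else:
            # A new field without a default is not a compatible change.
            raise ValueError(f"cannot promote: field {name!r} has no default")
    return result
```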

http://www.slideshare.net/amywtang/li-espresso-sigmodtalk
Espresso: Key Design Points
Source-of-truth
–Master-Slave, Timeline consistent
–Query-after-write
–Backup/Restore
–High Availability
Horizontally Scalable
Rich functionality
–Hierarchical data model
–Document oriented
–Transactions within a hierarchy
–Secondary Indexes

Agility – no “pause the world” operations
–“On the fly” Schema Evolution
–Elasticity
Integration with the data ecosystem
–Change stream with freshness in O(seconds)
–ETL to Hadoop
–Bulk import
Modular and Pluggable
–Off-the-shelf: MySQL, Lucene, Avro

Richer than a plain key-value store
Hierarchical keys
Values are rich documents and may contain nested types

•Secondary Index query
–GET /MailboxDB/MessageMeta/bob/?query="+isUnread:true +isInbox:true"&start=0&count=15
•Partial updates
POST /MailboxDB/MessageMeta/bob/1
Content-Type: application/json
Content-Length: 20
{"unread" : "false"}
•Conditional operations
–Get a message, only if recently updated
GET /MailboxDB/MessageMeta/bob/1
If-Modified-Since: Wed, 31 Oct 2012 02:54:12 GMT

Transactional writes within a hierarchy
Cluster Expansion
Initial state with 3 storage nodes
Step 1: Compute new ideal state
Step 2: Bootstrap new node’s partitions by restoring from backups
Step 3: Catch up from live replication stream
Step 4: Migrate masters and slaves to rebalance

Node Failover
•Step 1: Detect Node failure
•Step 2: Compute new ideal state for promoting slaves to master

Espresso Secondary Indexing
•Local Secondary Index Requirements
•Read after write
•Consistent with primary data under failure
•Rich query support: match, prefix, range, text search
•Cost-to-serve proportional to working set
•Pluggable Index Implementations
•MySQL B-Tree
•Inverted index using Apache Lucene with MySQL backing store
•Inverted index using Prefix Index
•Fastbit based bitmap index

Lucene based implementation
•Requires entire index to be memory-resident to support low latency query response times
•For the Mailbox application, we have two options

Optimizations for Lucene based implementation
•Concurrent transactions on the same Lucene index lead to inconsistency
•Need to acquire a lock
•Opening an index repeatedly is expensive
•Group commit to amortize index opening cost
High value users of the site accumulate large mailboxes
–Query performance degrades with a large index
Performance shouldn’t get worse with more usage!
Time Partitioned Indexes: Partition index into buckets based on created time
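A toy version of time-partitioned indexes: postings are grouped into creation-time buckets, and a newest-first query stops as soon as it has enough hits, so it usually touches only the most recent buckets regardless of mailbox size. The one-month bucket width and the class name are hypothetical.

```python
from collections import defaultdict

BUCKET_SECONDS = 30 * 24 * 3600  # hypothetical bucket width: about one month


class TimePartitionedIndex:
    """Toy inverted index split into buckets by document creation time."""

    def __init__(self):
        # bucket number -> term -> [(created_ts, doc_id)]
        self.buckets = defaultdict(lambda: defaultdict(list))

    def add(self, doc_id, created_ts, terms):
        bucket = created_ts // BUCKET_SECONDS
        for term in terms:
            self.buckets[bucket][term].append((created_ts, doc_id))

    def search(self, term, count):
        """Newest-first search; returns (doc_ids, number of buckets scanned)."""
        hits, scanned = [], 0
        for bucket in sorted(self.buckets, reverse=True):
            scanned += 1
            hits.extend(sorted(self.buckets[bucket].get(term, []), reverse=True))
            if len(hits) >= count:
                break  # older buckets cannot contain newer hits
        return [doc_id for _, doc_id in hits[:count]], scanned
```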

Unified Social Content Platform –social activity aggregation
High Read:Write ratio
InMail - Allows members to communicate with each other

Durability and Consistency
Within a Data Center
–Write latency vs Durability
Asynchronous replication
–May lead to data loss
–Tooling can mitigate some of this
Semi-synchronous replication
–Wait for at least one relay to acknowledge
–During failover, slaves wait for catchup
Consistency over availability
Helix selects slave with least replication lag to take over mastership
Failover time is ~300ms in practice
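The slave selection rule above can be sketched as a pure function. It assumes integer SCNs, a hypothetical relay_scn for the highest write acknowledged under semi-synchronous replication, and an arbitrary name-based tie-break; Helix's actual selection logic may differ.

```python
def plan_failover(slave_scns, relay_scn):
    """Pick the slave with the least replication lag.

    slave_scns: {node: last applied SCN}; relay_scn: highest SCN a relay has
    acknowledged. Returns (node, events it must catch up before taking writes).
    Ties go to the lexicographically smallest node name, for determinism.
    """
    if not slave_scns:
        raise RuntimeError("no slave available; partition is unavailable")
    node = min(slave_scns, key=lambda n: (-slave_scns[n], n))
    return node, relay_scn - slave_scns[node]


print(plan_failover({"N2": 98, "N3": 97}, relay_scn=100))  # ('N2', 2)
```

Waiting for the chosen slave to apply those remaining events before accepting writes is what trades a little availability for consistency.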

Across data centers
–Asynchronous replication
–Stale reads possible
–Active-active: Conflict resolution via last-writer-wins
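Last-writer-wins conflict resolution reduces to a deterministic comparison. This sketch assumes each version carries an origin timestamp and data center name, with the name used only to break exact timestamp ties so every replica converges on the same winner; how Espresso timestamps cross-data-center writes is not described here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    timestamp_ms: int  # write time at the originating data center (assumed)
    datacenter: str    # tie-breaker so all replicas pick the same winner
    value: bytes


def lww_merge(local: Version, remote: Version) -> Version:
    """Keep the later write; break exact ties by data center name."""
    return max(local, remote, key=lambda v: (v.timestamp_ms, v.datacenter))
```

Because the merge is commutative, each data center reaches the same result regardless of the order in which it sees the two writes, at the cost of silently discarding the losing update.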

Key Takeaways
Espresso is a timeline consistent, document-oriented distributed database
Feature rich: Secondary indexing, transactions over related documents, seamless integration with the data ecosystem
In production since June 2012 serving several key use-cases
http://www.slideshare.net/amywtang/espresso-20952131
