Requirements
Design a chat app like WhatsApp.
Functional Requirements
- A user can send and receive text messages in real time.
- When the user goes online, she receives all the messages that were addressed to her during her offline time.
- By default, once a message is delivered to a user, it will be stored on the user's device and we no longer need to store them on the server (like WhatsApp and WeChat). However, users have the option to modify their history storage policy. They can choose to have their messages stored on the server for up to 90 days.
- The users shouldn't see duplicate messages.
Non-Functional Requirements
- 100M DAU
- 20 average daily messages per user
- The maximum length of a message is 1000 English characters.
- Store the undelivered chats for 1 year.
- The system provides high availability.
- Data must be persistent when we respond that the write is successful.
Resource Estimation
Assume each user sends 20 messages per day, assume each message, with metadata, is roughly 200 bytes.
-
Send/Receive QPS
-
100M * 20 / (3600 * 24 ) ~= 23K
-
To plan for the peak hour, we should aim for 230K RPS (Assuming ten times the average value).
-
-
Storage
-
The requirement to store for 1 year is not for all data, but only for data that has not been delivered to the recipient. For a real-time chat app, most messages will be received by the recipient's client shortly after they are sent. Additionally, users have the option to store chat records for 90 days, but not all users will choose this option. Taking these factors into account, we can assume that the average storage duration for messages is 4 months. Therefore, the storage space only needs to accommodate the amount of data for 4 months.
-
100M * 20 * 200 * 31 * 4 ~= 50 TB
-
-
machines
- Modern web servers can handle a large number of WebSocket connections. WhatsApp's fine-tuned Linux machine could already handle up to 2 million simultaneous WebSocket connections in 2012. Let's assume 10% of the 100MM users will be online at the same time, and 1 machine can handle 1MM connections. We will need 10 machines to handle all the WebSocket connections.
API Endpoint Design
Messages will be sent using WebSocket because its dual-direction nature enables real-time communication efficiently. HTTP Long polling is an alternative.
In a WebSocket connection, once the initial connection is established, both the client and the server can send messages to each other independently without the need for a specific request from one side and a corresponding response from the other.
Let's call our service "Chat Service" and this is how the messages look like:
- Client A sends a message to the Chat Service
{
"action": "send_message";
"receiver_id": string;
"content": string; // Length <= 1000 English char.
"message_id_by_client": number; // Incremental ID generated on the client side, used to identify messages in the server's confirmation message.
"timestamp": number;
}
- Chat Service confirms it received Client A's message
{
"action": "message_received";
"message_id": string; // Globally unique id generated by the Chat Sever
"message_id_by_client"; // ID from the send_message message used to match messages on the client side
"timestamp": number;
}
- Chat Service sends the message to Client B
{
"type": "incoming_message";
"message_id": string;
"timestamp": number;
}
- Client B sends a message to the Chat Service to confirm the message has been successfully delivered
{
"type": "message_delivered",
"message_id": string;
"timestamp": number;
}
- Chat Service sends a message to Client A notifying it that Client B has received the message
{
"type": "message_delivered",
"message_id": string;
"timestamp": number;
}
About Message ID
In the initial step, the client generates a temporary auto-incremented ID, which is then sent to the Chat Service. This ID serves as a provisional identifier. In response, the Chat Service creates a globally unique ID and relays it back to the client, accompanied by the initial temporary ID. This approach is adopted because, even when employing seemingly unique IDs like UUIDs, there's a residual risk of duplication. Moreover, there's a potential threat from malicious actors who might attempt to forge messages by generating duplicate IDs. By initially setting a client-side ID and subsequently replacing it with a server-generated one, we mitigate these risks.
Refer to Url Shortener section on how to generate globally unique IDs.
Message storage settings
This is to fulfill the requirement that users can optionally to modify their history storage policy to have their messages stored on the server for up to 90 days.
-
PUT /user/settings
:Request Body
{ "history_storage": 90 | 0, // Set to 90 for 90 days storage, 0 for no storage }
Response Body
{ "status": "success" | "failed" }
-
GET /history/{chat_participant_id}?last_timestamp={last_timestamp}
:The path variable
chat_participant_id
is used to specify the chat participant, which can be a user_id or a group_id.The query variable
last_timestamp
is used to pull the last 20 chat records from the time specified bylast_timestamp
. Iflast_timestamp
is not specified, the current time is used.Response Body
{ "status": "success" | "failed", "data": [ { "message_id": "message_id", "content": "The content of message", "timestamp": 1692841226851, }, ... ] }
High-Level Design
The design follows our system design template. We make a minor adaptation in the read path: Messages are delivered to the receiving client directly.
Chat Servers:
- Purpose: Primarily responsible for managing real-time communication.
- Function: They maintain WebSocket connections, ensuring that users can send and receive messages in real-time. These servers act as the immediate interface between the user and the backend infrastructure.
Message Queue:
- Purpose: Acts as a mediator for asynchronous communication.
- Function: It serves as a buffer, temporarily holding messages to ensure they are processed in the order they arrive, especially during high traffic or if there are downstream delays. This decouples the sending and processing of messages, providing system resilience.
Database (DB):
- Purpose: Long-term storage solution.
- Function: The DB is responsible for storing messages persistently. Once a message is in the database, it's safely recorded and can be retrieved, even if the system restarts or encounters failures.
Cache:
- Purpose: Facilitate faster data access.
- Function: The cache stores unsent messages temporarily. This ensures quicker retrieval and re-sending, especially if a user doesn't receive a message initially. Using a cache can significantly reduce the load on the main database and improve message delivery speed.
ID Generator:
- Purpose: Provide unique identifiers.
- Function: The ID generator is tasked with producing unique message IDs. By ensuring each message has a distinct ID, the system can track, retrieve, and manage messages more effectively without confusion or overlap. This component is crucial for data integrity and efficient message handling.
Client A Sends a Message to the Chat Service
1. After Client A starts, it sends a WebSocket request to our Chat Service;
2. After the Load Balancer receives the request, it forwards the request to a specific Chat Service, and Client A establishes a connection with this Chat Server. A Chat Server maintains one WebSocket connection for each client.
3. Client A sends messages (send_message
action) through the WebSocket connection.
4. The Chat Server asks the ID generator for a unique id for the message;
5. The Chat Server writes the received messages into the Message Queue;
6. Once the messages are successfully written into the Message Queue, the Chat Server immediately responds to the Client with a message_received
type confirmation message through the WebSocket connection;
7. Upon receiving a confirmation message of the message_received
type from the server, the client can identify the specific message acknowledged by the server using the included message_id
.
Delivering the Message to Client B (Online)
1. The messages stored in the Message Queue are processed by the MQ Consumer;
2. 3. The MQ Consumer checks if the Receiver has a WebSocket connection; if it exists, it then pushes the message (packaged into incoming_message
type) to the receiver (Client B) through the WebSocket connection.
4. 5. Upon receiving an incoming_message
type message, the Client should immediately respond to the Server with a message_delivered
type message.
6. If the Server does not receive a message_delivered
within a certain timeframe (such as 10 seconds), it writes the message into the Message Database without adding the undelivered
field. If either the WebSocket connection does not exist or message_delivered
is not received, the undelivered
field is marked as true when writing into the Message Database.
7. 8. For situations where the WebSocket connection does not exist or message_delivered
is not received, the message should also be recorded in the cache "inbox", and the message will be delivered to the Client using a third-party service.
Delivering the Message to Client B (Offline)
1. 2 Client B comes online and establishes a WebSocket connection with the server.
3. After a secure WebSocket connection is established, the Chat Server checks Client B's "inbox" in the cache for any undelivered messages.
4. If undelivered messages are found, the Chat Service sends these messages to Client B over the WebSocket connection.
5. Upon receiving the messages, Client B sends a message_delivered notification back to the server, confirming successful message reception.
6. After the server receives the message_delivered notification, it updates the status in the Message Database from undelivered to delivered and also removes the corresponding messages from the Inbox in the cache.
Chat History
User Settings Storage Policy
- s1 s2 s3 The user's client modifies the user's history record storage policy through the
PUT /user/settings
interface.
Reading Chat History
-
r1 r2 The client requests to pull the chat history records from the server through the
GET /history/{chat_participant_id}?last_timestamp={last_timestamp}
interface; -
r3 The Message Service queries the User Service for the user's history record storage policy and gets the time range that should be terminated;
-
r4 The Message Service pulls chat records from the Message Database.
Detailed Design
Data Store
Database Type
We can use a hybrid model of SQL and NoSQL: RDBMS (MySQL, Postgres etc) for user profiles and relationships while using NoSQL (MongoDB, DynamoDB, Cassandra etc) for chat message storage for the anticipated high write loads and Redis for caching.
Data Schema
The data schema for the chat application could be as follows:
-
messages
collection:{ "message_id": "the-id-of-message", "sender_id": "the-sender-user-id", "receiver_id": "the-receiver-user-id", "content": "The content of message", "undelivered": true | none, "timestamp": 1692677132882, // The time when the message is sent is the time when it is written into the Message Queue. }
Database Partitioning
To manage the large volume of data and high traffic, we can employ horizontal partitioning (sharding). We can partition by either user_id
, message_id
, or timestamp
.
By user_id
:
- Pros: Predictable and uniform distribution of data since each shard handles a subset of users, ensuring all related data for a user is localized.
- Cons: Inefficiencies might arise if certain users are way more active, leading to hotspots.
By message_id
:
-
Pros: Even Distribution. If the Message IDs are generated randomly or through a hash function that ensures good distribution, then sharding by Message ID should distribute data evenly across shards.
-
Cons: Query Complexity. Fetching all messages for a particular user or between two users could become complex and slow, as it would require querying multiple shards.
By timestamp
:
- Pros: Efficient for chronological message retrieval.
- Cons: Potential for uneven distribution during peak chat hours.
Partitioning by user_id
would make more sense in this case. This approach ensures that all chat messages for a particular user, regardless of their chat partner, are stored on the same shard. It simplifies data retrieval and minimizes cross-shard operations.
For more detailed content, refer to Partitioning (Sharding).
Database Replication
To ensure high availability and data durability, we can use database replication. Each write operation is performed on the primary database and then replicated to one or more secondary databases. If the primary database fails, one of the secondary databases can take over.
Many databases supports replication natively through a feature called replica sets. We will use MongoDB as an example here. A replica set is a group of MongoDB processes that maintain the same data set. The primary node receives all write operations, and all other nodes are secondary nodes that replicate the primary node's data set.
If the primary node fails, an election determines the new primary node. This replication process ensures that your data is safe and that your system remains available even if a node fails. It also allows you to scale out your system by adding more secondary nodes to handle read operations.
By default, MongoDB reads and writes are on the primary node. We can set the read preference of the MongoDB client to "secondary", so that the read operations of the MongoDB client will be executed on the secondary node. The advantage is that it can reduce the read pressure on the read node. The disadvantage is that it may read outdated data, because the secondary node may not have time to replicate the latest data on the primary node. However, the Realtime Chat app does not require strong consistency, as long as eventual consistency can be ensured.
For more information about replication, you can refer to the article Replication.
Data Retention and Cleanup
Use Scheduled Jobs to regularly clean up data.
- Query the Message Database for undelivered messages stored for more than one year and clean up the corresponding data in the Cache's Inbox.
- Query the user's storage policy and clean up the delivered messages in the Message Database according to the storage policy.
Cache
Maintain an Inbox for each user in the Cache so that when the Message Service needs to query whether there are any undelivered messages, it only needs to query from the Cache. This will reduce the load on the database and improve the performance of the chat application. We can use a cache like Redis, which provides high performance and supports various data structures.
The following uses Redis as an example for illustration.
Use the ZSET type to store messages. Use the receiver's user id of the message as the key, timestamp as the score, and the message converted to a JSON string as the member.
For example, the user id of the receiver is: 26efa168-5a5e-4793-b5fb-dcc0fe92decd, and the command to save three messages to the cache is as follows:
ZADD "26efa168-5a5e-4793-b5fb-dcc0fe92decd" 1692675132882 "{\"message_id\":\"472731d7-0e6a-40ff-9e91-e422ede43fdd\",\"sender_id\":\"ad224914-7f64-44c5-a358-a2eddde13d64\",\"content\":\"The content of message 0\",\"timestamp\":1692675132882}" ZADD "26efa168-5a5e-4793-b5fb-dcc0fe92decd" 1692676631232 "{\"message_id \":\"472731d7-0e6a-40ff-9e91-e422ede43fdd\",\"sender_id\":\"ad224914-7f64-44c5-a358-a2eddde13d64\",\"content\":\"The content of message 1\",\"timestamp\":1692676631232}" ZADD "26efa168-5a5e-4793-b5fb-dcc0fe92decd" 1692677299157 "{\"message_id\":\"472731d7-0e6a-40ff-9e91-e422ede43fdd\",\"sender_id\":\"ad224914-7f64-44c5-a358-a2eddde13d64\",\"content\":\"The content of message 2\",\"timestamp\":1692677299157}"
When pushing, it needs to be pushed in chronological order. The score is the timestamp, so the smallest score is the earliest in time. Use ZPOPMIN
to get and remove the member from ZSET.
ZPOPMIN "26efa168-5a5e-4793-b5fb-dcc0fe92decd" # The smallest return is the first one above: 1) "{\"message_id\":\"472731d7-0e6a-40ff-9e91-e422ede43fdd\",\"sender_id\":\"ad224914-7f64-44c5-a358-a2eddde13d64\",\"content\":\"The content of message 0\",\"timestamp\":1692675132882}" 2) 1692675132882
Analytics
For analytics, we can use a combination of stream processing and batch processing systems.
Stream processing is used for real-time analytics. It allows us to analyze and process data in real-time as it arrives. This can be crucial for monitoring the system's performance and user activity as it happens, enabling immediate identification and resolution of any issues or anomalies. Tools like Apache Kafka and Apache Storm are often used for stream processing.
On the other hand, batch processing is used for processing large volumes of data collected over a period of time. Tools like Apache Hadoop and Apache Spark are commonly used for this purpose. Batch processing can provide valuable insights on user behavior, message patterns, peak usage times, etc., which can be used for strategic decision making, improving system performance, and enhancing user experience.
Data visualization tools like Tableau or PowerBI can be used to present the data in a clear and intuitive manner, making it easier to identify trends and patterns.
Additionally, machine learning algorithms can be applied to the data to predict user behavior and system load. This proactive approach can lead to more informed decision making and increased system efficiency.
Follow up detailed design questions and answers
How to handle the situation when a user sends a message to a user who is currently offline?
When a user sends a message to another user who is currently offline, the message is first stored in the Message Queue. The MQ Consumer will process the messages, call the Message Service to store the message in the Message Database and maintain the cache "inbox". When the user comes online, the Message Service will check the "inbox" in the cache again for any undelivered messages and deliver them to the user.
How to ensure that the user does not see duplicate messages?
To prevent users from seeing duplicate messages, both client-side and server-side measures can be employed. On the client-side, each message can be tagged with a temporary, unique ID before being sent. When the server receives the message, it generates a globally unique ID, which is sent back to the client for verification. This process makes the request idempotent, allowing the server to identify and ignore repeated requests. On the server-side, message statuses such as 'sent,' 'delivered,' and 'read' are tracked in the database, which only allows unique message IDs. Any message queue used can also have a de-duplication mechanism based on the unique IDs. By ensuring atomic operations in the database and implementing timeouts and retries, the system can effectively prevent message duplication.
How to handle the situation when the server fails?
Handling server failures is a critical aspect of building a resilient chat system. Employing a combination of redundancy, load balancing, and data recovery strategies can mitigate the impact. Redundant chat servers can be set up to take over if the primary server fails, ensuring uninterrupted service. Load balancers can automatically reroute traffic to healthy servers. On the data side, using a distributed database and message queue ensures that no single point of failure exists. Messages that are not yet delivered can be stored in a distributed cache or a durable message queue, from where they can be retrieved once the failed server is back online or a backup server takes over. Monitoring and alert systems should be in place to detect and notify of server failures, enabling quick manual intervention when necessary. Through a well-designed architecture that prioritizes fault tolerance and data durability, the system can continue to operate effectively even when individual components fail.
How to handle the situation when a user sends a large number of messages in a short period of time?
In such a scenario, rate limiting can be implemented to prevent any form of abuse or denial-of-service attacks. This can be done at the application level where we limit the number of messages a user can send per unit time. If a user exceeds this limit, their messages could be delayed or dropped, or the user could be temporarily blocked.
The specific implementation of creating a WebSocket connection and the rate limit after establishing a connection need to be considered separately.
The rate limit for creating a WebSocket connection is used to prevent malicious clients from frequently creating and discarding connections. The WebSocket handshake process follows the HTTP protocol, so a more general rate limit service can be used, such as CloudFlare's WAF.
Rate limiting for communication after establishing a WebSocket connection can be a bit tricky since WebSockets are designed for real-time communication and don't follow the request/response model like HTTP. We can implement our own Rate Limiter Service and call it in the Chat Service.
How to ensure the privacy and security of the messages?
To ensure message privacy and security, all communication between client and server can be encrypted using protocols like TLS. Additionally, messages can be encrypted at the application level before being stored in the database. Access to messages in the database can be controlled using proper authentication and authorization mechanisms.
How to support group chat?
For group chats with a limited number of members (such as limiting members to not exceed 500, 2000)
To support group chat, we need to make some modifications to our current design:
-
Data Schema Modification: We need to introduce a new collection called
groups
in our database. Each document in this collection will represent a group and will contain information like group_id, group_name, and members (an array of user_ids).{ "_id": ObjectId("..."), // Used as group_id "group_name": "the-name-of-the-group", "members": ["user_id_1", "user_id_2", "user_id_3", ...], }
We also need to modify our
messages
collection to include a new field calledgroup_id
. If a message is sent to a group, thegroup_id
field will contain the id of the group, and thereceiver_id
field will be null. If a message is sent to an individual user, thereceiver_id
field will contain the id of the receiver, and thegroup_id
field will be null.{ "_id": ObjectId("..."), // Used as message_id "sender_id": "the-sender-user-id", "receiver_id": "the-receiver-user-id", // null if the message is sent to a group "group_id": "the-group-id", // null if the message is sent to an individual user "content": "The content of message", "timestamp": 1692677132882, // The time when the message is sent is the time when it is written into the Message Queue. }
-
Message Delivery Modification: When a message is sent to a group, the Message Deliver will find all the WebSocket connections of the members of the group and push the message to all of them. If a member is offline, the message will be stored in the Message Database and the Inbox of the Cache, and will be delivered when the member comes online.
-
API Endpoint Modification: We need to modify our API to support group-related operations like creating a group, adding/removing members to/from a group, sending a message to a group, etc.
// Sending a message to a group { "type": "send_message", "group_id": "the-id-of-the-group", "content": "The content of message", // Length <= 1000 English char. "confirm_id": "confirm_id" // Generated at the client side, used to identify messages in the server's confirmation message. }
// Receiving a message from a group { "type": "incoming_message", "message_id": "the-id-of-message", "sender_id": "the-id-of-the-message-sender", "group_id": "the-id-of-the-group", "content": "The content of message", "timestamp": 1692435870143 // Timestamp with millisecond precision. }
With these modifications, our chat application can support group chat.
For group chats with no limit on the number of group members
For situations where there are a lot of group member data (tens of thousands, hundreds of thousands or even more), if we still adopt the method of pushing each message to each group member, it will inevitably put a lot of pressure on the server. For such situations, we can use "pull" instead of push (more on push vs pull). Define a pull interface, and the client pulls group messages regularly.
GET /chat/{group_id}?timestamp={timestamp}
, returns the latest 20 group messages from the timestamp:
The path variable group_id
is used to specify the group id.
Query parameters, timestamp
specifies the end time. When timestamp is missing, it defaults to the current time and returns the latest 20 group messages.
Response Body:
{
"status": "success" | "failed",
"data": [
{
"message_id": "message_id",
"sender_id": "the-user-id-of-sender",
"content": "The content of the message",
"timestamp": 1692841226851,
},
...
]
}
Use the GET /chat/{group_id}
interface to pull group messages.
Store the recent messages of the group (or a specific number) in the cache. When handling requests to retrieve group messages, the Message Service fetches them from the cache. If the request exceeds the cache's range, the service retrieves the messages from the database.
How to support displaying the online/offline status of users?
To support displaying the online/offline status of users, we can maintain a connection state for each user in our system. When a user establishes a WebSocket connection with the server, a "heartbeat" will be triggered periodically (for example, every 5 to 30 seconds).
We can store the connection state in a cache like Redis for fast access. The key can be the user_id
, and the value can be the timestamp (the time of the most recent "heartbeat"). Each time a "heartbeat" is triggered, this timestamp is updated.
To display the online/offline status of a user, we can provide an API endpoint that takes a user_id as input and returns the connection state of the user. The server can query the cache to get the time of the last "heartbeat".
Here is an example of how the API endpoint could look like:
-
GET /user/{user_id}/status
:The path variable
user_id
is used to specify the user.Response Body
{ "status": "success" | "failed", "data": { "last_active_time": 1692841226851, // Timestamp with millisecond precision. "connection_state": "online" | "online 5 minutes ago" | "offline", // The text here is generated based on last_active_time } } action