
Design Realtime Monitoring System

Functional Requirements

  1. The system should continuously monitor data from a fleet of servers.
  2. The data to be collected includes CPU usage, memory usage, disk usage, system logs, and web server access and error logs.
  3. Users should be able to set alert rules based on certain conditions. For example, if the average CPU usage in the last 5 minutes exceeds 80%, an alert should be triggered.
  4. Alerts should be delivered to users via email, SMS, or push notifications within 1 minute of being triggered.
  5. Users should be able to view real-time and historical data on a web-based dashboard.

Non-Functional Requirements

  1. Scalability: The system should initially support monitoring 10,000 servers, with the capacity to scale by 20% annually.
  2. Reliability: The system should accurately collect and process server data, and reliably deliver alerts.
  3. Availability: The system should be operational 24/7 to ensure continuous monitoring and alerting.
  4. Consistency: The system should provide consistent data across all servers and over time.
  5. Latency: The system should process and display data in real-time or near real-time.
  6. Efficiency: The system should efficiently handle high volumes of data and alert rules.

Resource Estimation

Assuming each metric is submitted every 10 seconds (submitted 8640 times per day), and each server has 6 metrics.

Assuming the read:write ratio is 1:100 (reads are far less frequent than writes).

Assuming each metric is 100 bytes.

  • Writes Per Second: 10,000 servers / 10s * 6 = 6,000 RPS

    The number 6 corresponds to the six metrics per server: CPU, memory, disk usage, system log, web server access log, and web server error log.

  • Reads Per Second: 6,000 RPS / 100 = 60 RPS

  • Storage: 10K servers * 51,840 write operations per server per day (8,640 × 6) * 100 Bytes * 31 (days per month) * 60 (months, i.e. 5 years of retention) ~= 88 TB.

Use the resource estimator to calculate.
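
The same arithmetic, spelled out as a quick sanity check (the numbers mirror the estimates above; the 60-month factor assumes 5 years of retention):

```python
# Back-of-the-envelope check of the estimates above.
servers = 10_000
metrics_per_server = 6
interval_s = 10            # each metric submitted every 10 seconds
metric_size_bytes = 100

writes_per_second = servers * metrics_per_server / interval_s            # 6,000 RPS
reads_per_second = writes_per_second / 100                               # 60 RPS (1:100 read:write)
writes_per_server_per_day = (86_400 // interval_s) * metrics_per_server  # 51,840
bytes_per_day = servers * writes_per_server_per_day * metric_size_bytes  # ~51.8 GB/day
storage_5y_tb = bytes_per_day * 31 * 60 / 1024**4                        # ~88 TB over 5 years

print(writes_per_second, reads_per_second, round(storage_5y_tb))
```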

API Endpoint Design

We will install an agent on each server to collect data and send it back to a central server. The central server's main API for accepting new data entries:

  • POST /api/data: The request body includes the server ID, timestamp, and data, as below:
    { "server_id": String, "timestamp": Timestamp, "data": { // data } }
    The response includes the status of the data entry, as below:
    { "status": String }
    Other components like the dashboard, alerts, and rules engine also need their own APIs, but they are mostly cookie-cutter CRUD APIs tailored to the frontend implementation, so we won't go into them here.
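
    As a quick illustration of this contract, a minimal receiving endpoint might look like the sketch below (Flask is an assumed framework choice; the status values and port are illustrative, and the hand-off to the ingestion system is left as a placeholder):

    ```python
    from flask import Flask, jsonify, request  # assumed framework for this sketch

    app = Flask(__name__)

    @app.route("/api/data", methods=["POST"])
    def accept_data():
        entry = request.get_json()
        if not entry or "server_id" not in entry or "timestamp" not in entry:
            return jsonify({"status": "invalid"}), 400
        # In the full design, the entry is handed off to the Data Ingestion System here.
        return jsonify({"status": "ok"})

    if __name__ == "__main__":
        app.run(port=8080)  # illustrative port
    ```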

High-Level Design

The system includes server agents, a data ingestion system (Kafka/RabbitMQ), a stream processing system (Flink/Storm), a time-series database (InfluxDB/TimescaleDB), an alert notification system, and a user dashboard.

[Diagram: High-level design of the Realtime Monitoring System]

The server agents collect data and send it to the data ingestion system, which distributes the data to the stream processing system. The stream processing system processes the data in real-time, triggers alerts based on user-defined rules, and stores the processed data in the time-series database. The alert notification system delivers alerts to users, and the dashboard displays real-time and historical data.

Detailed Design

Components in the System

  • Servers with Agents

    The Monitoring Agent should be installed on servers that require monitoring to gather key metrics such as CPU usage, memory usage, system logs, and web server logs. This agent is designed to be lightweight, ensuring minimal impact on the system's performance. It's crucial for these agents to be reliable, resilient (capable of handling temporary network disruptions, for instance), and secure.

    For hardware metrics such as CPU and memory usage, the agent gathers data at regular intervals, for example every 10 seconds, and sends it to the Realtime Monitoring System service. Keep in mind that the collection interval shouldn't be too short: data collection itself consumes resources, including CPU cycles, and collecting too frequently would impact the performance of the server being monitored.
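
    The sketch below shows what this collection loop could look like, assuming the psutil library for host metrics and a hypothetical endpoint URL for the POST /api/data API described earlier; configuration and retries are simplified:

    ```python
    import time

    import psutil    # assumed dependency for host metrics
    import requests  # assumed HTTP client; the endpoint URL below is hypothetical

    ENDPOINT = "https://monitoring.example.com/api/data"
    SERVER_ID = "web-01"  # assumed per-host configuration

    while True:
        payload = {
            "server_id": SERVER_ID,
            "timestamp": int(time.time()),
            "data": {
                "cpu_usage": psutil.cpu_percent(interval=None),
                "memory_usage": psutil.virtual_memory().percent,
                "disk_usage": psutil.disk_usage("/").percent,
            },
        }
        try:
            requests.post(ENDPOINT, json=payload, timeout=5)
        except requests.RequestException:
            pass  # tolerate temporary network disruptions; try again next cycle
        time.sleep(10)  # keep the collection interval from being too short
    ```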

    For system logs and web server logs, the agent tails the log files: whenever these files are appended to, the new content is sent to the Realtime Monitoring System service.
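
    A bare-bones version of that log tailing could look like the following sketch (it ignores log rotation and batching, which a real agent must handle; ship stands in for whatever function forwards a line to the central service):

    ```python
    import time

    def tail_and_ship(path, ship):
        """Follow a log file and forward newly appended lines (minimal sketch)."""
        with open(path, "r") as f:
            f.seek(0, 2)  # start at the end of the file so only new lines are shipped
            while True:
                line = f.readline()
                if line:
                    ship(line.rstrip("\n"))
                else:
                    time.sleep(1)  # nothing new yet; poll again shortly

    # Example usage (printing stands in for sending to the monitoring service):
    # tail_and_ship("/var/log/nginx/access.log", print)
    ```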

    The Monitoring Agent only transmits data to the Realtime Monitoring System service; it doesn't write directly into the Message Queue. The relationship between the Monitoring Agent and the Message Queue shown in the "High-Level Design" diagram is detailed further in the diagram below:

    [Diagram: Monitoring Agent and Message Queue data flow]

    Push or Pull

    The preceding section describes transmitting metrics from the agent to the server using the push model. An alternative, the pull model, could also be used: in it, the Message Queue Proxy proactively retrieves data from the agent. Below is a comparison:

    [Diagram: Push vs. Pull]

    Let's compare the advantages and disadvantages of push and pull:

    |               | Push System | Pull System |
    | ------------- | ----------- | ----------- |
    | Advantages    | 1. Real-time data delivery. 2. Efficient for large, infrequent data. | 1. On-demand data transfer. 2. Receiver has control. |
    | Disadvantages | 1. Potential for data overload. 2. Less flexibility for the receiver. | 1. Not real-time. 2. Can be inefficient for large, infrequent data. |

    For a more comprehensive understanding, please refer to Push vs Pull.

    Considering the requirement for real-time data and the high concurrency supported by our Data Ingestion System, the push method is likely the more effective design for this system.

  • Data Ingestion System (Kafka/RabbitMQ)

    The role of the data ingestion system is to ingest the massive amount of data coming in from thousands of servers and distribute that data to multiple consumers.

    Apache Kafka and RabbitMQ are great tools for this. They handle real-time data feeds with low latency and high throughput, and they provide capabilities to store and process streams of records in a fault-tolerant way.

    A natural question here is why we don't write data directly into a TSDB. The first thing to consider is whether a TSDB can meet our writes-per-second needs. Taking InfluxDB as an example, below are the writes-per-second and queries-per-second figures for a database cluster given in the official InfluxDB documentation:

    [Image: InfluxDB cluster writes-per-second and queries-per-second figures from the official documentation]

    And the recommended hardware for various loads:

    |               | CPU (cores) | RAM (GB) | IOPS  |
    | ------------- | ----------- | -------- | ----- |
    | Low load      | 2           | 2-4      | 1000  |
    | Moderate load | 4-6         | 8-32     | 1000+ |
    | High load     | 8+          | 32+      | 1000+ |

    From the above, an InfluxDB cluster typically sustains roughly 5k-100k field writes per second. It's crucial to understand that this limit is counted in fields. Our earlier estimate of 6k in the "Resource Estimation" section was in requests per second, and each write request carries multiple fields. Counted in fields, the write rate multiplies, potentially reaching tens or even hundreds of thousands of field writes per second.

    Traffic won't always be evenly distributed; there will be periods of high and low activity. Peak loads, which could be several times the average, are likely to surpass the 100k limit of InfluxDB. Given the database's high server performance requirements, it's costly and inefficient to manage peak loads by simply enhancing a single database server's performance or by expanding the database cluster.

    As a result, opting to write to Kafka/RabbitMQ instead of directly to the database is an effective strategy for managing peak loads and database downtime. This method allows for more efficient load management and ensures the system can handle high traffic volumes without exceeding the InfluxDB cluster's capacity.

    For information on Kafka's Writes Per Second data, refer to this article: Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines).

    To summarize the advantages of introducing a Data Ingestion System (Kafka/RabbitMQ):

    1. Fault tolerance, that is, data will not be lost when the database is down;
    2. Lower cost to handle higher peaks;
    3. Provides the possibility for the next Stream Processing System (Flink/Storm).

    There are also disadvantages:

    1. The complexity of the system is increased, and the maintenance cost is higher;
    2. The consistency of the system is reduced. (However, eventual consistency is completely satisfactory for this system)

    In conclusion, the benefits outweigh the disadvantages in this system.
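
    To make the hand-off concrete, here is a sketch of the ingestion side producing metric entries to Kafka, using the kafka-python client; the broker address and topic name are assumptions:

    ```python
    import json

    from kafka import KafkaProducer  # kafka-python client

    producer = KafkaProducer(
        bootstrap_servers="kafka:9092",  # assumed broker address
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    def ingest(entry):
        # Key by server_id so one server's metrics stay ordered within a partition.
        producer.send("server-metrics", key=entry["server_id"].encode("utf-8"), value=entry)

    ingest({"server_id": "web-01", "timestamp": 1700000000,
            "data": {"cpu_usage": 72.5, "memory_usage": 63.1, "disk_usage": 40.2}})
    producer.flush()
    ```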

  • Stream Processing System (Flink/Storm)

    Analyzing data is time-consuming and requires significant computational resources, and exceptions may occur during processing. Without a Data Ingestion System like Kafka or RabbitMQ, performing these tasks inline before writing into the Time-Series Database (TSDB) would make it hard to sustain the system's required write throughput. This underscores the importance of integrating a Data Ingestion System such as Kafka or RabbitMQ for efficient data management.

    [Diagram: Data flow through the Data Ingestion and Stream Processing Systems]

    Another conceivable solution is to place data analysis and processing tasks after storage in the TSDB and trigger them through a timer. This approach has two drawbacks:

    1. Loading newly written data back from the database wastes resources and increases the read pressure on the database;
    2. Poor real-time performance: if the timer interval is short, queries to the TSDB become more frequent, further increasing read pressure; if the interval is long, results lag behind the incoming data.

    The use of Stream Processing does not have these two problems.

    The stream processing system takes the ingested data and processes it in real time. It is used for real-time analytics, complex event processing, and more.

    A crucial part of its role is to analyze the incoming data based on predefined alert rules. These alert rules can be set by users and may include conditions such as CPU usage exceeding a certain percentage, memory usage reaching a critical level, or specific error messages appearing in system logs.

    When the stream processing system identifies data that matches these alert rules, it triggers an alert. This alert is then passed to the alert notification system for delivery to the appropriate users.

    Apache Flink and Apache Storm are frameworks for big data processing and analytics. They allow you to process high-speed data streams in real time and perform event-based processing. In this architecture, this system would be responsible for triggering alerts based on predefined rules.

    Refer to the stream processing section for more details.
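
    At its core, rule evaluation is a windowed aggregation. The framework-agnostic sketch below illustrates the example rule from the requirements (average CPU over the last 5 minutes exceeding 80%); in a real deployment this logic would live inside Flink/Storm operators rather than plain Python:

    ```python
    from collections import deque

    WINDOW_SECONDS = 300   # "average CPU usage in the last 5 minutes"
    THRESHOLD = 80.0       # "exceeds 80%"

    windows = {}  # server_id -> deque of (timestamp, cpu_usage)

    def on_metric(server_id, timestamp, cpu_usage, trigger_alert):
        window = windows.setdefault(server_id, deque())
        window.append((timestamp, cpu_usage))
        # Evict samples that have fallen out of the 5-minute window.
        while window and window[0][0] < timestamp - WINDOW_SECONDS:
            window.popleft()
        avg_cpu = sum(v for _, v in window) / len(window)
        if avg_cpu > THRESHOLD:
            trigger_alert(server_id, avg_cpu)  # hand off to the alert notification system
    ```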

  • Time-Series Database (InfluxDB/TimescaleDB)

    The time-series database (TSDB) is used to efficiently store and retrieve time-stamped data. TSDBs like InfluxDB or TimescaleDB are designed to handle high write and query loads, and they provide capabilities like data retention policies, continuous queries, and more.

    The processed data from the stream processing system is stored in the TSDB. This data is then available for queries and can be used to populate the dashboard.
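
    For example, writing one processed data point with the InfluxDB 2.x Python client might look like the sketch below; the URL, token, org, and bucket are assumptions:

    ```python
    from datetime import datetime, timezone

    from influxdb_client import InfluxDBClient, Point, WritePrecision
    from influxdb_client.client.write_api import SYNCHRONOUS

    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
    write_api = client.write_api(write_options=SYNCHRONOUS)

    point = (
        Point("server_metrics")              # measurement name is illustrative
        .tag("server_id", "web-01")
        .field("cpu_usage", 72.5)
        .field("memory_usage", 63.1)
        .field("disk_usage", 40.2)
        .time(datetime.now(timezone.utc), WritePrecision.NS)
    )
    write_api.write(bucket="monitoring", record=point)
    ```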

  • Alert Notification System

    When the stream processing system detects a condition that matches an alert rule, it triggers an alert. These alerts are then distributed to users through the alert notification system.
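
    As one example of a delivery channel, an email notification could be sent with Python's standard smtplib roughly as sketched below (the SMTP relay and addresses are assumptions, and authentication is omitted; SMS and push channels would go through their providers' APIs):

    ```python
    import smtplib
    from email.message import EmailMessage

    def send_email_alert(to_addr, server_id, message):
        msg = EmailMessage()
        msg["Subject"] = f"[ALERT] {server_id}"
        msg["From"] = "alerts@monitoring.example.com"  # assumed sender address
        msg["To"] = to_addr
        msg.set_content(message)
        # Assumed SMTP relay; real deployments would also authenticate here.
        with smtplib.SMTP("smtp.example.com", 587) as smtp:
            smtp.starttls()
            smtp.send_message(msg)
    ```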

  • Dashboard (User UI)

    The dashboard serves as the user interface for the monitoring system. It retrieves data from the time-series database to display current and historical server metrics. It should provide real-time or near real-time visibility into system metrics.

    The dashboard also allows users to define or modify alert rules and to view the delivery history of alert notifications.

    Existing Solutions

    There are several existing solutions that can be used for the dashboard.

    Grafana: Grafana is a multi-platform open-source analytics and interactive visualization web application. It provides charts, graphs, and alerts for the web when connected to supported data sources, including InfluxDB and TimescaleDB. It is extensible through a plug-in system and lets you create, explore, and share dashboards with your team.

    Kibana: Kibana is an open-source data visualization and exploration tool used for log and time-series analytics, application monitoring, and operational intelligence use cases. It offers powerful and easy-to-use features such as histograms, line graphs, pie charts, heat maps, and built-in geospatial support. Also, it provides tight integration with Elasticsearch, a popular analytics and search engine, which makes Kibana the default choice for visualizing data stored in Elasticsearch.

    Chronograf: Chronograf is InfluxData’s open-source web application. It allows you to quickly see the data that you have stored in InfluxDB so you can build robust queries and alerts. It is simple to use and includes templates and libraries to allow you to rapidly build dashboards with real-time visualizations of your data and to easily create alerting and automation rules.

  • Batch Processing System (e.g., Apache Spark)

    A major characteristic of time-series data in monitoring systems is that while the volume of data is large, access to the data is concentrated on the most recent data. Historical data, such as data from a week or a month ago, is rarely accessed. However, the vast amount of data occupies valuable database storage space. Therefore, the preservation of historical data is an issue that monitoring systems must consider. Measures that can be adopted include:

    • Aggregating historical data, transforming data with high time granularity into data with lower granularity, thereby reducing the quantity of historical data;
    • Using storage media with lower costs to store historical data.

    The batch processing system reads data from the TSDB in batches (for example, every hour). It processes and aggregates the data, and then writes the processed data to a long-term storage system. Apache Spark is a popular framework for big data processing and analytics, and it's designed to handle batch processing efficiently.
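
    A sketch of such a batch job in PySpark, assuming the detailed metrics have been exported as Parquet files (the paths are hypothetical), could downsample to hourly averages like this:

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import avg, window

    spark = SparkSession.builder.appName("metrics-downsampling").getOrCreate()

    # Hypothetical input: raw metrics exported from the TSDB as Parquet.
    raw = spark.read.parquet("s3://monitoring-export/raw_metrics/")

    hourly = (
        raw.groupBy(window("time", "1 hour"), "server_id")
           .agg(avg("cpu_usage").alias("avg_cpu"),
                avg("memory_usage").alias("avg_memory"),
                avg("disk_usage").alias("avg_disk"))
    )

    # Hypothetical output path in long-term storage.
    hourly.write.mode("overwrite").parquet("s3://monitoring-archive/hourly_metrics/")
    ```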

    For more information, please refer to Batch processing.

  • Long-Term Storage (e.g., Hadoop/HDFS, Amazon S3, Google Cloud Storage)

    This is where your processed batch data is stored for the long term. It should be scalable and cost-effective, and it should support the types of queries and analyses you need to perform on your data.

    Comparing Storage Costs: InfluxDB Cloud Serverless vs. Amazon S3

    The following is an updated version of the InfluxDB Cloud Serverless Pricing Table as of August 2023:

    [Image: InfluxDB Cloud Serverless storage pricing, August 2023]

    The pricing table indicates that storage is priced at $0.002 per GB-hour, which equates to approximately $1.44 per GB-month. In comparison, Amazon S3 pricing as of the same period (August 2023) stands at roughly $0.023 per GB-month. That is a difference of about 60:1, so moving historical data to long-term storage is highly recommended as a cost-saving measure.

Data Store

Database Type

A time-series database (InfluxDB/TimescaleDB) is used to efficiently store and retrieve time-stamped server data.

A relational database is used to store alert rules, notifications, metadata of monitored servers, and some configuration information of the dashboard, etc.

Data Schema

Server Data (Saved to TSDB):

  • time (timestamp): The time the data point was recorded.
  • server_id (string): Identifier of the server from which the data originated.
  • cpu_usage (float): The CPU usage of the server at the recorded time.
  • memory_usage (float): The memory usage of the server at the recorded time.
  • disk_usage (float): The disk usage of the server at the recorded time.
  • system_logs (text): Any system logs recorded at the time.
  • access_logs (text): Any access logs recorded at the time.
  • error_logs (text): Any error logs recorded at the time.

Alerts Table (RDBMS):

  • alert_id (string): Unique identifier for each alert.
  • server_id (string): Identifier of the server associated with the alert.
  • rule_id (string): Identifier of the rule that triggered the alert.
  • time (timestamp): The time the alert was generated.
  • status (string): Status of the alert (e.g., "open", "acknowledged", "closed").

Rules Table (RDBMS):

  • rule_id (string): Unique identifier for each rule.
  • rule_name (string): Name of the rule.
  • rule_condition (string): The condition under which the rule triggers an alert.

Servers Table (RDBMS):

  • server_id (string): Unique identifier for each server.
  • server_name (string): Name of the server.
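
As a sketch, the three relational tables above could be declared with SQLAlchemy Core roughly as follows (column types and the example rule_condition format are assumptions):

```python
from sqlalchemy import TIMESTAMP, Column, MetaData, String, Table

metadata = MetaData()

alerts = Table(
    "alerts", metadata,
    Column("alert_id", String, primary_key=True),
    Column("server_id", String),
    Column("rule_id", String),
    Column("time", TIMESTAMP),
    Column("status", String),  # "open", "acknowledged", "closed"
)

rules = Table(
    "rules", metadata,
    Column("rule_id", String, primary_key=True),
    Column("rule_name", String),
    Column("rule_condition", String),  # e.g. "avg(cpu_usage, 5m) > 80" (illustrative format)
)

servers = Table(
    "servers", metadata,
    Column("server_id", String, primary_key=True),
    Column("server_name", String),
)

# metadata.create_all(engine) would create these tables on a configured engine.
```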

Database Partitioning

To handle the large amount of data and high write load, we can partition the data in our time-series database. Partitioning can be done based on time or server_id. Time-based partitioning is a common strategy in time-series databases, where data is partitioned into time intervals (e.g., hourly, daily). This makes it easy to drop old data and helps with the performance of time-based queries. Server-based partitioning can help distribute the write load across multiple nodes and can improve query performance if we often query data for specific servers.
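
If TimescaleDB is the chosen TSDB, setting up both kinds of partitioning for the metrics table could look like the sketch below (connection details, table name, chunk interval, and partition count are assumptions):

```python
import psycopg2  # assumed driver and connection string

conn = psycopg2.connect("dbname=monitoring user=monitor")
with conn, conn.cursor() as cur:
    # Time-based partitioning: one chunk per day.
    cur.execute(
        "SELECT create_hypertable('server_metrics', 'time', "
        "chunk_time_interval => INTERVAL '1 day');"
    )
    # Additional space partitioning by server_id to spread the write load.
    cur.execute(
        "SELECT add_dimension('server_metrics', 'server_id', number_partitions => 4);"
    )
```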

Database Replication

Replication is crucial for ensuring data availability and durability. We can use a combination of synchronous and asynchronous replication. Synchronous replication ensures strong consistency but can impact write performance. Asynchronous replication provides better write performance but can lead to data loss in case of a node failure. For our use case, we can use synchronous replication within a data center and asynchronous replication across data centers.

Data Retention and Cleanup

Given the large volume of data, we need to have a data retention policy. We can keep detailed data for a short period (e.g., one week) and then aggregate the data for long-term storage. For example, we can keep hourly averages for up to a month, daily averages for up to a year, and so on. Old data can be deleted or archived to cheaper storage. This can be done as a background job running at off-peak hours to minimize the impact on system performance.

Cache

Caching can be used to improve the performance of read-heavy operations. For example, we can cache server metadata and alert rules since these are likely to be accessed frequently. We can use an in-memory cache like Redis or Memcached for this purpose. The cache should have a suitable eviction policy (e.g., LRU) to ensure that the most frequently accessed data is kept in the cache.

We can also cache the results of frequent and expensive queries. Caching expensive queries can significantly improve the performance of the system by reducing the load on the database and decreasing the response time for these queries.

In the real-time monitoring system, once data for a specific time frame has been collected and processed, the results for that time frame will not change. This is because the metrics being monitored, such as CPU usage, memory usage, and disk usage, are time-series data that are immutable once recorded.

For example, the average CPU usage for the last hour will remain the same once that hour has passed. Therefore, if a user frequently queries for the average CPU usage in the last hour, we can cache the result of this query and serve it directly from the cache until the next hour has passed.

This makes caching particularly effective for this type of system. By storing the results of expensive queries in a cache, we can provide faster access to this data and reduce the computational load on the system.
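
For instance, the "average CPU over the last hour" query could be cached in Redis along these lines (the key format, TTL, and compute_from_tsdb callback are assumptions):

```python
import json

import redis  # assumed: a Redis instance reachable on localhost

r = redis.Redis(host="localhost", port=6379)

def avg_cpu_last_hour(server_id, compute_from_tsdb):
    key = f"avg_cpu:1h:{server_id}"
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)
    value = compute_from_tsdb(server_id)    # the expensive TSDB query
    r.set(key, json.dumps(value), ex=3600)  # the result is immutable once the hour has passed
    return value
```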

For more detailed information about caching, you can refer to Caching.

Analytics

Analytics can provide valuable insights into the system's performance and usage patterns. We can use batch processing (e.g., Apache Spark) to analyze the data and generate reports. For example, we can identify patterns in server performance, detect anomalies, and predict future trends. We can also analyze the usage of the monitoring system itself, such as the number of active users, the most common alert rules, etc. The results of these analyses can be used to improve the system and provide better service to the users.

How do we strike a balance between data granularity and aggregation?

Grasping the building blocks ("the lego pieces")

This part of the guide will focus on the various components that are often used to construct a system (the building blocks), and the design templates that provide a framework for structuring these blocks.

Core Building blocks

At the bare minimum, you should know the core building blocks of system design:

  • Scaling stateless services with load balancing
  • Scaling database reads with replication and caching
  • Scaling database writes with partitioning (aka sharding)
  • Scaling data flow with message queues

System Design Template

With these building blocks, you will be able to apply our template to solve many system design problems. We will dive into the details in the Design Template section. Here's a sneak peek:

[Image: System Design Template]

Additional Building Blocks

Additionally, you will want to understand these concepts:

  • Processing large amounts of data (aka “big data”) with batch and stream processing
    • Particularly useful for solving data-intensive problems such as designing an analytics app
  • Achieving consistency across services using distributed transactions or event sourcing
    • Particularly useful for solving problems that require strict transactions such as designing financial apps
  • Full text search: full-text index
  • Storing data for the long term: data warehousing

On top of these, there is ad hoc knowledge you will want to pick up for certain problems, for example, geohashing for designing location-based services like Yelp or Uber, or operational transform for problems like designing Google Docs. You can learn these on a case-by-case basis. System design interviews are supposed to test your general design skills, not specific knowledge.

Working through problems and building solutions using the building blocks

Finally, we have a series of practical problems for you to work through. You can find them in /problems. This hands-on practice will not only help you apply the principles learned but will also deepen your understanding of how to use the building blocks to construct effective solutions. The list of questions keeps growing; we are actively adding more.
