Solution

Design for Realtime Monitoring System

Functional Requirements

  1. The system should continuously monitor data from a fleet of servers.
  2. The data to be collected includes CPU usage, memory usage, disk usage, system logs, and web server access and error logs.
  3. Users should be able to set alert rules based on certain conditions. For example, if the average CPU usage in the last 5 minutes exceeds 80%, an alert should be triggered.
  4. Alerts should be delivered to users via email, SMS, or push notifications within 1 minute of being triggered.
  5. Users should be able to view real-time and historical data on a web-based dashboard.

Non-Functional Requirements

  1. Scalability: The system should initially support monitoring 10,000 servers, with the capacity to scale by 20% annually.
  2. Reliability: The system should accurately collect and process server data, and reliably deliver alerts.
  3. Availability: The system should be operational 24/7 to ensure continuous monitoring and alerting.
  4. Consistency: The system should provide consistent data across all servers and over time.
  5. Latency: The system should process and display data in real-time or near real-time.
  6. Efficiency: The system should efficiently handle high volumes of data and alert rules.

Resource Estimation

Assuming each metric is submitted every 10 seconds (submitted 8640 times per day), and each server has 6 metrics.

Assuming the read-write ratio is 1:100.

Assuming each metric is 100 bytes.

  • Writes Per Second: 10,000 servers / 10s * 6 = 6,000 RPS

    The number 6 here represents 1 time for CPU, 1 time for memory, 1 time for disk usage, 1 time for system log, 1 time for web server access log, and 1 time for web server error log.

  • Reads Per Second: 6,000 RPS / 100 = 60 RPS

  • Storage: 10K servers * 51,840 (write operations per server per day) * 100 bytes * 31 (days per month) * 60 (months, i.e., 5 years of retention) ≈ 96 TB (about 88 TiB).
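
The estimates above can be reproduced with a few lines of arithmetic. This is a minimal sketch using only the stated assumptions; the 60 in the storage formula is read here as a retention period of 60 months.

```python
# Back-of-the-envelope estimate using the assumptions stated above.
SERVERS = 10_000
METRICS_PER_SERVER = 6
INTERVAL_SECONDS = 10
BYTES_PER_METRIC = 100
WRITES_PER_READ = 100      # read-write ratio of 1:100
DAYS_PER_MONTH = 31
RETENTION_MONTHS = 60      # assumption: the "60" is months of retention (5 years)

writes_per_second = SERVERS * METRICS_PER_SERVER // INTERVAL_SECONDS
reads_per_second = writes_per_second // WRITES_PER_READ
writes_per_server_per_day = (86_400 // INTERVAL_SECONDS) * METRICS_PER_SERVER
storage_bytes = (SERVERS * writes_per_server_per_day * BYTES_PER_METRIC
                 * DAYS_PER_MONTH * RETENTION_MONTHS)

print(writes_per_second)                 # 6000
print(reads_per_second)                  # 60
print(writes_per_server_per_day)         # 51840
print(round(storage_bytes / 1e12, 1))    # 96.4  (decimal TB)
print(round(storage_bytes / 2**40, 1))   # 87.7  (binary TiB)
```

The two storage figures differ only in units: roughly 96 TB in decimal terabytes is roughly 88 TiB in binary tebibytes.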

API Endpoint Design

We will install an agent on each server to collect data and send it back to a central service. The main API through which the central service accepts new data entries is:

  • POST /api/data: The request body includes the server ID, timestamp, and data, as below:
    { "server_id": String, "timestamp": Timestamp, "data": { // data } }
    The response includes the status of the data entry, as below:
    { "status": String }
    Other components like dashboard, alerts and rules engine also need their APIs but they are mostly cookie-cutter CRUD APIs tailored to the frontend implementations. Therefore we won’t get into them here.
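
As an illustration of the request and response shapes, here is a minimal sketch of how an agent might build the body and how the service might validate it. The function names, the use of Unix epoch seconds for the timestamp, and the status strings are assumptions made for this example, not part of the API definition above.

```python
import json
import time
from typing import Optional

REQUIRED_KEYS = {"server_id", "timestamp", "data"}

def build_payload(server_id: str, data: dict, timestamp: Optional[float] = None) -> str:
    """Agent side: serialize one data entry for POST /api/data."""
    body = {
        "server_id": server_id,
        # Assumption: timestamps are Unix epoch seconds.
        "timestamp": time.time() if timestamp is None else timestamp,
        "data": data,
    }
    return json.dumps(body)

def validate_payload(raw: str) -> dict:
    """Service side: check the body and return the response body."""
    try:
        body = json.loads(raw)
    except json.JSONDecodeError:
        return {"status": "error: invalid JSON"}
    if not isinstance(body, dict):
        return {"status": "error: body must be an object"}
    missing = REQUIRED_KEYS - body.keys()
    if missing:
        return {"status": "error: missing " + ", ".join(sorted(missing))}
    return {"status": "ok"}

payload = build_payload("srv-1", {"cpu_usage": 0.42}, timestamp=1690000000.0)
print(validate_payload(payload))  # {'status': 'ok'}
```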

High-Level Design

The system includes server agents, a data ingestion system (Kafka/RabbitMQ), a stream processing system (Flink/Storm), a time-series database (InfluxDB/TimescaleDB), an alert notification system, and a user dashboard.

[Figure: high-level design of the realtime monitoring system]

The server agents collect data and send it to the data ingestion system, which distributes the data to the stream processing system. The stream processing system processes the data in real-time, triggers alerts based on user-defined rules, and stores the processed data in the time-series database. The alert notification system delivers alerts to users, and the dashboard displays real-time and historical data.

Detailed Design

Components in the System

  • Servers with Agents

    The Monitoring Agent should be installed on the server that requires monitoring in order to gather key metrics such as CPU usage, memory usage, system logs, and web server logs. This agent is designed to be lightweight, ensuring minimal impact on the system's performance. It's crucial for these agents to be reliable, resilient (capable of handling temporary network disruptions, for instance), and secure.

    In terms of system hardware metrics like CPU usage and memory usage, we can gather this data at regular intervals, for example, every 10 seconds. This data is then sent to the Realtime Monitoring System service. It's important to remember that the data collection interval shouldn't be too short. This is because the data collection process itself requires resources, including CPU cycles. If data collection occurs too frequently, it could significantly consume these resources, thus impacting the performance of the server being monitored.

    For system logs and web server logs, the agent keeps track of the log files. Whenever a file is modified, the new content is sent to the Realtime Monitoring System service.
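
    One way an agent could pick up only the new content of a log file is to remember the byte offset it has already read. The sketch below is an assumption about how this might be done, not the agent's actual implementation; `read_new_content` is a hypothetical helper, and a shrinking file is treated as a rotation or truncation.

```python
import os
from typing import Tuple

def read_new_content(path: str, offset: int) -> Tuple[bytes, int]:
    """Return the bytes appended to `path` since `offset`, plus the new offset."""
    size = os.path.getsize(path)
    if size < offset:
        # The file shrank: assume it was rotated or truncated and start over.
        offset = 0
    with open(path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    return chunk, offset + len(chunk)
```

    The agent would call this on a timer or on a file-change notification, send any non-empty chunk to the service, and persist the returned offset so that a restart does not resend old lines.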

    The Monitoring Agent only transmits data to the Realtime Monitoring System service; it does not write directly into the Message Queue. The following diagram expands the "High-Level Design" diagram to show the relationship between the Monitoring Agent and the Message Queue:

    [Figure: relationship between the Monitoring Agent, the Realtime Monitoring System service, and the Message Queue]

    Push or Pull

    The preceding section describes the push method, in which the agent transmits metrics data to the server. An alternative is the pull method, in which the Message Queue Proxy proactively retrieves data from the agent. Below is a comparative illustration:

    [Figure: push vs. pull]

    Let's compare the advantages and disadvantages of push and pull:

                  | Push System                              | Pull System
    Advantages    | 1. Real-time data delivery.              | 1. On-demand data transfer.
                  | 2. Efficient for large, infrequent data. | 2. Receiver has control.
    Disadvantages | 1. Potential for data overload.          | 1. Not real-time.
                  | 2. Less flexibility for receiver.        | 2. Can be inefficient for large, infrequent data.

    For a more comprehensive understanding, please refer to Push vs Pull.

    Considering the requirement for instantaneous data and the high concurrency support provided by our Data Ingestion System, the push method could potentially be the more effective design for this system.

  • Data Ingestion System (Kafka/RabbitMQ)

    The role of the data ingestion system is to ingest the massive amount of data coming in from thousands of servers and distribute that data to multiple consumers.

    Apache Kafka and RabbitMQ are both well suited to this. Kafka is a distributed streaming platform and RabbitMQ is a message broker; both handle real-time data feeds with low latency and high throughput, and both can persist messages in a fault-tolerant way.

    A natural question is why we don't write data directly into the TSDB. The first thing to consider is whether the TSDB can meet our writes-per-second needs. Taking InfluxDB as an example, below are the writes-per-second and queries-per-second figures for a database cluster given in the official InfluxDB documentation:

    [Figure: InfluxDB cluster writes per second and queries per second]

    And the recommended hardware for various loads:

    Load          | CPU (cores) | RAM (GB) | IOPS
    Low load      | 2           | 2-4      | 1000
    Moderate load | 4-6         | 8-32     | 1000+
    High load     | 8+          | 32+      | 1000+

    From the above, an InfluxDB cluster typically sustains 5k-100k field writes per second. It's crucial to understand that this limit counts individual fields, not requests. Our earlier estimate of 6k in the 'Resource Estimation' section was in requests per second (rps), and each write request carries multiple fields. Counted by fields, the load multiplies and could reach tens or even hundreds of thousands of field writes per second.

    Traffic won't always be evenly distributed; there will be periods of high and low activity. Peak loads, which could be several times the average, are likely to surpass the 100k limit of InfluxDB. Given the database's high server performance requirements, it's costly and inefficient to manage peak loads by enhancing a single database server's performance or expanding the database cluster.
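
    To make the request-versus-field distinction and the effect of peaks concrete, here is a small calculation. The number of fields per request and the peak-to-average ratio are hypothetical values chosen for illustration; only the 6,000 rps figure comes from the estimate above.

```python
REQUESTS_PER_SECOND = 6_000   # from the Resource Estimation section
FIELDS_PER_REQUEST = 10       # hypothetical; depends on what each agent sends
PEAK_FACTOR = 3               # hypothetical peak-to-average ratio

average_field_writes = REQUESTS_PER_SECOND * FIELDS_PER_REQUEST
peak_field_writes = average_field_writes * PEAK_FACTOR

print(average_field_writes)   # 60000
print(peak_field_writes)      # 180000
```

    Under these assumptions the average load already sits inside the 5k-100k range, and a peak pushes it well past the upper end.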

    As a result, opting to write to Kafka/RabbitMQ instead of directly to the database is an effective strategy for managing peak loads and database downtime. This method allows for more efficient load management and ensures the system can handle high traffic volumes without exceeding the InfluxDB cluster's capacity.
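
    The buffering effect can be illustrated with a toy simulation. This is a sketch under simplified assumptions (one tick per second, a fixed database write capacity, hypothetical load numbers in thousands of writes), not a model of Kafka or RabbitMQ themselves.

```python
def simulate_buffered_writes(arrivals, db_capacity):
    """Simulate a queue in front of a database with a fixed write capacity.

    Returns (writes per tick, largest backlog seen, backlog left at the end).
    """
    backlog = 0
    max_backlog = 0
    writes = []
    for arrived in arrivals:
        backlog += arrived                    # producers append to the queue
        written = min(backlog, db_capacity)   # consumer drains at the DB's pace
        backlog -= written
        writes.append(written)
        max_backlog = max(max_backlog, backlog)
    return writes, max_backlog, backlog

# Hypothetical load in thousands of writes per second, with a two-second burst.
arrivals = [4, 4, 20, 20, 4, 4, 4, 4]
writes, max_backlog, remaining = simulate_buffered_writes(arrivals, db_capacity=10)
print(writes)        # [4, 4, 10, 10, 10, 10, 10, 6]
print(max_backlog)   # 20
print(remaining)     # 0
```

    The database never sees more than its capacity of 10, the burst is absorbed as a temporary backlog, and nothing is dropped; the cost is that data written during the burst reaches the database a few seconds late.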

    For information on Kafka's Writes Per Second data, refer to this article: Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines).

    To summarize the advantages of introducing a Data Ingestion System (Kafka/RabbitMQ):

    1. Fault tolerance, that is, data will not be lost when the database is down;
    2. Lower cost to handle higher peaks;
    3. Enables the downstream Stream Processing System (Flink/Storm) to consume the same data.

    There are also disadvantages:

    1. The complexity of the system is increased, and the maintenance cost is higher;
    2. The consistency of the system is reduced. (However, eventual consistency is completely satisfactory for this system)

    In conclusion, the benefits outweigh the disadvantages in this system.

  • Stream Processing System (Flink/Storm)

    Analyzing data is time-consuming and computationally expensive, and exceptions may occur during execution. If this analysis ran inline before each write to the Time Series Database (TSDB), with no Data Ingestion System such as Kafka or RabbitMQ in between, it would be hard to sustain the required write throughput. This is another reason to place a Data Ingestion System in front of the processing stage.

    Another conceivable solution is to place data analysis and processing tasks after storage in the TSDB and trigger them through a timer. This approach has two drawbacks:

    1. Loading newly written data from the database wastes system performance and increases the read pressure on the database;
    2. Poor real-time performance. If the timer interval is short, the TSDB is queried more often, which increases its read pressure; if the interval is long, results lag further behind the data.

    The use of Stream Processing does not have these two problems.

    The stream processing system takes the ingested data and processes it in real-time. It's used for real-time analytics, complex event processing, and more.

    A crucial part of its role is to analyze the incoming data based on predefined alert rules. These alert rules can be set by users and may include conditions such as CPU usage exceeding a certain percentage, memory usage reaching a critical level, or specific error messages appearing in system logs.

    When the stream processing system identifies data that matches these alert rules, it triggers an alert. This alert is then passed to the alert notification system for delivery to the appropriate users.
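
    As an illustration of rule evaluation, the sketch below checks a rule of the form "average over the last N seconds exceeds a threshold", such as the 5-minute, 80% CPU example from the requirements. It is plain Python rather than Flink or Storm code; the class name is hypothetical, and samples are assumed to arrive in timestamp order for a single server.

```python
from collections import deque

class AverageThresholdRule:
    """Fires when the average over a sliding time window exceeds a threshold."""

    def __init__(self, window_seconds: float, threshold: float):
        self.window_seconds = window_seconds
        self.threshold = threshold
        self._samples = deque()   # (timestamp, value), oldest first
        self._total = 0.0

    def observe(self, timestamp: float, value: float) -> bool:
        """Add a sample; return True if the windowed average is above the threshold."""
        self._samples.append((timestamp, value))
        self._total += value
        # Evict samples that are no longer inside (timestamp - window, timestamp].
        while self._samples[0][0] <= timestamp - self.window_seconds:
            _, old_value = self._samples.popleft()
            self._total -= old_value
        return self._total / len(self._samples) > self.threshold

rule = AverageThresholdRule(window_seconds=300, threshold=80.0)
for ts, cpu in [(0, 70), (10, 95), (20, 90), (400, 50)]:
    print(ts, rule.observe(ts, cpu))
# 0 False
# 10 True
# 20 True
# 400 False
```

    Keeping a running total makes each update O(1) amortized, which matters when one rule is evaluated for every server on every sample.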

    Apache Flink and Apache Storm are frameworks for big data processing and analytics. They allow you to process high-speed data streams in real time and perform event-based processing. In this architecture, this system would be responsible for triggering alerts based on predefined rules.

    Refer to the stream processing section for more details.

  • Time-Series Database (InfluxDB/TimescaleDB)

    The time-series database (TSDB) is used to efficiently store and retrieve time-stamped data. TSDBs like InfluxDB or TimescaleDB are designed to handle high write and query loads, and they provide capabilities like data retention policies, continuous queries, and more.

    The processed data from the stream processing system is stored in the TSDB. This data is then available for queries and can be used to populate the dashboard.

  • Alert Notification System

    When the stream processing system detects a condition that matches an alert rule, it triggers an alert. These alerts are then distributed to users through the alert notification system.

  • Dashboard (User UI)

    The dashboard serves as the user interface for the monitoring system. It retrieves data from the time-series database to display current and historical server metrics. It should provide real-time or near real-time visibility into system metrics.

    The visual interface provided by the Dashboard allows us to define or modify alert rules, and we can also see the sending history of alert notifications.

    Existing Solutions

    There are several existing solutions that can be used for the dashboard.

    Grafana: Grafana is a multi-platform open-source analytics and interactive visualization web application. It provides charts, graphs, and alerts for the web when connected to supported data sources, including InfluxDB and TimescaleDB. It is extensible through a plug-in system and lets you create, explore, and share dashboards with your team.

    Kibana: Kibana is an open-source data visualization and exploration tool used for log and time-series analytics, application monitoring, and operational intelligence use cases. It offers powerful and easy-to-use features such as histograms, line graphs, pie charts, heat maps, and built-in geospatial support. Also, it provides tight integration with Elasticsearch, a popular analytics and search engine, which makes Kibana the default choice for visualizing data stored in Elasticsearch.

    Chronograf: Chronograf is InfluxData’s open-source web application. It allows you to quickly see the data that you have stored in InfluxDB so you can build robust queries and alerts. It is simple to use and includes templates and libraries to allow you to rapidly build dashboards with real-time visualizations of your data and to easily create alerting and automation rules.

  • Batch Processing System (e.g., Apache Spark)

    A major characteristic of time-series data in monitoring systems is that while the volume of data is large, access to the data is concentrated on the most recent data. Historical data, such as data from a week or a month ago, is rarely accessed. However, the vast amount of data occupies valuable database storage space. Therefore, the preservation of historical data is an issue that monitoring systems must consider. Measures that can be adopted include:

    • Aggregating historical data, transforming data with high time granularity into data with lower granularity, thereby reducing the quantity of historical data;
    • Using storage media with lower costs to store historical data.

    The batch processing system reads data from the TSDB in batches (for example, every hour). It processes and aggregates the data, and then writes the processed data to a long-term storage system. Apache Spark is a popular framework for big data processing and analytics, and it's designed to handle batch processing efficiently.

    For more information, please refer to Batch processing.

  • Long-Term Storage (e.g., Hadoop/HDFS, Amazon S3, Google Cloud Storage)

    This is where the processed, batched data is stored for the long term. It should be scalable and cost-effective, and it should support the types of queries and analyses you need to perform on your data.

    Comparing Storage Costs: InfluxDB Cloud Serverless vs. Amazon S3

    The following is an updated version of the InfluxDB Cloud Serverless Pricing Table as of August 2023:

    [Figure: InfluxDB Cloud Serverless storage pricing, August 2023]

    The pricing table lists Storage at $0.002 per GB-hour, which equates to approximately $1.44 per GB-month (0.002 * 24 hours * 30 days). In comparison, Amazon S3 pricing for the same period (August 2023) stands at roughly $0.023 per GB-month. The difference is significant, with a ratio of about 60:1. Therefore, it's highly recommended to use long-term storage for historical data as a cost-saving measure.
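
    The comparison can be checked with the two prices quoted above. The only added assumption is a 30-day month for converting the hourly rate.

```python
INFLUX_PER_GB_HOUR = 0.002   # USD, from the pricing table cited above
S3_PER_GB_MONTH = 0.023      # USD, Amazon S3 figure cited above
HOURS_PER_MONTH = 24 * 30    # assumption: a 30-day month

influx_per_gb_month = INFLUX_PER_GB_HOUR * HOURS_PER_MONTH
ratio = influx_per_gb_month / S3_PER_GB_MONTH

print(round(influx_per_gb_month, 2))   # 1.44
print(round(ratio))                    # 63
```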

Data Store

Database Type

A time-series database (InfluxDB/TimescaleDB) is used to efficiently store and retrieve time-stamped server data.

A relational database is used to store alert rules, notifications, metadata of monitored servers, and some configuration information of the dashboard, etc.

Data Schema

Server Data (Saved to TSDB):

  • time (timestamp): The time the data point was recorded.
  • server_id (string): Identifier of the server from which the data originated.
  • cpu_usage (float): The CPU usage of the server at the recorded time.
  • memory_usage (float): The memory usage of the server at the recorded time.
  • disk_usage (float): The disk usage of the server at the recorded time.
  • system_logs (text): Any system logs recorded at the time.
  • access_logs (text): Any access logs recorded at the time.
  • error_logs (text): Any error logs recorded at the time.

Alerts Table (RDBMS):

  • alert_id (string): Unique identifier for each alert.
  • server_id (string): Identifier of the server associated with the alert.
  • rule_id (string): Identifier of the rule that triggered the alert.
  • time (timestamp): The time the alert was generated.
  • status (string): Status of the alert (e.g., "open", "acknowledged", "closed").

Rules Table (RDBMS):

  • rule_id (string): Unique identifier for each rule.
  • rule_name (string): Name of the rule.
  • rule_condition (string): The condition under which the rule triggers an alert.

Servers Table (RDBMS):

  • server_id (string): Unique identifier for each server.
  • server_name (string): Name of the server.
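
The three relational tables can be sketched as SQL DDL. The example below uses Python's built-in sqlite3 module with an in-memory database purely for illustration; the constraints, the ISO 8601 text timestamp, and the rule condition syntax are assumptions, and a production system would likely use a different RDBMS.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE servers (
    server_id   TEXT PRIMARY KEY,
    server_name TEXT NOT NULL
);
CREATE TABLE rules (
    rule_id        TEXT PRIMARY KEY,
    rule_name      TEXT NOT NULL,
    rule_condition TEXT NOT NULL
);
CREATE TABLE alerts (
    alert_id  TEXT PRIMARY KEY,
    server_id TEXT NOT NULL REFERENCES servers(server_id),
    rule_id   TEXT NOT NULL REFERENCES rules(rule_id),
    "time"    TEXT NOT NULL,  -- assumption: ISO 8601 string
    status    TEXT NOT NULL CHECK (status IN ('open', 'acknowledged', 'closed'))
);
""")

# Hypothetical sample rows.
conn.execute("INSERT INTO servers VALUES ('srv-1', 'web-01')")
conn.execute("INSERT INTO rules VALUES ('r-1', 'High CPU', 'avg(cpu_usage, 5m) > 80')")
conn.execute("INSERT INTO alerts VALUES ('a-1', 'srv-1', 'r-1', '2023-08-01T12:00:00Z', 'open')")

open_alerts = conn.execute("""
    SELECT s.server_name, r.rule_name, a."time"
    FROM alerts a
    JOIN servers s ON s.server_id = a.server_id
    JOIN rules r ON r.rule_id = a.rule_id
    WHERE a.status = 'open'
""").fetchall()
print(open_alerts)  # [('web-01', 'High CPU', '2023-08-01T12:00:00Z')]
```

The join is the kind of query the dashboard would run to list open alerts with readable server and rule names.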

Database Partitioning

To handle the large amount of data and high write load, we can partition the data in our time-series database. Partitioning can be done based on time or server_id. Time-based partitioning is a common strategy in time-series databases, where data is partitioned into time intervals (e.g., hourly, daily). This makes it easy to drop old data and helps with the performance of time-based queries. Server-based partitioning can help distribute the write load across multiple nodes and can improve query performance if we often query data for specific servers.
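
The two strategies can be combined into a composite partition key. The following is a minimal sketch, assuming daily time buckets in UTC and a hypothetical fixed shard count; real time-series databases typically manage this internally.

```python
import hashlib
from datetime import datetime, timezone
from typing import Tuple

NUM_SHARDS = 8  # hypothetical number of server-based shards

def partition_key(server_id: str, epoch_seconds: float) -> Tuple[str, int]:
    """Map a data point to (daily time bucket, shard index)."""
    day = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%Y-%m-%d")
    # Use a stable hash; Python's built-in hash() varies between processes.
    digest = hashlib.sha256(server_id.encode("utf-8")).digest()
    shard = int.from_bytes(digest[:8], "big") % NUM_SHARDS
    return day, shard

day, shard = partition_key("srv-1", 1690000000)
print(day)  # 2023-07-22
```

With this layout, dropping old data means dropping whole daily buckets, while writes for different servers spread across shards.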

Database Replication

Replication is crucial for ensuring data availability and durability. We can use a combination of synchronous and asynchronous replication. Synchronous replication ensures strong consistency but can impact write performance. Asynchronous replication provides better write performance but can lead to data loss in case of a node failure. For our use case, we can use synchronous replication within a data center and asynchronous replication across data centers.

Data Retention and Cleanup

Given the large volume of data, we need to have a data retention policy. We can keep detailed data for a short period (e.g., one week) and then aggregate the data for long-term storage. For example, we can keep hourly averages for up to a month, daily averages for up to a year, and so on. Old data can be deleted or archived to cheaper storage. This can be done as a background job running at off-peak hours to minimize the impact on system performance.

Cache

Caching can be used to improve the performance of read-heavy operations. For example, we can cache server metadata and alert rules since these are likely to be accessed frequently. We can use an in-memory cache like Redis or Memcached for this purpose. The cache should have a suitable eviction policy (e.g., LRU, which evicts the least recently used entries) so that hot data stays in the cache.

We can also cache the results of frequent and expensive queries. Caching expensive queries can significantly improve the performance of the system by reducing the load on the database and decreasing the response time for these queries.

In the real-time monitoring system, once data for a specific time frame has been collected and processed, the results for that time frame will not change. This is because the metrics being monitored, such as CPU usage, memory usage, and disk usage, are time-series data that are immutable once recorded.

For example, the average CPU usage for a completed clock hour (say, 13:00 to 14:00) will not change once that hour has passed. Therefore, if users frequently query the average CPU usage for the most recently completed hour, we can cache the result and serve it directly from the cache until the next hour completes.

This makes caching particularly effective for this type of system. By storing the results of expensive queries in a cache, we can provide faster access to this data and reduce the computational load on the system.

For more detailed information about caching, you can refer to Caching.

Analytics

Analytics can provide valuable insights into the system's performance and usage patterns. We can use batch processing (e.g., Apache Spark) to analyze the data and generate reports. For example, we can identify patterns in server performance, detect anomalies, and predict future trends. We can also analyze the usage of the monitoring system itself, such as the number of active users, the most common alert rules, etc. The results of these analyses can be used to improve the system and provide better service to the users.

How to strike a balance between data granularity and aggregation?

First of all, a quick review of what data granularity and data aggregation mean.

Data Granularity refers to the level of detail or precision that data represents. It's the smallest unit of data that you're dealing with. For example, you might collect server metrics every second, every minute, or every hour. High granularity data (e.g., per second) provides more detail and allows for more precise analysis, but it also leads to larger volumes of data, which can be more challenging to store, process, and analyze.

Data Aggregation is the process of transforming high granularity data into lower granularity data by applying functions such as sum, average, max, min, etc. For example, you might take per-second metrics and aggregate them into per-minute or per-hour metrics. This reduces the volume of data and can simplify analysis and improve query performance, but it also results in a loss of detail.
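
As a small illustration of aggregation, the sketch below groups raw points into fixed-size time buckets and keeps the minimum, maximum, and average of each. The function name is hypothetical and timestamps are assumed to be integer seconds.

```python
from collections import defaultdict

def downsample(points, bucket_seconds):
    """Aggregate (timestamp, value) points into per-bucket min, max, and average."""
    buckets = defaultdict(list)
    for ts, value in points:
        buckets[ts - ts % bucket_seconds].append(value)
    return {
        start: {"min": min(values), "max": max(values), "avg": sum(values) / len(values)}
        for start, values in sorted(buckets.items())
    }

points = [(0, 10.0), (20, 30.0), (40, 20.0), (60, 50.0), (80, 70.0)]
print(downsample(points, 60))
# {0: {'min': 10.0, 'max': 30.0, 'avg': 20.0}, 60: {'min': 50.0, 'max': 70.0, 'avg': 60.0}}
```

Five points become two rows. Keeping min and max alongside the average preserves some information about spikes that an average alone would hide.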

There are several trade-offs to consider between data granularity and aggregation:

1. Storage and Processing Requirements: High granularity data requires more storage and processing power. Aggregating data can help to mitigate this, but it involves additional processing to perform the aggregation.

2. Detail vs. Simplicity: High granularity data provides more detail, which can be valuable for certain types of analysis. However, it can also make analysis more complex. Aggregated data is simpler and can be easier to work with, but it lacks the detail of the original data.

3. Real-Time Analysis: High granularity data can be beneficial for real-time or near-real-time analysis because it allows for more immediate insights. However, real-time processing of high granularity data can be computationally intensive.

4. Historical Trends: Aggregated data can be better for identifying long-term trends because it smooths out short-term fluctuations. For example, daily averages can give a clearer picture of overall trends than per-second data.

5. Data Retention and Granularity: Data can be processed differently based on time. For example, for the first 7 days, newly received data is kept at its original resolution to provide detailed insights. After this period, the data is aggregated to a 1-minute resolution and kept for 30 days to balance detail with storage requirements. Following the 30-day period, the data is further aggregated to a 1-hour resolution to optimize long-term storage and trend analysis.
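
The tiered retention example in point 5 can be expressed as a lookup from data age to resolution. This sketch assumes that age is measured from ingestion and that the tier boundaries fall at 7 and 30 days of age; the function name and tier labels are hypothetical.

```python
def resolution_for_age(age_days: float) -> str:
    """Return the resolution at which data of the given age is kept."""
    if age_days < 0:
        raise ValueError("age_days must be non-negative")
    if age_days < 7:
        return "raw"   # original resolution
    if age_days < 30:
        return "1m"    # aggregated to 1-minute resolution
    return "1h"        # aggregated to 1-hour resolution

for age in (0, 7, 30):
    print(age, resolution_for_age(age))
# 0 raw
# 7 1m
# 30 1h
```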

Where do we store metric values?

Metric values, as time-series data, perform better and cost less when stored in a Time Series Database (TSDB) compared to a Relational Database Management System (RDBMS). Therefore, the system is designed to use TSDB to store Metric values.

Over time, the volume of old data (such as data from a month ago) becomes large, but the access frequency is extremely low, and it is almost never modified. Due to these characteristics, it is more cost-effective to use Long-Term Storage (e.g., Hadoop/HDFS, Amazon S3, Google Cloud Storage) with larger capacity to store old data.

After exceeding the data retention time limit, data in Long-Term Storage is deleted in bulk.

Metric values are time series data. Often with time series data, the latest values are used more frequently. Is this true for the monitoring system? How does this impact our storage solution?

Yes, in a monitoring system, the latest values are often used more frequently. This is because users typically want to see the most recent data to understand the current state of their servers. This impacts our storage solution in that we need to ensure that recent data can be retrieved quickly. This can be achieved by using a cache to store recent data for fast retrieval. Older data can be kept in columnar storage for the long term.

Should we ingest metric values synchronously or asynchronously?

Metric values should be ingested asynchronously. This is because the data ingestion system needs to handle a massive amount of data coming in from thousands of servers. Using an asynchronous approach allows the system to ingest data without blocking, which can improve performance and scalability.
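
The asynchronous pattern can be sketched with Python's asyncio: the request handler only enqueues the entry and acknowledges it, while a background task performs the slower write. Here an in-process queue stands in for Kafka or RabbitMQ and a list stands in for the database; the function names and the "accepted" status are assumptions.

```python
import asyncio

async def ingest(queue: asyncio.Queue, entry: dict) -> dict:
    """Request handler: enqueue and acknowledge without waiting for storage."""
    await queue.put(entry)
    return {"status": "accepted"}

async def writer(queue: asyncio.Queue, store: list) -> None:
    """Background consumer: drains the queue and persists entries."""
    while True:
        entry = await queue.get()
        await asyncio.sleep(0)   # stand-in for a slow database write
        store.append(entry)
        queue.task_done()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    store: list = []
    task = asyncio.create_task(writer(queue, store))
    for i in range(5):
        await ingest(queue, {"server_id": f"srv-{i}", "data": {"cpu_usage": 50 + i}})
    await queue.join()           # wait until everything queued has been written
    task.cancel()
    return store

stored = asyncio.run(main())
print(len(stored))  # 5
```

The bounded queue provides backpressure: if the writer falls too far behind, `put` waits instead of letting memory grow without limit.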

Do we need to aggregate metric values in memory before saving them in persistent storage?

Yes, we may need to aggregate metric values in memory before saving them in persistent storage. This can help to reduce the volume of data that needs to be stored and can improve query performance. However, this involves additional processing to perform the aggregation.

How can we aggregate data in memory efficiently knowing that there may be billions of metrics? Will partitioning help?

We can aggregate data in memory efficiently by using a stream processing system (Flink/Storm). This system can process high-speed data streams in real time and perform event-based processing. Partitioning can also help by distributing the data across multiple nodes, which can improve performance and scalability.
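
The following sketch shows why partitioning helps: each partition keeps only a small running accumulator per key, and a stable hash routes all samples for one server to the same partition. It is plain Python standing in for what a keyed stream in a stream processor does conceptually; the partition count, window size, and function names are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4     # hypothetical number of workers
WINDOW_SECONDS = 60    # hypothetical aggregation window

def new_partition():
    # (server_id, metric, window_start) -> [count, total]
    return defaultdict(lambda: [0, 0.0])

partitions = [new_partition() for _ in range(NUM_PARTITIONS)]

def route(server_id: str) -> int:
    """Stable hash so that all samples for a server reach the same partition."""
    return zlib.crc32(server_id.encode("utf-8")) % NUM_PARTITIONS

def add_sample(server_id: str, metric: str, ts: int, value: float) -> None:
    window_start = ts - ts % WINDOW_SECONDS
    acc = partitions[route(server_id)][(server_id, metric, window_start)]
    acc[0] += 1
    acc[1] += value

def flush(partition) -> dict:
    """Emit one averaged row per key and clear the partition's state."""
    rows = {key: total / count for key, (count, total) in partition.items()}
    partition.clear()
    return rows

for ts, value in [(0, 10.0), (10, 30.0), (70, 50.0)]:
    add_sample("srv-1", "cpu_usage", ts, value)
print(flush(partitions[route("srv-1")]))
# {('srv-1', 'cpu_usage', 0): 20.0, ('srv-1', 'cpu_usage', 60): 50.0}
```

Memory per key is constant regardless of how many samples arrive, and partitions share no state, so they can run on separate nodes.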

Should we additionally aggregate data on the client side? How?

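Yes, if the data volume is too high, we can additionally aggregate data on the client side, that is, in the agent. For example, the agent can sample a metric several times within a reporting interval and send only a summary (such as the average, minimum, and maximum). This reduces the volume of data transferred from the client to the monitoring service and can improve performance.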

There can be a large number of read requests, how to scale them?

To scale a large number of read requests, we can use techniques such as caching, database partitioning, and database replication. Caching can help to reduce the load on the database by storing frequently accessed data in memory for fast retrieval. Database partitioning can help to distribute the data across multiple nodes, which can improve performance and scalability. Database replication allows reads to be served from replica nodes, which also increases the database's read throughput.