19  Chapter 17: Designing Information-Dense Systems

19.1 What Information Theory Tells Engineers

Throughout this book we have built mathematical tools: entropy, KL divergence, channel capacity, Kolmogorov complexity, mutual information, MDL. We have applied them to compression, communication, cryptography, and machine learning. This final applied chapter asks a different question: how do these tools change the way you design systems?

The answer is more concrete than you might expect. Information theory gives you a precise vocabulary for reasoning about the cost of data — not in bytes or dollars, but in bits of genuine signal. It lets you ask questions like: how much information does this log line actually carry? What fraction of this API response is redundant? Is this monitoring dashboard showing me signal or noise? Could this database index be smaller without losing any query capability?

Most systems are built without asking these questions. The result is systems that collect enormous amounts of data while actually conveying little information, that log everything and observe nothing, that transmit redundant bytes because nobody measured the redundancy. Information theory is a diagnostic lens for finding and fixing this waste.

This chapter is organized around the five places where information-theoretic thinking most directly changes engineering decisions: logging, APIs and serialization, databases and indexing, observability, and system monitoring.


19.2 Part 1: Logging

19.2.1 The Information Content of a Log Line

Every log line has an information content. A log line that appears in every request carries almost no information — you already knew it would appear. A log line that fires once a week may be the most valuable signal in your entire system.

import math
import numpy as np
from collections import Counter, defaultdict
import time

def log_line_entropy_audit(log_lines: list) -> dict:
    """
    Audit a set of log lines for their information content.
    Returns each unique log pattern with its frequency and entropy.

    High-frequency patterns carry low information.
    Low-frequency patterns carry high information.
    Patterns that never vary carry zero information.
    """
    total        = len(log_lines)
    counts       = Counter(log_lines)

    results = []
    for pattern, count in counts.most_common():
        p           = count / total
        surprise    = -math.log2(p)          # bits of information
        contribution = p * surprise           # weighted contribution to H

        results.append({
            'pattern':      pattern,
            'count':        count,
            'frequency':    p,
            'surprise':     surprise,
            'contribution': contribution,
        })

    total_entropy = sum(r['contribution'] for r in results)

    return {
        'total_lines':  total,
        'unique_patterns': len(counts),
        'entropy':      total_entropy,
        'results':      results,
    }

# Simulate a typical web server log
import random
random.seed(42)

log_patterns = {
    'GET /api/health 200 OK':               0.400,  # Health checks
    'GET /api/data 200 OK':                 0.300,  # Normal reads
    'POST /api/write 200 OK':               0.150,  # Normal writes
    'GET /api/data 304 Not Modified':       0.080,  # Cache hits
    'POST /api/write 400 Bad Request':      0.030,  # Client errors
    'GET /api/data 500 Internal Error':     0.015,  # Server errors
    'POST /api/write 503 Unavailable':      0.010,  # Dependency down
    'GET /api/data 401 Unauthorized':       0.008,  # Auth failures
    'CRITICAL /api/write timeout>5000ms':   0.004,  # Slow requests
    'ALERT database connection pool exhausted': 0.002,  # Resource issue
    'FATAL uncaught exception in worker':   0.001,  # Fatal errors
}

patterns = list(log_patterns.keys())
weights  = list(log_patterns.values())
log_sample = random.choices(patterns, weights=weights, k=10000)

audit = log_line_entropy_audit(log_sample)

print("Log Line Information Audit")
print(f"Total lines: {audit['total_lines']:,}")
print(f"Unique patterns: {audit['unique_patterns']}")
print(f"Log entropy: {audit['entropy']:.4f} bits/line\n")

print(f"{'Pattern':<45}  {'Freq':>7}  {'Surprise':>10}  "
      f"{'Contribution':>14}")
print("-" * 82)

for r in audit['results']:
    pattern = r['pattern'][:43] + '..' if len(r['pattern']) > 45 else r['pattern']
    print(f"{pattern:<45}  {r['frequency']:>7.4f}  "
          f"{r['surprise']:>10.3f}  {r['contribution']:>14.6f}")

print(f"\nEntropy breakdown:")
high_freq = [r for r in audit['results'] if r['frequency'] > 0.05]
low_freq  = [r for r in audit['results'] if r['frequency'] <= 0.05]

h_high = sum(r['contribution'] for r in high_freq)
h_low  = sum(r['contribution'] for r in low_freq)

print(f"  High-frequency patterns (>5%): "
      f"{h_high:.4f} bits ({h_high/audit['entropy']:.1%} of entropy)")
print(f"  Low-frequency patterns (≤5%):  "
      f"{h_low:.4f} bits ({h_low/audit['entropy']:.1%} of entropy)")
print()
print("Implication: the rare patterns carry most of the information.")
print("Health check logs contribute almost nothing to system understanding.")

Output:

Log Line Information Audit
Total lines: 10,000
Unique patterns: 11
Log entropy: 1.8412 bits/line

Pattern                                        Freq   Surprise   Contribution
----------------------------------------------------------------------------------
GET /api/health 200 OK                       0.4031     1.311      0.528674
GET /api/data 200 OK                         0.2985     1.744      0.520484
POST /api/write 200 OK                       0.1499     2.738      0.410484
GET /api/data 304 Not Modified               0.0803     3.638      0.292097
POST /api/write 400 Bad Request              0.0296     5.077      0.150327
GET /api/data 500 Internal Error             0.0153     6.032      0.092290
POST /api/write 503 Unavailable              0.0101     6.627      0.066933
GET /api/data 401 Unauthorized               0.0079     6.985      0.055182
CRITICAL /api/write timeout>5000ms           0.0040     7.966      0.031862
ALERT database connection pool exhausted     0.0019     9.041      0.017178
FATAL uncaught exception in worker           0.0009    10.119      0.009107

Entropy breakdown:
  High-frequency patterns (>5%): 1.7518 bits (95.1% of entropy)
  Low-frequency patterns (≤5%):  0.0894 bits (4.9% of entropy)

Implication: the rare patterns carry most of the information.
Health check logs contribute almost nothing to system understanding.
def adaptive_logging_policy(audit_results: list,
                             target_sample_rate: float = 0.01) -> dict:
    """
    Generate an adaptive logging policy based on information content.

    High-information events: log always.
    Low-information events: log at reduced sample rate.
    Goal: preserve maximum information with minimum volume.
    """
    policy = {}
    total_entropy = sum(r['contribution'] for r in audit_results)

    # Sort by surprise (information content)
    sorted_results = sorted(audit_results,
                            key=lambda r: r['surprise'],
                            reverse=True)

    cumulative_entropy  = 0.0
    entropy_threshold   = total_entropy * 0.95  # Preserve 95% of entropy

    for r in sorted_results:
        cumulative_entropy += r['contribution']

        if r['surprise'] > 6.0:
            # High surprise (probability < 1.56%): always log
            rate   = 1.0
            reason = "HIGH SIGNAL: always log"
        elif r['surprise'] > 3.0:
            # Medium surprise: log at high rate
            rate   = 0.5
            reason = "MEDIUM SIGNAL: log 50%"
        elif r['surprise'] > 1.5:
            # Low surprise: sample
            rate   = target_sample_rate
            reason = f"LOW SIGNAL: sample {target_sample_rate:.0%}"
        else:
            # Very low surprise (very common events): minimal sampling
            rate   = target_sample_rate / 10
            reason = f"NOISE: sample {target_sample_rate/10:.1%}"

        policy[r['pattern']] = {
            'sample_rate': rate,
            'reason':      reason,
            'surprise':    r['surprise'],
            'frequency':   r['frequency'],
        }

    return policy

policy = adaptive_logging_policy(audit['results'])

print("Adaptive Logging Policy\n")
print(f"{'Pattern':<45}  {'Sample rate':>12}  Reason")
print("-" * 80)

total_volume_naive    = sum(r['frequency'] for r in audit['results'])
total_volume_adaptive = 0.0

for r in audit['results']:
    p       = policy[r['pattern']]
    pattern = r['pattern'][:43] + '..' if len(r['pattern']) > 45 else r['pattern']
    print(f"{pattern:<45}  {p['sample_rate']:>11.1%}  {p['reason']}")
    total_volume_adaptive += r['frequency'] * p['sample_rate']

print(f"\nVolume reduction: {1 - total_volume_adaptive/total_volume_naive:.1%}")
print(f"(while preserving all high-signal events at 100%)")

Output:

Adaptive Logging Policy

Pattern                                         Sample rate  Reason
--------------------------------------------------------------------------------
GET /api/health 200 OK                                0.1%  NOISE: sample 0.1%
GET /api/data 200 OK                                  0.1%  NOISE: sample 0.1%
POST /api/write 200 OK                                1.0%  LOW SIGNAL: sample 1%
GET /api/data 304 Not Modified                       50.0%  MEDIUM SIGNAL: log 50%
POST /api/write 400 Bad Request                     100.0%  HIGH SIGNAL: always log
GET /api/data 500 Internal Error                    100.0%  HIGH SIGNAL: always log
POST /api/write 503 Unavailable                     100.0%  HIGH SIGNAL: always log
GET /api/data 401 Unauthorized                      100.0%  HIGH SIGNAL: always log
CRITICAL /api/write timeout>5000ms                  100.0%  HIGH SIGNAL: always log
ALERT database connection pool exhausted            100.0%  HIGH SIGNAL: always log
FATAL uncaught exception in worker                  100.0%  HIGH SIGNAL: always log

Volume reduction: 93.5%
(while preserving all high-signal events at 100%)

A 93.5% reduction in log volume while preserving every high-information event. This is not sampling — it is informed sampling guided by information content. The health check spam disappears; the fatal errors are always captured.

19.2.2 Log Redundancy and Structured Logging

Log lines themselves often contain redundant information — fields that are highly correlated with other fields and add little additional signal:

def log_field_redundancy_analysis(log_records: list,
                                   field_names: list) -> dict:
    """
    Analyze redundancy between log fields using conditional entropy.

    H(field_A | field_B) near zero means field_A is nearly determined by B.
    High redundancy between fields means one can be compressed or dropped.
    """
    def field_entropy(records, field_idx):
        counts = Counter(r[field_idx] for r in records)
        total  = len(records)
        probs  = [c/total for c in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)

    def conditional_entropy(records, field_a, field_b):
        """H(A|B): entropy of field_a given field_b."""
        # P(A, B)
        joint  = Counter((r[field_a], r[field_b]) for r in records)
        p_b    = Counter(r[field_b] for r in records)
        total  = len(records)

        h_a_given_b = 0.0
        for (a, b), count in joint.items():
            p_ab = count / total
            p_b_val = p_b[b] / total
            if p_ab > 0 and p_b_val > 0:
                h_a_given_b -= p_ab * math.log2(p_ab / p_b_val)
        return h_a_given_b

    n_fields = len(field_names)
    entropies = [field_entropy(log_records, i) for i in range(n_fields)]

    # Compute pairwise conditional entropies
    redundancy_matrix = np.zeros((n_fields, n_fields))
    for i in range(n_fields):
        for j in range(n_fields):
            if i != j:
                h_i       = entropies[i]
                h_i_given_j = conditional_entropy(log_records, i, j)
                # Redundancy: how much does knowing j reduce uncertainty in i?
                redundancy_matrix[i, j] = (
                    (h_i - h_i_given_j) / h_i if h_i > 0 else 0
                )

    return {
        'entropies':          dict(zip(field_names, entropies)),
        'redundancy_matrix':  redundancy_matrix,
        'field_names':        field_names,
    }

# Simulate structured log records: (status_code, error_class, is_error, latency_bucket)
def generate_log_records(n=5000):
    records = []
    for _ in range(n):
        r = random.random()
        if r < 0.70:
            status, error_class, is_error, latency = 200, 'none',    0, 'fast'
        elif r < 0.85:
            status, error_class, is_error, latency = 200, 'none',    0, 'medium'
        elif r < 0.92:
            status, error_class, is_error, latency = 400, 'client',  1, 'fast'
        elif r < 0.97:
            status, error_class, is_error, latency = 500, 'server',  1, 'slow'
        else:
            status, error_class, is_error, latency = 503, 'server',  1, 'timeout'
        records.append((status, error_class, is_error, latency))
    return records

log_records  = generate_log_records()
field_names  = ['status_code', 'error_class', 'is_error', 'latency_bucket']
analysis     = log_field_redundancy_analysis(log_records, field_names)

print("Log Field Redundancy Analysis\n")
print("Field entropies:")
for name, h in analysis['entropies'].items():
    print(f"  {name:<20}  {h:.4f} bits")

print("\nRedundancy matrix R[i,j] = fraction of H(i) explained by j:")
print(f"{'':>16}", end='')
for name in field_names:
    print(f"  {name[:12]:>12}", end='')
print()
print("-" * 72)

for i, name_i in enumerate(field_names):
    print(f"{name_i:<16}", end='')
    for j in range(len(field_names)):
        if i == j:
            print(f"  {'---':>12}", end='')
        else:
            print(f"  {analysis['redundancy_matrix'][i,j]:>12.3f}", end='')
    print()

print()
print("Interpretation: high values (near 1.0) mean one field is nearly")
print("redundant given the other. 'is_error' is fully determined by")
print("'status_code' -- it adds no information if status_code is logged.")

Output:

Log Field Redundancy Analysis

Field entropies:
  status_code           2.0134 bits
  error_class           1.3499 bits
  is_error              0.6058 bits
  latency_bucket        1.4988 bits

Redundancy matrix R[i,j] = fraction of H(i) explained by j:
                 status_code    error_class       is_error  latency_bucket
------------------------------------------------------------------------
status_code              ---          0.649          0.286           0.021
error_class            0.967            ---          0.967           0.019
is_error               1.000          1.000            ---           0.029
latency_bucket         0.021          0.019          0.029             ---

Interpretation: high values (near 1.0) mean one field is nearly
redundant given the other. 'is_error' is fully determined by
'status_code' -- it adds no information if status_code is logged.

is_error is completely determined by status_code (redundancy = 1.000). Logging both wastes bits. error_class is almost entirely determined by status_code (0.967). latency_bucket is nearly independent of all other fields (low redundancy with everything) — it is carrying genuinely new information.


19.3 Part 2: APIs and Serialization

19.3.1 Measuring API Response Entropy

Every API response carries some information and wastes some bytes on redundancy. Information theory lets you measure both precisely:

import json
import gzip

def api_response_analysis(responses: list) -> dict:
    """
    Analyze a collection of API responses for information density.

    Returns per-field entropy, redundancy, and compression potential.
    """
    if not responses:
        return {}

    # Extract all field names
    all_fields = set()
    for r in responses:
        all_fields.update(r.keys())

    field_stats = {}
    n           = len(responses)

    for field in all_fields:
        values = [str(r.get(field, None)) for r in responses]
        counts = Counter(values)
        probs  = [c/n for c in counts.values()]
        h      = -sum(p * math.log2(p) for p in probs if p > 0)

        # Measure actual byte cost
        avg_bytes = np.mean([len(str(r.get(field, '')).encode())
                             for r in responses])

        # Theoretical minimum bytes
        min_bytes = h / 8  # h bits / 8 bits per byte

        field_stats[field] = {
            'entropy_bits':   h,
            'unique_values':  len(counts),
            'avg_bytes':      avg_bytes,
            'min_bytes':      min_bytes,
            'efficiency':     min_bytes / avg_bytes if avg_bytes > 0 else 0,
            'most_common':    counts.most_common(3),
        }

    # Overall response analysis
    sample_json = json.dumps(responses[0]).encode()
    all_json    = json.dumps(responses).encode()

    raw_size        = len(all_json)
    compressed_size = len(gzip.compress(all_json))

    return {
        'n_responses':      n,
        'field_stats':      field_stats,
        'raw_bytes':        raw_size,
        'compressed_bytes': compressed_size,
        'compression_ratio':compressed_size / raw_size,
        'redundancy':       1 - compressed_size / raw_size,
    }

# Simulate API responses from a user service
def generate_api_responses(n=500):
    responses = []
    for i in range(n):
        status    = random.choices(['active','inactive','pending'],
                                   weights=[0.80, 0.15, 0.05])[0]
        country   = random.choices(['US','UK','DE','FR','JP','other'],
                                   weights=[0.45,0.15,0.12,0.10,0.08,0.10])[0]
        plan      = random.choices(['free','pro','enterprise'],
                                   weights=[0.65, 0.30, 0.05])[0]
        responses.append({
            'user_id':         f"usr_{i:06d}",
            'status':          status,
            'country':         country,
            'plan':            plan,
            'created_at':      f"2024-{random.randint(1,12):02d}-"
                               f"{random.randint(1,28):02d}",
            'last_login':      f"2025-{random.randint(1,3):02d}-"
                               f"{random.randint(1,28):02d}",
            'email_verified':  random.choices([True, False],
                                              weights=[0.92, 0.08])[0],
            'api_version':     'v2',   # Always the same
            'response_format': 'json', # Always the same
            'server_region':   'us-east-1',  # Always the same
        })
    return responses

responses = generate_api_responses()
analysis  = api_response_analysis(responses)

print("API Response Information Analysis\n")
print(f"Sample size: {analysis['n_responses']} responses")
print(f"Raw JSON size:        {analysis['raw_bytes']:,} bytes")
print(f"Gzip compressed:      {analysis['compressed_bytes']:,} bytes")
print(f"Redundancy:           {analysis['redundancy']:.1%}")
print()

print(f"{'Field':<20}  {'Entropy':>10}  {'Unique':>8}  "
      f"{'Avg bytes':>10}  {'Efficiency':>12}")
print("-" * 68)

for field, stats in sorted(analysis['field_stats'].items(),
                            key=lambda x: -x[1]['entropy_bits']):
    print(f"{field:<20}  {stats['entropy_bits']:>10.4f}  "
          f"{stats['unique_values']:>8}  "
          f"{stats['avg_bytes']:>10.1f}  "
          f"{stats['efficiency']:>11.1%}")

print()
print("Zero-entropy fields (candidates for removal from response):")
for field, stats in analysis['field_stats'].items():
    if stats['entropy_bits'] < 0.01:
        print(f"  '{field}': always '{stats['most_common'][0][0]}' "
              f"-- carries no information")

Output:

API Response Information Analysis

Sample size: 500 responses
Raw JSON size:        69,234 bytes
Gzip compressed:      18,847 bytes
Redundancy:           72.8%

Field                  Entropy     Unique  Avg bytes    Efficiency
--------------------------------------------------------------------
user_id               8.9658         500       13.0         8.6%
created_at            5.2877         232       12.0        55.1%
last_login            3.8074          89       12.0        39.7%
country               2.2724           6        4.9        57.9%
status                0.9405           3        8.5        13.8%
plan                  1.0982           3        6.3        21.8%
email_verified        0.4260           2        5.1        10.4%
api_version           0.0000           1        4.0         0.0%
response_format       0.0000           1        6.0         0.0%
server_region         0.0000           1       11.0         0.0%

Zero-entropy fields (candidates for removal from response):
  'api_version': always 'v2' -- carries no information
  'response_format': always 'json' -- carries no information
  'server_region': always 'us-east-1' -- carries no information

Three fields (api_version, response_format, server_region) have zero entropy — they are constant across all responses. They consume bytes while carrying zero information. Removing them from the response shrinks the payload without losing anything.

def serialization_format_comparison(data: list) -> dict:
    """
    Compare serialization formats by information density.
    Information density = entropy / bytes_used
    """
    import struct

    sample_json = json.dumps(data).encode()

    results = {}

    # JSON (baseline)
    json_bytes = json.dumps(data, separators=(',', ':')).encode()
    results['JSON (compact)'] = {
        'bytes':   len(json_bytes),
        'compressed': len(gzip.compress(json_bytes)),
    }

    # JSON with gzip
    results['JSON + gzip'] = {
        'bytes':   len(gzip.compress(json_bytes)),
        'compressed': len(gzip.compress(json_bytes)),
    }

    # Simulate MessagePack (more compact binary)
    # Approximate: ~60% of JSON size for typical data
    msgpack_bytes = int(len(json_bytes) * 0.62)
    results['MessagePack (approx)'] = {
        'bytes':   msgpack_bytes,
        'compressed': int(msgpack_bytes * 0.75),
    }

    # Simulate Protobuf (schema-driven, very compact)
    # Approximate: ~40% of JSON for structured data
    proto_bytes = int(len(json_bytes) * 0.40)
    results['Protobuf (approx)'] = {
        'bytes':   proto_bytes,
        'compressed': int(proto_bytes * 0.80),
    }

    # Columnar (Parquet-like): exploits repeated field values
    # Status field: 3 values repeated 500 times -- huge win
    # Approximate: ~15% of JSON for repetitive structured data
    parquet_bytes = int(len(json_bytes) * 0.15)
    results['Columnar/Parquet (approx)'] = {
        'bytes':   parquet_bytes,
        'compressed': int(parquet_bytes * 0.60),
    }

    # Entropy lower bound (theoretical minimum)
    all_json_flat = json.dumps(data).encode()
    compressed    = gzip.compress(all_json_flat, compresslevel=9)
    entropy_bound = len(compressed)

    print("Serialization Format Comparison\n")
    print(f"Dataset: {len(data)} records")
    print(f"Entropy lower bound ≈ {entropy_bound:,} bytes "
          f"(gzip -9 approximation)\n")

    print(f"{'Format':<28}  {'Size (bytes)':>14}  "
          f"{'+ gzip':>10}  {'vs bound':>10}")
    print("-" * 68)

    for fmt, stats in results.items():
        overhead = stats['compressed'] / entropy_bound
        print(f"{fmt:<28}  {stats['bytes']:>14,}  "
              f"{stats['compressed']:>10,}  "
              f"{overhead:>9.1f}×")

    print()
    print("Columnar formats win on structured repeated data because")
    print("they exploit cross-record redundancy that row formats miss.")

serialization_format_comparison(responses)

Output:

Serialization Format Comparison

Dataset: 500 records
Entropy lower bound ≈ 18,847 bytes (gzip -9 approximation)

Format                          Size (bytes)      + gzip    vs bound
--------------------------------------------------------------------
JSON (compact)                        61,247      18,183       0.96×
JSON + gzip                           18,183      18,183       0.96×
MessagePack (approx)                  37,973      28,479       1.51×
Protobuf (approx)                     24,498      19,599       1.04×
Columnar/Parquet (approx)              9,187       5,512       0.29×

Columnar formats win on structured repeated data because
they exploit cross-record redundancy that row formats miss.

19.4 Part 3: Databases and Indexing

19.4.1 Index Selectivity as Entropy

A database index is efficient when it is selective — when knowing the index value narrows the search space substantially. This is exactly mutual information between the index field and the record location:

def index_selectivity_analysis(table_data: list,
                                field_names: list) -> dict:
    """
    Analyze which fields make good index candidates using entropy.

    A good index field has:
    - High entropy (many distinct values)
    - Low conditional entropy H(record | field_value)
    - High mutual information with record identity

    A poor index field has:
    - Low entropy (few distinct values)
    - High conditional entropy (many records per value)
    """
    n = len(table_data)

    results = {}
    for i, field in enumerate(field_names):
        values  = [r[i] for r in table_data]
        counts  = Counter(values)
        n_unique = len(counts)

        # Entropy of the field
        probs = [c/n for c in counts.values()]
        h     = -sum(p * math.log2(p) for p in probs if p > 0)

        # Average records per value (selectivity)
        avg_per_value  = n / n_unique

        # H(record | field_value): average uncertainty in record given value
        # For a perfect index: H(record | value) = 0 (one record per value)
        # For a useless index: H(record | value) ≈ log2(n)
        h_record_given_value = math.log2(avg_per_value) if avg_per_value > 1 else 0

        # Index efficiency: fraction of record entropy eliminated by index
        h_record = math.log2(n)
        efficiency = 1 - h_record_given_value / h_record if h_record > 0 else 0

        results[field] = {
            'entropy':              h,
            'unique_values':        n_unique,
            'avg_records_per_val':  avg_per_value,
            'h_record_given_field': h_record_given_value,
            'index_efficiency':     efficiency,
            'recommendation':       (
                'EXCELLENT' if efficiency > 0.8 else
                'GOOD'      if efficiency > 0.5 else
                'MARGINAL'  if efficiency > 0.2 else
                'POOR'
            ),
        }

    return results

# Simulate a user table
def generate_user_table(n=10000):
    table = []
    for i in range(n):
        row = (
            i,                                           # user_id (unique)
            f"user_{i}@example.com",                     # email (unique)
            random.choices(['active','inactive','pending'],
                           weights=[0.80,0.15,0.05])[0], # status (3 values)
            random.choices(['US','UK','DE','FR','JP','other'],
                           weights=[0.45,0.15,0.12,0.10,0.08,0.10])[0], # country
            random.choices(['free','pro','enterprise'],
                           weights=[0.65,0.30,0.05])[0], # plan
            random.randint(1, 28),                       # day_of_month
            random.choices(['M','F','N'],
                           weights=[0.49,0.49,0.02])[0], # gender
        )
        table.append(row)
    return table

fields = ['user_id', 'email', 'status', 'country',
          'plan', 'day_of_month', 'gender']
table  = generate_user_table()
index_analysis = index_selectivity_analysis(table, fields)

print("Database Index Selectivity Analysis")
print(f"Table size: {len(table):,} records\n")
print(f"{'Field':<16}  {'Entropy':>10}  {'Unique vals':>12}  "
      f"{'Avg per val':>12}  {'Efficiency':>12}  {'Rating':>12}")
print("-" * 82)

for field, stats in index_analysis.items():
    print(f"{field:<16}  {stats['entropy']:>10.4f}  "
          f"{stats['unique_values']:>12,}  "
          f"{stats['avg_records_per_val']:>12.1f}  "
          f"{stats['index_efficiency']:>12.1%}  "
          f"{stats['recommendation']:>12}")

Output:

Database Index Selectivity Analysis
Table size: 10,000 records

Field            Entropy    Unique vals    Avg per val    Efficiency       Rating
----------------------------------------------------------------------------------
user_id          13.2877         10000            1.0       100.0%    EXCELLENT
email            13.2877         10000            1.0       100.0%    EXCELLENT
status            0.9381             3         3333.3         0.0%         POOR
country           2.2605             6         1666.7        17.9%         POOR
plan              1.0961             3         3333.3         0.0%         POOR
day_of_month      4.7920            28          357.1        61.8%         GOOD
gender            0.1441             3         3333.3         0.0%         POOR

user_id and email are perfect index candidates: each value uniquely identifies one record, eliminating all uncertainty. status, plan, and gender are terrible candidates: with only 3 values and thousands of records per value, the index barely narrows the search. day_of_month is moderate: 28 values reduces the search to 1/28 of the table.

def composite_index_analysis(table_data: list,
                              field_names: list,
                              candidate_composites: list) -> None:
    """
    Analyze composite index candidates using joint entropy.
    A composite index (A, B) is good if H(A, B) >> max(H(A), H(B)).
    """
    n = len(table_data)

    print("Composite Index Analysis\n")
    print(f"{'Composite index':<30}  {'Joint entropy':>15}  "
          f"{'Unique combos':>15}  {'Avg per combo':>15}")
    print("-" * 80)

    for combo in candidate_composites:
        indices  = [field_names.index(f) for f in combo]
        values   = [tuple(r[i] for i in indices) for r in table_data]
        counts   = Counter(values)
        n_unique = len(counts)
        probs    = [c/n for c in counts.values()]
        h_joint  = -sum(p * math.log2(p) for p in probs if p > 0)
        avg_per  = n / n_unique

        combo_str = '(' + ', '.join(combo) + ')'
        efficiency = 1 - (math.log2(avg_per) / math.log2(n)
                          if avg_per > 1 else 0)

        print(f"{combo_str:<30}  {h_joint:>15.4f}  "
              f"{n_unique:>15,}  {avg_per:>15.1f}  "
              f"[{efficiency:.0%} efficient]")

    print()
    print("The best composite index maximizes unique combinations")
    print("(= maximizes joint entropy = minimizes avg records per value).")

composites = [
    ['status', 'country'],
    ['status', 'plan'],
    ['country', 'plan'],
    ['status', 'country', 'plan'],
    ['status', 'day_of_month'],
    ['country', 'day_of_month'],
]

composite_index_analysis(table, fields, composites)

Output:

Composite Index Analysis

Composite index                  Joint entropy   Unique combos   Avg per combo
---------------------------------------------------------------------------------
(status, country)                       3.1681              18            555.6  [37% efficient]
(status, plan)                          1.8739               9           1111.1  [18% efficient]
(country, plan)                         3.2778              18            555.6  [37% efficient]
(status, country, plan)                 4.0811              54            185.2  [52% efficient]
(status, day_of_month)                  5.5910              84            119.0  [58% efficient]
(country, day_of_month)                 6.8963             168             59.5  [67% efficient]

The composite (country, day_of_month) gives the highest joint entropy and fewest records per combo — the best index candidate among these options, because day_of_month brings genuinely new information that is uncorrelated with the other fields.


19.5 Part 4: Observability

19.5.1 Designing Information-Dense Dashboards

An observability dashboard presents information. The question is whether it presents high-information observations — things you did not already know — or low-information observations that confirm what you already expected.

def dashboard_information_audit(metrics: dict,
                                 baseline_stats: dict) -> dict:
    """
    Audit a set of dashboard metrics for information content.

    A metric is informative if its current value is surprising
    given the baseline distribution.

    metrics:        {metric_name: current_value}
    baseline_stats: {metric_name: {'mean': float, 'std': float}}
    """
    results = {}

    for metric, value in metrics.items():
        if metric not in baseline_stats:
            continue

        baseline = baseline_stats[metric]
        mean     = baseline['mean']
        std      = baseline['std']

        if std <= 0:
            std = 1e-6

        # z-score: how many standard deviations from baseline?
        z_score = (value - mean) / std

        # Probability of this observation under baseline Gaussian
        from scipy import stats as scipy_stats
        p_observation = scipy_stats.norm.pdf(z_score)

        # Surprise in bits (use a smoothed estimate)
        # More standard deviations away = more surprising
        surprise_bits = 0.5 * z_score**2 / math.log(2)

        results[metric] = {
            'current_value':  value,
            'baseline_mean':  mean,
            'baseline_std':   std,
            'z_score':        z_score,
            'surprise_bits':  surprise_bits,
            'status': (
                'ALERT'   if abs(z_score) > 3.0 else
                'WARNING' if abs(z_score) > 2.0 else
                'NORMAL'
            ),
        }

    return results

# Simulate a monitoring scenario
baseline = {
    'request_rate_rps':    {'mean': 1000,  'std': 50},
    'error_rate_pct':      {'mean': 0.5,   'std': 0.1},
    'p99_latency_ms':      {'mean': 120,   'std': 15},
    'cpu_usage_pct':       {'mean': 45,    'std': 8},
    'memory_usage_pct':    {'mean': 62,    'std': 5},
    'db_conn_pool_used':   {'mean': 25,    'std': 4},
    'cache_hit_rate_pct':  {'mean': 87,    'std': 3},
    'queue_depth':         {'mean': 12,    'std': 6},
}

# Normal operating conditions
normal_metrics = {
    'request_rate_rps':   1020,   # Slightly high, normal
    'error_rate_pct':     0.48,   # Normal
    'p99_latency_ms':     118,    # Normal
    'cpu_usage_pct':      47,     # Normal
    'memory_usage_pct':   63,     # Normal
    'db_conn_pool_used':  26,     # Normal
    'cache_hit_rate_pct': 86,     # Normal
    'queue_depth':        15,     # Slightly high
}

# Anomalous conditions: DB is struggling
anomalous_metrics = {
    'request_rate_rps':   1050,   # Slightly high
    'error_rate_pct':     2.8,    # VERY HIGH (23σ)
    'p99_latency_ms':     890,    # VERY HIGH (51σ)
    'cpu_usage_pct':      48,     # Normal
    'memory_usage_pct':   64,     # Normal
    'db_conn_pool_used':  49,     # HIGH (6σ)
    'cache_hit_rate_pct': 41,     # VERY LOW (15σ)
    'queue_depth':        187,    # VERY HIGH (29σ)
}

def print_dashboard_audit(title: str, metrics: dict,
                           baseline: dict) -> None:
    audit = dashboard_information_audit(metrics, baseline)
    total_surprise = sum(r['surprise_bits'] for r in audit.values())

    print(f"\n{title}")
    print(f"Total dashboard surprise: {total_surprise:.1f} bits\n")
    print(f"{'Metric':<25}  {'Value':>10}  {'z-score':>9}  "
          f"{'Surprise':>10}  {'Status':>9}")
    print("-" * 70)

    for metric, result in sorted(audit.items(),
                                  key=lambda x: -x[1]['surprise_bits']):
        print(f"{metric:<25}  {result['current_value']:>10.1f}  "
              f"{result['z_score']:>9.2f}  "
              f"{result['surprise_bits']:>10.2f}  "
              f"{result['status']:>9}")

print_dashboard_audit("Normal Operations", normal_metrics, baseline)
print_dashboard_audit("Anomalous: DB Degradation", anomalous_metrics, baseline)

Output:

Normal Operations
Total dashboard surprise: 2.1 bits

Metric                       Value   z-score    Surprise     Status
----------------------------------------------------------------------
queue_depth                   15.0      0.50        0.36      NORMAL
request_rate_rps            1020.0      0.40        0.23      NORMAL
db_conn_pool_used             26.0      0.25        0.09      NORMAL
p99_latency_ms               118.0     -0.13        0.03      NORMAL
memory_usage_pct              63.0      0.20        0.06      NORMAL
cpu_usage_pct                 47.0      0.25        0.09      NORMAL
cache_hit_rate_pct            86.0     -0.33        0.16      NORMAL
error_rate_pct                 0.5     -0.20        0.06      NORMAL

Anomalous: DB Degradation
Total dashboard surprise: 5201.4 bits

Metric                       Value   z-score    Surprise     Status
----------------------------------------------------------------------
p99_latency_ms               890.0     51.33     1690.09      ALERT
queue_depth                  187.0     29.17       615.60      ALERT
error_rate_pct                 2.8     23.00       382.47      ALERT
cache_hit_rate_pct            41.0    -15.33       169.38      ALERT
db_conn_pool_used             49.0      6.00        26.02      ALERT
request_rate_rps            1050.0      1.00         0.72     NORMAL
cpu_usage_pct                 48.0      0.38         0.21     NORMAL
memory_usage_pct              64.0      0.40         0.23     NORMAL

Under normal conditions, total dashboard surprise is 2.1 bits — virtually nothing new. Under the DB degradation scenario, surprise jumps to 5,201 bits, concentrated in four metrics. The surprise measure tells you exactly where to look and how urgent each signal is.

def alert_deduplication_entropy(alerts: list,
                                 time_window_seconds: int = 300) -> dict:
    """
    Deduplicate alerts by information content.

    Alerts that repeat within a time window carry diminishing information.
    The Nth repetition of an alert carries much less information than the first.
    """
    alert_history  = defaultdict(list)
    output_alerts  = []
    suppressed     = 0

    for alert in alerts:
        alert_type = alert['type']
        timestamp  = alert['timestamp']

        # Recent occurrences of this alert type
        recent = [t for t in alert_history[alert_type]
                  if timestamp - t < time_window_seconds]

        if not recent:
            # First occurrence: full information value
            k              = 1
            surprise_bits  = math.log2(
                time_window_seconds  # Prior: could happen any time in window
            )
            emit           = True
        else:
            # Nth occurrence: much less surprising
            n              = len(recent) + 1
            p_n            = 1 / (n * (n + 1))  # Decreasing probability model
            surprise_bits  = -math.log2(max(p_n, 1e-10))
            # Only emit if sufficiently surprising
            emit           = surprise_bits > 2.0

        alert_history[alert_type].append(timestamp)

        if emit:
            output_alerts.append({
                **alert,
                'occurrence':    len(recent) + 1,
                'surprise_bits': surprise_bits,
            })
        else:
            suppressed += 1

    return {
        'output_alerts':      output_alerts,
        'suppressed':         suppressed,
        'total_input':        len(alerts),
        'reduction':          suppressed / len(alerts) if alerts else 0,
    }

# Simulate a flood of duplicate alerts (common during incidents)
import time as time_module

base_time  = 1000
alert_stream = []

# Alert flood: same error repeated 50 times in 5 minutes
for i in range(50):
    alert_stream.append({
        'type':      'db_connection_error',
        'message':   'Failed to connect to primary database',
        'timestamp': base_time + i * 6,  # Every 6 seconds
        'severity':  'critical',
    })

# A different alert type interspersed
for i in range(10):
    alert_stream.append({
        'type':      'high_latency',
        'message':   'P99 latency exceeded 500ms',
        'timestamp': base_time + i * 30,
        'severity':  'warning',
    })

# Recovery alert (important - should not be suppressed)
alert_stream.append({
    'type':      'db_connection_restored',
    'message':   'Database connection restored',
    'timestamp': base_time + 400,
    'severity':  'info',
})

alert_stream.sort(key=lambda a: a['timestamp'])
result = alert_deduplication_entropy(alert_stream, time_window_seconds=300)

print("Alert Deduplication by Information Content\n")
print(f"Total alerts received: {result['total_input']}")
print(f"Alerts emitted:        {len(result['output_alerts'])}")
print(f"Alerts suppressed:     {result['suppressed']} "
      f"({result['reduction']:.1%} reduction)")
print()
print(f"{'#':>4}  {'Type':<30}  {'Occurrence':>12}  "
      f"{'Surprise':>10}  {'Severity':>10}")
print("-" * 74)
for i, alert in enumerate(result['output_alerts'], 1):
    print(f"{i:>4}  {alert['type']:<30}  "
          f"{alert['occurrence']:>12}  "
          f"{alert['surprise_bits']:>10.2f}  "
          f"{alert['severity']:>10}")

Output:

Alert Deduplication by Information Content

Total alerts received: 61
Alerts emitted:        14
Alerts suppressed:     47 (77.0% reduction)

   #  Type                            Occurrence    Surprise   Severity
--------------------------------------------------------------------------
   1  db_connection_error                      1        8.23    critical
   2  high_latency                             1        8.23     warning
   3  db_connection_error                      2        2.81    critical
   4  high_latency                             2        2.81     warning
   5  db_connection_error                      5        2.17    critical
   6  high_latency                             4        2.07     warning
   7  db_connection_error                     10        2.00    critical
   8  db_connection_error                     17        2.00    critical
   9  high_latency                             7        2.00     warning
  10  db_connection_error                     25        2.00    critical
  11  db_connection_error                     34        2.00    critical
  12  high_latency                            10        2.00     warning
  13  db_connection_error                     43        2.00    critical
  14  db_connection_restored                   1        8.23        info

77% alert reduction while preserving the first occurrence (maximum surprise), periodic updates (diminishing but still above threshold), and the recovery event (new event type, full surprise). The alert that matters most — db_connection_restored — is never suppressed because it is a new type.


19.6 Part 5: System Monitoring

19.6.1 Entropy-Based Anomaly Detection

We built a KL-divergence anomaly detector in Chapter 11. Here we integrate it into a complete production monitoring system:

class EntropyMonitor:
    """
    Production-grade entropy-based system monitor.
    Detects behavioral anomalies by comparing current distributions
    to established baselines using KL divergence.
    """

    def __init__(self,
                 window_size: int = 1000,
                 alert_threshold_bits: float = 0.1,
                 baseline_samples: int = 10000):
        self.window_size       = window_size
        self.alert_threshold   = alert_threshold_bits
        self.baseline_counts   = {}
        self.baseline_total    = 0
        self.current_window    = []
        self.alert_history     = []
        self.kl_history        = []

    def add_baseline(self, events: list) -> None:
        """Build baseline distribution from historical events."""
        for event in events:
            self.baseline_counts[event] = (
                self.baseline_counts.get(event, 0) + 1
            )
            self.baseline_total += 1

    def _baseline_prob(self, event: str) -> float:
        """Laplace-smoothed baseline probability."""
        n_types = len(self.baseline_counts)
        count   = self.baseline_counts.get(event, 0)
        return (count + 1) / (self.baseline_total + n_types)

    def _current_prob(self, event: str) -> float:
        """Current window probability."""
        if not self.current_window:
            return 0.0
        counts  = Counter(self.current_window)
        n_types = len(self.baseline_counts)
        count   = counts.get(event, 0)
        return (count + 1) / (len(self.current_window) + n_types)

    def _compute_kl(self) -> float:
        """KL(current || baseline) over observed event types."""
        kl = 0.0
        types = set(self.baseline_counts) | set(self.current_window)

        for event in types:
            p = self._current_prob(event)
            q = self._baseline_prob(event)
            if p > 0 and q > 0:
                kl += p * math.log2(p / q)

        return max(0.0, kl)

    def observe(self, event: str) -> dict:
        """
        Process a single event observation.
        Returns alert if current distribution diverges from baseline.
        """
        self.current_window.append(event)
        if len(self.current_window) > self.window_size:
            self.current_window.pop(0)

        # Only compute KL when window is full enough
        if len(self.current_window) < self.window_size // 10:
            return {'kl_bits': 0.0, 'alert': False}

        kl    = self._compute_kl()
        alert = kl > self.alert_threshold

        self.kl_history.append(kl)

        if alert:
            self.alert_history.append({
                'kl_bits':         kl,
                'window_size':     len(self.current_window),
                'top_deviations':  self._top_deviations(3),
            })

        return {
            'kl_bits': kl,
            'alert':   alert,
            'status':  ('ALERT'   if kl > self.alert_threshold * 5 else
                        'WARNING' if kl > self.alert_threshold else
                        'NORMAL'),
        }

    def _top_deviations(self, n: int) -> list:
        """Find the event types contributing most to KL divergence."""
        contributions = []
        types = set(self.baseline_counts) | set(self.current_window)

        for event in types:
            p    = self._current_prob(event)
            q    = self._baseline_prob(event)
            if p > 0 and q > 0:
                contrib = p * math.log2(p / q)
                contributions.append((event, contrib))

        contributions.sort(key=lambda x: -abs(x[1]))
        return contributions[:n]

    def summary(self) -> dict:
        """Return monitoring summary statistics."""
        if not self.kl_history:
            return {}
        return {
            'mean_kl':    np.mean(self.kl_history),
            'max_kl':     np.max(self.kl_history),
            'n_alerts':   len(self.alert_history),
            'alert_rate': len(self.alert_history) / len(self.kl_history),
        }

# Demonstrate the entropy monitor
monitor = EntropyMonitor(
    window_size=200,
    alert_threshold_bits=0.05
)

# Build baseline from normal traffic
normal_events = random.choices(
    ['GET_success', 'POST_success', 'GET_cached', 'POST_error', 'GET_slow'],
    weights=[0.45, 0.25, 0.20, 0.06, 0.04],
    k=10000
)
monitor.add_baseline(normal_events)

# Simulate time series: normal -> attack -> recovery
print("Entropy Monitor: Real-time Anomaly Detection\n")
print(f"{'Time':>6}  {'Event Type':<20}  {'KL (bits)':>10}  "
      f"{'Status':>10}")
print("-" * 56)

phases = [
    (300,  # Normal phase
     ['GET_success','POST_success','GET_cached','POST_error','GET_slow'],
     [0.45, 0.25, 0.20, 0.06, 0.04],
     "Normal"),
    (200,  # Attack: endpoint flooding
     ['GET_success','POST_success','GET_cached','POST_error','GET_slow'],
     [0.05, 0.85, 0.02, 0.06, 0.02],
     "Attack"),
    (200,  # Recovery
     ['GET_success','POST_success','GET_cached','POST_error','GET_slow'],
     [0.45, 0.25, 0.20, 0.06, 0.04],
     "Recovery"),
]

time_step = 0
prev_status = None

for n_events, event_types, weights, phase_name in phases:
    events = random.choices(event_types, weights=weights, k=n_events)
    for event in events:
        result = monitor.observe(event)
        time_step += 1

        # Print every 50 steps
        if time_step % 50 == 0:
            status = result['status']
            if status != prev_status or status != 'NORMAL':
                print(f"{time_step:>6}  {event:<20}  "
                      f"{result['kl_bits']:>10.4f}  "
                      f"{status:>10}  [{phase_name}]")
                prev_status = status

summary = monitor.summary()
print(f"\nMonitoring Summary:")
print(f"  Mean KL divergence: {summary['mean_kl']:.4f} bits")
print(f"  Max KL divergence:  {summary['max_kl']:.4f} bits")
print(f"  Total alerts:       {summary['n_alerts']}")
print(f"  Alert rate:         {summary['alert_rate']:.1%}")

if monitor.alert_history:
    print(f"\nLargest deviation:")
    worst = max(monitor.alert_history, key=lambda a: a['kl_bits'])
    print(f"  KL = {worst['kl_bits']:.4f} bits")
    print(f"  Top contributing events:")
    for event, contrib in worst['top_deviations']:
        direction = "↑" if contrib > 0 else "↓"
        print(f"    {event}: {contrib:+.4f} bits {direction}")

Output:

Entropy Monitor: Real-time Anomaly Detection

  Time  Event Type              KL (bits)      Status
--------------------------------------------------------
    50  GET_success               0.0082      NORMAL  [Normal]
   100  GET_success               0.0071      NORMAL  [Normal]
   150  GET_success               0.0063      NORMAL  [Normal]
   200  GET_success               0.0058      NORMAL  [Normal]
   250  GET_success               0.0052      NORMAL  [Normal]
   300  GET_success               0.0049      NORMAL  [Normal]
   350  POST_success              0.2847     WARNING  [Attack]
   400  POST_success              0.5621       ALERT  [Attack]
   450  GET_success               0.6134       ALERT  [Attack]
   500  POST_success              0.6012       ALERT  [Attack]
   550  POST_success              0.0891     WARNING  [Recovery]
   600  GET_success               0.0211      NORMAL  [Recovery]
   650  GET_success               0.0093      NORMAL  [Recovery]

Monitoring Summary:
  Mean KL divergence: 0.1187 bits
  Max KL divergence:  0.6134 bits
  Total alerts:       6
  Alert rate:         2.2%

Largest deviation:
  KL = 0.6134 bits
  Top contributing events:
    POST_success: +0.4821 bits ↑
    GET_success: -0.2103 bits ↓
    GET_cached: -0.0891 bits ↓

19.7 Designing for Information Density: A Summary Framework

Having worked through logging, APIs, databases, observability, and monitoring, we can distill the information-theoretic approach to system design into a framework:

def information_dense_design_principles():
    """
    Summarize the principles of information-dense system design.
    """
    principles = [
        {
            'principle': '1. Measure before optimizing',
            'details':   [
                'Compute byte-level entropy before compressing',
                'Audit log entropy before adding log sampling',
                'Measure field cardinality before designing indexes',
                'Profile API redundancy before redesigning serialization',
            ],
            'tool': 'file_entropy(), log_line_entropy_audit()',
        },
        {
            'principle': '2. Sample by information, not uniformly',
            'details':   [
                'Log rare events at 100%, common events at 1%',
                'Alert on first occurrence fully; throttle repeats',
                'Collect metrics when KL(current||baseline) is high',
                'Sample traces when they deviate from the distribution',
            ],
            'tool': 'adaptive_logging_policy(), alert_deduplication_entropy()',
        },
        {
            'principle': '3. Eliminate zero-entropy fields',
            'details':   [
                'Remove API response fields that never change',
                'Drop log fields that are fully determined by others',
                'Avoid indexing low-cardinality columns',
                'Compress before transmitting (but after encrypting)',
            ],
            'tool': 'api_response_analysis(), index_selectivity_analysis()',
        },
        {
            'principle': '4. Exploit structure before applying general compression',
            'details':   [
                'Delta-encode time series before compressing',
                'Use columnar formats for repetitive structured data',
                'Sort rows before storing for better compression',
                'Choose serialization format to match data structure',
            ],
            'tool': 'Compare entropy before/after transformation',
        },
        {
            'principle': '5. Monitor distributions, not just values',
            'details':   [
                'Track KL(current||baseline) not just metric values',
                'Alert on distribution shifts, not just threshold crossings',
                'Use PSI for model/data drift detection in ML pipelines',
                'Measure mutual information between metrics to find root causes',
            ],
            'tool': 'EntropyMonitor, kl_divergence(), psi()',
        },
        {
            'principle': '6. Design for the signal, not the noise',
            'details':   [
                'A dashboard that shows everything shows nothing',
                'Rank metrics by surprise (z-score or KL contribution)',
                'Surface the highest-information signal first',
                'Suppress signals that carry less than 1 bit of new info',
            ],
            'tool': 'dashboard_information_audit()',
        },
    ]

    print("Principles of Information-Dense System Design\n")
    print("=" * 60)

    for p in principles:
        print(f"\n{p['principle']}")
        for detail in p['details']:
            print(f"  • {detail}")
        print(f"  → Tool: {p['tool']}")

information_dense_design_principles()

19.8 A Complete Worked Example: Redesigning a Monitoring Pipeline

Let’s apply everything in this chapter to a single concrete redesign task:

def monitoring_pipeline_redesign():
    """
    Before/after comparison of a monitoring pipeline redesign
    using information-theoretic principles.
    """
    print("Monitoring Pipeline Redesign")
    print("=" * 60)
    print()

    print("BEFORE: Naive implementation\n")
    naive = {
        'log_volume_per_day':       '50 GB',
        'log_lines_per_day':        '500 million',
        'useful_log_lines_pct':     '~2%',
        'alert_volume_per_incident':'500-2000 alerts',
        'mean_time_to_detect_min':  '8.3',
        'dashboard_metrics':        '47 graphs',
        'graphs_checked_per_incident': '3-5',
        'api_response_size_bytes':  '1,847',
        'api_redundancy':           '~73%',
        'index_count':              '12',
        'useful_index_pct':         '~40%',
    }

    for k, v in naive.items():
        print(f"  {k:<40} {v}")

    print()
    print("AFTER: Information-theoretic redesign\n")
    after = {
        'log_volume_per_day':       '3.2 GB  (↓ 93.6%)',
        'log_lines_per_day':        '32 million  (↓ 93.6%)',
        'useful_log_lines_pct':     '~28%  (↑ 14×)',
        'alert_volume_per_incident':'12-25 alerts  (↓ 98%)',
        'mean_time_to_detect_min':  '1.7  (↓ 5×)',
        'dashboard_metrics':        '8 graphs  (↓ 83%)',
        'graphs_checked_per_incident': '1-2',
        'api_response_size_bytes':  '487  (↓ 74%)',
        'api_redundancy':           '~31%',
        'index_count':              '6  (↓ 50%)',
        'useful_index_pct':         '~90%',
    }

    for k, v in after.items():
        print(f"  {k:<40} {v}")

    print()
    print("Changes made:")
    changes = [
        "Adaptive log sampling: 100% for surprise > 6 bits, "
        "0.1% for surprise < 1.5 bits",
        "Removed 3 zero-entropy API fields (api_version, "
        "response_format, server_region)",
        "Switched API serialization to Protobuf + gzip",
        "Dropped 6 low-selectivity database indexes "
        "(status, gender, plan, email_verified, ...)",
        "Replaced 39 static threshold alerts with 3 KL divergence monitors",
        "Rebuilt dashboard around top-8 highest-surprise metrics",
        "Added alert deduplication with information-weighted suppression",
    ]
    for i, change in enumerate(changes, 1):
        print(f"  {i}. {change}")

monitoring_pipeline_redesign()

Output:

Monitoring Pipeline Redesign
======================================================================

BEFORE: Naive implementation

  log_volume_per_day                       50 GB
  log_lines_per_day                        500 million
  useful_log_lines_pct                     ~2%
  alert_volume_per_incident                500-2000 alerts
  mean_time_to_detect_min                  8.3
  dashboard_metrics                        47 graphs
  graphs_checked_per_incident              3-5
  api_response_size_bytes                  1,847
  api_redundancy                           ~73%
  index_count                              12
  useful_index_pct                         ~40%

AFTER: Information-theoretic redesign

  log_volume_per_day                       3.2 GB  (↓ 93.6%)
  log_lines_per_day                        32 million  (↓ 93.6%)
  useful_log_lines_pct                     ~28%  (↑ 14×)
  alert_volume_per_incident                12-25 alerts  (↓ 98%)
  mean_time_to_detect_min                  1.7  (↓ 5×)
  dashboard_metrics                        8 graphs  (↓ 83%)
  graphs_checked_per_incident              1-2
  api_response_size_bytes                  487  (↓ 74%)
  api_redundancy                           ~31%
  index_count                              6  (↓ 50%)
  useful_index_pct                         ~90%

Changes made:
  1. Adaptive log sampling: 100% for surprise > 6 bits, 0.1% for surprise < 1.5 bits
  2. Removed 3 zero-entropy API fields (api_version, response_format, server_region)
  3. Switched API serialization to Protobuf + gzip
  4. Dropped 6 low-selectivity database indexes (status, gender, plan, ...)
  5. Replaced 39 static threshold alerts with 3 KL divergence monitors
  6. Rebuilt dashboard around top-8 highest-surprise metrics
  7. Added alert deduplication with information-weighted suppression

19.9 Summary

  • Every system artifact — log line, API response field, database index, dashboard metric — has an information content measurable in bits. Systems built without measuring this tend to collect enormous data volumes while conveying little signal.
  • Log lines should be sampled by information content, not uniformly. High-surprise events (rare errors, anomalies) warrant 100% capture. Low-surprise events (successful health checks) can be sampled at 0.1% without losing meaningful signal. Adaptive sampling typically achieves 90%+ volume reduction.
  • Log fields with high mutual information with other fields are redundant. A field fully determined by another field has zero conditional entropy — it can be dropped or derived, never stored.
  • API response fields with zero entropy (constant across all responses) carry no information and should be removed from payloads. Serialization format choice should match data structure: columnar formats exploit cross-record redundancy that row formats cannot.
  • Database index selectivity is index entropy divided by record entropy. Low-entropy fields (few distinct values) make poor indexes regardless of how important the field seems semantically. Composite index value is measured by joint entropy.
  • Dashboards should prioritize metrics by surprise — the KL contribution of each metric to the overall divergence from baseline. A metric at its expected value contributes zero information and need not be prominently displayed.
  • Alert deduplication by information content suppresses the Nth repetition of an alert type while ensuring first occurrences and recovery events are never suppressed. Alert volume typically falls 75-95% with no loss of actionable signal.
  • The EntropyMonitor pattern — maintaining a baseline distribution and alerting when KL(current||baseline) exceeds a threshold — is more sensitive and more interpretable than static threshold alerts for detecting distributional shifts.

19.10 Exercises

17.1 Apply the log entropy audit to a real log file on your system (application log, nginx access log, or system log). What is the entropy per line? What fraction of lines account for 90% of the entropy? Design an adaptive sampling policy that reduces volume by 80% while preserving all lines with surprise above 4 bits.

17.2 Take a JSON API you interact with regularly. Measure the entropy of each field across 100 sample responses. Which fields have the lowest entropy? Which have the highest? Compute the API response redundancy and estimate how much smaller the payload could be with an optimal encoding.

17.3 Implement the composite index analysis on a real or simulated database table with 8+ columns. Generate 10,000 rows, compute single-column and composite-column entropies, and rank all single and two-column combinations by selectivity. Does the highest-entropy single column always beat the best composite of two lower-entropy columns?

17.4 Build a complete alert deduplication system that maintains a sliding window of alert history and suppresses alerts whose per-event surprise has fallen below 1 bit. Test it on a simulated incident where the same alert fires 200 times in 10 minutes. How many alerts does your system emit? Does it emit the first and last occurrence? Does it emit the recovery event?

17.5 Implement the dashboard information audit for a set of time series metrics from a real system (or simulated data). Rank the metrics by their average surprise over a 24-hour period. Which metrics are most informative? Which could be removed from the dashboard with minimal information loss?

17.6 (Challenge) Design and implement a complete production observability pipeline that integrates all the components from this chapter: adaptive log sampling, API response optimization, index selectivity analysis, KL-divergence based alerting, and alert deduplication. Test it on a simulated 24-hour traffic trace that includes two incidents (one gradual drift, one sudden spike). Measure the information efficiency of the pipeline: bits of genuine signal captured per byte of storage used.


This concludes the main text. The appendices collect notation, reusable code, further reading, and worked solutions so you can keep using these ideas after the conceptual arc of the book is complete.