Skip to content
5 min read

PostgreSQL and Redis: Practical Caching Patterns for Backend Engineers

When to reach for Redis, when PostgreSQL is enough, and the caching patterns that actually hold up under production load — with Python examples throughout.

#postgresql #redis #backend #python #databases

Introduction

One of the most common questions I see from engineers moving into backend development is: “when should I add Redis?” The honest answer is that Redis is often added too early, based on a vague sense that caching is important — and sometimes too late, after a PostgreSQL query has already become a user-facing problem.

In this post I’ll walk through the caching patterns I actually use, with concrete Python examples, and the decision criteria I apply before reaching for Redis.

When PostgreSQL Is Enough

Before talking about Redis, let’s be honest about what PostgreSQL can handle.

A well-indexed PostgreSQL query on a modern server can return results in 1–5ms. For most applications, that’s fast enough. Adding Redis introduces cache invalidation complexity — which is a real cost.

PostgreSQL features that reduce the need for external caching:

Partial indexes for filtering on common conditions:

-- Only indexes active users — smaller index, faster queries
CREATE INDEX idx_users_active_email 
ON users (email) 
WHERE status = 'active';

Materialised views for expensive aggregations:

CREATE MATERIALIZED VIEW monthly_revenue AS
SELECT 
  date_trunc('month', created_at) AS month,
  SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY 1;

-- Refresh on a schedule or after order inserts
REFRESH MATERIALIZED VIEW CONCURRENTLY monthly_revenue;

Connection pooling with PgBouncer reduces connection overhead and lets you handle more concurrent queries without the latency penalty of establishing new TCP connections.

Don’t add Redis because you assume you’ll need it. Add it when you have a measured problem.

Cache-Aside Pattern (the most common one)

Cache-aside (also called lazy loading) is the pattern where your application checks the cache first, reads from the database on a miss, and populates the cache for next time.

import redis
import json
import psycopg2
from typing import Optional

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_user(user_id: int) -> Optional[dict]:
    cache_key = f"user:{user_id}"

    # 1. Check cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Cache miss — read from PostgreSQL
    with psycopg2.connect(DATABASE_URL) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, email, name, status FROM users WHERE id = %s",
                (user_id,)
            )
            row = cur.fetchone()

    if not row:
        return None

    user = {"id": row[0], "email": row[1], "name": row[2], "status": row[3]}

    # 3. Populate cache with TTL
    r.setex(cache_key, 300, json.dumps(user))  # 5-minute TTL

    return user

The TTL is critical. Without it, stale data lives in the cache forever. Choose TTL based on how often the underlying data changes and how much staleness your application can tolerate.

Invalidate on write:

def update_user(user_id: int, name: str) -> None:
    with psycopg2.connect(DATABASE_URL) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE users SET name = %s WHERE id = %s",
                (name, user_id)
            )
        conn.commit()

    # Invalidate — next read will go to the database
    r.delete(f"user:{user_id}")

Write-Through Pattern

Write-through updates the cache synchronously on every write. The cache always reflects the current state of the database — no staleness window.

def update_product_price(product_id: int, new_price: float) -> None:
    cache_key = f"product:{product_id}"

    with psycopg2.connect(DATABASE_URL) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE products SET price = %s WHERE id = %s RETURNING id, name, price",
                (new_price, product_id)
            )
            row = cur.fetchone()
        conn.commit()

    if row:
        product = {"id": row[0], "name": row[1], "price": float(row[2])}
        r.setex(cache_key, 3600, json.dumps(product))

Write-through is a good fit for read-heavy data that changes predictably — product catalogues, configuration, pricing. It’s less suitable for write-heavy data where you’d be updating the cache constantly.

Rate Limiting with Redis

Redis is excellent for rate limiting because it supports atomic increment operations. Here’s a sliding window rate limiter using a sorted set:

import time

def is_rate_limited(user_id: int, limit: int = 100, window_seconds: int = 60) -> bool:
    key = f"rate_limit:{user_id}"
    now = time.time()
    window_start = now - window_seconds

    pipe = r.pipeline()
    # Remove requests outside the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Count requests in the window
    pipe.zcard(key)
    # Add this request
    pipe.zadd(key, {str(now): now})
    # Set expiry
    pipe.expire(key, window_seconds)
    results = pipe.execute()

    request_count = results[1]
    return request_count >= limit

Using a pipeline here is important — it sends all commands to Redis in one round trip and executes them atomically, avoiding race conditions.

Session Storage

Sessions are a natural fit for Redis: they’re short-lived, access patterns are pure key lookups, and you want TTL-based expiry.

import secrets
from datetime import timedelta

SESSION_TTL = int(timedelta(hours=24).total_seconds())

def create_session(user_id: int) -> str:
    session_id = secrets.token_urlsafe(32)
    session_data = {"user_id": user_id, "created_at": time.time()}
    r.setex(f"session:{session_id}", SESSION_TTL, json.dumps(session_data))
    return session_id

def get_session(session_id: str) -> Optional[dict]:
    data = r.get(f"session:{session_id}")
    if data:
        # Refresh TTL on activity
        r.expire(f"session:{session_id}", SESSION_TTL)
        return json.loads(data)
    return None

def delete_session(session_id: str) -> None:
    r.delete(f"session:{session_id}")

The EXPIRE refresh on access implements a sliding expiry — the session stays alive as long as the user is active.

Pub/Sub for Real-Time Events

Redis pub/sub is useful for broadcasting events to multiple consumers without persistent storage. I use it for real-time notifications:

# Publisher — in your API handler
def notify_order_update(order_id: int, status: str) -> None:
    message = json.dumps({"order_id": order_id, "status": status})
    r.publish(f"order:{order_id}", message)

# Subscriber — in a background worker
def listen_for_updates(order_id: int):
    pubsub = r.pubsub()
    pubsub.subscribe(f"order:{order_id}")

    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            push_notification_to_client(data)

Note: Redis pub/sub is fire-and-forget — if a subscriber is offline when a message is published, the message is lost. For guaranteed delivery, use SQS or Kafka instead.

Cache Stampede Prevention

A cache stampede happens when many requests hit an expired key simultaneously, all query the database, and all try to repopulate the cache at once. For expensive queries, this can bring down your database.

The simplest mitigation is probabilistic early expiration — start refreshing the cache slightly before it expires:

import random

def get_expensive_data(key: str) -> dict:
    data = r.get(key)
    if data:
        ttl = r.ttl(key)
        # Refresh early with increasing probability as TTL decreases
        if ttl < 60 and random.random() < (1 - ttl / 60):
            data = None  # Treat as expired, refresh now

    if not data:
        result = run_expensive_query()
        r.setex(key, 300, json.dumps(result))
        return result

    return json.loads(data)

For truly high-traffic keys, use a distributed lock (Redis SET NX) to ensure only one process refreshes the cache at a time, while others serve slightly stale data.

Key Naming Conventions

Consistent key naming makes debugging much easier. I use a namespace:resource:id pattern:

user:42                     → User record
user:42:permissions         → User's permission set
session:abc123def456        → Session data
rate_limit:user:42          → Rate limit counter
product:999:price_history   → Related data

Avoid generic keys like data or cache. When you’re in redis-cli at 2am debugging a production issue, clear names matter.

When to Choose Redis Over PostgreSQL

Use casePostgreSQLRedis
Complex queries, joinsYesNo
ACID transactionsYesLimited
Full-text searchYes (with pg_trgm)No
Simple key lookupsYes (with indexes)Faster
Rate limitingPossibleYes
SessionsPossibleYes
Pub/subWith LISTEN/NOTIFYYes
Queue/worker patternPossibleWith Streams
Very high throughput (>10k RPS per key)Probably notYes

Conclusion

Redis and PostgreSQL are complementary, not competitive. PostgreSQL is your source of truth — durable, consistent, queryable. Redis is your performance layer for access patterns that benefit from in-memory speed.

Before reaching for Redis:

  1. Measure the actual latency — is PostgreSQL actually slow?
  2. Add appropriate indexes and see if that solves it
  3. If you do add Redis, be explicit about TTLs and invalidation strategy

The discipline of “add complexity only when you have evidence it’s needed” serves well here. A well-tuned PostgreSQL instance can handle far more than most people expect — but when you do need Redis, these patterns will get you far.

Kaikobud Sarkar

Kaikobud Sarkar

Software engineer passionate about backend technologies and continuous learning. I write about Python frameworks, cloud architecture, engineering growth, and staying current in tech.

Related Articles