Security Logging in Django with a Custom PostgreSQL Logging Handler

A practical guide to building a dedicated security logging system in Django using Python logging, custom handlers, and a separate PostgreSQL database. Covers logger configuration, handler internals, connection pooling, threading.Lock(), middleware integration, and when async or queued logging becomes worthwhile.

Django already has a mature logging system built on top of Python’s standard logging module. For most application logs, writing to stdout, files, or a centralized logging provider is enough. But sometimes you want a separate audit/security log that is stored outside the main application database. For example, you may want to record suspicious request methods, failed authorization checks, CSRF failures, webhook signature failures, rate-limit triggers, or other security-relevant events.

A common first instinct is to put logging logic directly inside middleware or views. That works for a small prototype, but it does not scale well across a project. A cleaner pattern is to expose a small project-wide security logging API, configure a named logger in Django’s LOGGING setting, and attach a custom logging handler that writes to a separate PostgreSQL database.

This keeps the call sites simple:

python

record_security_event(
    "illegal_request_method",
    request,
    status_code=response.status_code,
    allowed_methods=response.get("Allow", ""),
)

while keeping the actual persistence backend — PostgreSQL, Redis, Celery, a file, or something else — behind a handler implementation.

Why use a logging handler?

Python logging is designed around a simple flow:

logger.warning(...)

  • creates a LogRecord
  • passes it to matching handlers
  • each handler emits the record

So when you call:

python

import logging

logger = logging.getLogger("security")

logger.warning(
    "illegal_request_method",
    extra={
        "event_type": "illegal_request_method",
        "path": request.path,
        "method": request.method,
        "status_code": 405,
    },
)

Python creates a LogRecord. The values passed through extra become attributes on that record. A custom handler can then access them with:

python

getattr(record, "path", None)
getattr(record, "method", None)
getattr(record, "status_code", None)

The handler’s emit() method is called automatically when the log event passes the logger’s level and filters. You do not call emit() yourself.

This is the central point: the project only emits semantic events, while the handler decides where those events go.
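
As an illustration only (the real PostgreSQL handler comes later in this guide), a minimal handler that simply prints those attributes could look like this:

python

import logging


class ConsoleSecurityHandler(logging.Handler):
    def emit(self, record):
        # Values passed via extra=... become plain attributes on the LogRecord.
        print(
            getattr(record, "event_type", record.getMessage()),
            getattr(record, "path", None),
            getattr(record, "method", None),
            getattr(record, "status_code", None),
        )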

Django logging configuration

In Django, the correct place to configure loggers and handlers is settings.py.

python

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,

    "handlers": {
        "security_bytestaq": {
            "class": "common.sec.app_logging.handlers.SecurityLoggingHandler",
        },
    },

    "loggers": {
        "security": {
            "handlers": ["security_bytestaq"],
            "level": "WARNING",
            "propagate": False,
        },
    },
}

This tells Django that the named logger "security" should send warning-or-higher records to SecurityLoggingHandler.

Then, anywhere in the project:

python

import logging

security_logger = logging.getLogger("security")

security_logger.warning(
    "illegal_request_method",
    extra={
        "event_type": "illegal_request_method",
        "path": request.path,
        "method": request.method,
    },
)

When that line runs, Django/Python logging finds the configured handler and calls its emit() method.
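
Note that the logger level does real filtering here: with "level": "WARNING", lower-severity calls on the same logger never reach the handler. The event names below are arbitrary examples:

python

import logging

security_logger = logging.getLogger("security")

# Dropped: INFO is below the logger's WARNING threshold.
security_logger.info("probe", extra={"event_type": "probe"})

# Delivered to SecurityLoggingHandler.emit().
security_logger.warning("csrf_failure", extra={"event_type": "csrf_failure"})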

One important warning: settings.py is for configuration, not runtime setup. Do not open PostgreSQL connections, create databases, run migrations, or call setup functions from settings. Django imports settings for many commands: runserver, migrate, makemigrations, shell, collectstatic, tests, and more. If settings opens external connections, those commands can fail before Django even starts.
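
Plain configuration values, on the other hand, do belong in settings. The handler shown later reads its connection parameters from a SECURITY_DB dict, which might look like this (the key names and environment variables are assumptions, adjust them to your project):

python

# settings.py: configuration only; no connection is opened here.
import os

SECURITY_DB = {
    "dbname": os.environ.get("SECURITY_DB_NAME", "security_logs"),
    "user": os.environ.get("SECURITY_DB_USER", "security"),
    "password": os.environ.get("SECURITY_DB_PASSWORD", ""),
    "host": os.environ.get("SECURITY_DB_HOST", "127.0.0.1"),
    "port": os.environ.get("SECURITY_DB_PORT", "5432"),
}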

A project-wide security logging API

Instead of scattering raw logger.warning(...) calls everywhere, create a small wrapper function.

common/sec/app_logging/logger.py

python

import logging

security_logger = logging.getLogger("security")


def record_security_event(event_type, request=None, **data):
    payload = {
        "event_type": event_type,
        **data,
    }

    if request is not None:
        payload.update(
            {
                "path": request.path,
                "method": request.method,
                "ip_address": get_client_ip(request),
                "user_agent": request.META.get("HTTP_USER_AGENT", ""),
            }
        )

    security_logger.warning(event_type, extra=payload)


def get_client_ip(request):
    forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")

    if forwarded_for:
        return forwarded_for.split(",")[0].strip()

    return request.META.get("REMOTE_ADDR")

Now call sites remain clean:

record_security_event(
    "illegal_request_method",
    request,
    status_code=response.status_code,
    allowed_methods=response.get("Allow", ""),
)

This gives you a stable internal API. Today it may write synchronously to PostgreSQL. Later it could push to Redis, Celery, Kafka, or a third-party telemetry system without rewriting every middleware and view.
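
Because the destination is chosen in LOGGING rather than at the call sites, that swap is a configuration change. For example, pointing the "security" logger at a hypothetical queue-backed handler (the class name below is made up) would not touch a single middleware or view:

python

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        # Hypothetical handler that enqueues events instead of writing directly.
        "security_queue": {
            "class": "common.sec.app_logging.handlers.QueuedSecurityLoggingHandler",
        },
    },
    "loggers": {
        "security": {
            "handlers": ["security_queue"],
            "level": "WARNING",
            "propagate": False,
        },
    },
}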

Example middleware

A simple middleware for HTTP 405 responses could look like this:

python

from common.sec.app_logging.logger import record_security_event


class RequestMethodMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)

        if response.status_code == 405:
            record_security_event(
                "illegal_request_method",
                request,
                status_code=response.status_code,
                allowed_methods=response.get("Allow", ""),
            )

        return response

This middleware does not know about PostgreSQL. It only records a security event.

That separation is valuable. Middleware should detect and describe the event. The logging system should decide how to store it.
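
For the middleware to run at all, it still needs to be registered in settings.MIDDLEWARE. The module path below is an assumption about where RequestMethodMiddleware lives:

python

# settings.py
MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.middleware.common.CommonMiddleware",
    # ... the rest of the default stack ...
    "common.sec.middleware.RequestMethodMiddleware",  # assumed path
]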

Writing the custom handler

If you want to write security events to a separate PostgreSQL database, you can implement a custom handler.

For a normal synchronous Django stack, psycopg2.pool.ThreadedConnectionPool is more appropriate than SimpleConnectionPool. SimpleConnectionPool is intended for single-threaded use. A Django application can serve requests concurrently across threads, so the threaded pool is the safer choice.

common/sec/app_logging/handlers.py

python

import logging
import threading

from django.conf import settings
from psycopg2.pool import ThreadedConnectionPool


class SecurityLoggingHandler(logging.Handler):
    _pool = None
    _pool_lock = threading.Lock()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_pool(self):
        if self._pool is None:
            with self._pool_lock:
                # Double-checked locking: re-check inside the lock so only
                # one thread ever creates the pool.
                if self._pool is None:
                    SecurityLoggingHandler._pool = ThreadedConnectionPool(
                        minconn=1,
                        maxconn=2,
                        dbname=settings.SECURITY_DB["dbname"],
                        user=settings.SECURITY_DB["user"],
                        password=settings.SECURITY_DB["password"],
                        host=settings.SECURITY_DB["host"],
                        port=settings.SECURITY_DB["port"],
                    )

        return self._pool

    def emit(self, record):
        conn = None

        try:
            pool = self.get_pool()
            conn = pool.getconn()

            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    INSERT INTO security_event_log (
                        event_type,
                        path,
                        method,
                        ip_address,
                        user_agent,
                        status_code,
                        allowed_methods,
                        created_at
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, now())
                    """,
                    (
                        getattr(record, "event_type", record.getMessage()),
                        getattr(record, "path", None),
                        getattr(record, "method", None),
                        getattr(record, "ip_address", None),
                        getattr(record, "user_agent", ""),
                        getattr(record, "status_code", None),
                        getattr(record, "allowed_methods", ""),
                    ),
                )

            conn.commit()

        except Exception:
            if conn is not None:
                conn.rollback()

            self.handleError(record)

        finally:
            if conn is not None:
                pool.putconn(conn)

This handler lazily creates a connection pool the first time a security event is emitted. After that, connections are borrowed from the pool and returned after each write.
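
If you also want the connections released cleanly when the process shuts down, you can override the handler's close() method, which the logging framework calls during shutdown. A minimal sketch of an extra method on SecurityLoggingHandler:

python

    def close(self):
        # Called by the logging framework on shutdown; release pooled connections.
        with self._pool_lock:
            if self._pool is not None:
                self._pool.closeall()
                SecurityLoggingHandler._pool = None
        super().close()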

Why use threading.Lock()?

The threading.Lock() is not there to prevent overlapping inserts. PostgreSQL can handle concurrent inserts, and the connection pool is designed to handle concurrent connection checkouts.

The lock only protects pool initialization:

python

if self._pool is None:
    with self._pool_lock:
        if self._pool is None:
            SecurityLoggingHandler._pool = ThreadedConnectionPool(...)

Without this lock, two requests could hit the logger at the same time when _pool is still None. Both threads could observe _pool is None, and both could create a connection pool. That is a race condition.

The double-check pattern prevents that:

  1. First check: avoid locking after the pool already exists.
  2. Lock: allow only one thread to initialize the pool.
  3. Second check: confirm another thread did not already create the pool while this thread waited.

After the pool exists, the lock is not involved in normal writes.

You usually do not want this:

python

with self._pool_lock:
    cursor.execute(...)

That would serialize every log write and defeat the purpose of having a pool. The lock is for safe one-time initialization, not for write correctness.

One pool per process

A connection pool created this way is process-local. If you run four worker processes, each process has its own pool.

For example:

4 workers × maxconn=2 = up to 8 security DB connections

That matters when sizing PostgreSQL connection limits. For low-volume security logging, a small pool is usually enough:

minconn=1, maxconn=2

or even maxconn=1 if you are comfortable serializing security writes per process.

Table design

A simple PostgreSQL table could look like this:

sql

CREATE TABLE security_event_log (
    id bigserial PRIMARY KEY,
    event_type text NOT NULL,
    path text,
    method varchar(16),
    ip_address inet,
    user_agent text,
    status_code smallint,
    allowed_methods varchar(255),
    created_at timestamptz NOT NULL DEFAULT now()
);

Use timestamptz, not plain timestamp, unless you have a specific reason not to. The default:

created_at timestamptz NOT NULL DEFAULT now()

lets PostgreSQL assign the timestamp automatically, so your insert does not need to provide it.

Avoid setup during settings import or requests

Database setup should not run inside settings.py, and it should not run inside a view.

Avoid this:

python

# settings.py
first_time_db_setup()

Also avoid this:

python

# views.py
def index(request):
    first_time_db_setup()
    ...

Setup code should live in something you run explicitly, such as a standalone SQL script or a Django management command.

For example:

common/sec/management/commands/setup_security_db.py

python

import psycopg2

from django.conf import settings


def first_time_db_setup() -> None:
    with psycopg2.connect(
        dbname=settings.SECURITY_DB["dbname"],
        user=settings.SECURITY_DB["user"],
        password=settings.SECURITY_DB["password"],
        host=settings.SECURITY_DB["host"],
        port=settings.SECURITY_DB["port"],
    ) as con:
        cur = con.cursor()

        queries = [
            """
            CREATE TABLE IF NOT EXISTS security_event_log (
                id bigserial PRIMARY KEY,
                event_type text NOT NULL,
                path text,
                method varchar(16),
                ip_address inet,
                user_agent text,
                status_code smallint,
                allowed_methods varchar(255),
                created_at timestamptz NOT NULL DEFAULT now()
            )
            """
        ]

        for q in queries:
            cur.execute(q)

        con.commit()
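
To make this runnable as python manage.py setup_security_db, the same module can expose a small management command. A minimal sketch, assuming the first_time_db_setup function above:

python

from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = "Create the schema for the separate security logging database."

    def handle(self, *args, **options):
        # Only runs when invoked explicitly, never at import or startup time.
        first_time_db_setup()
        self.stdout.write(self.style.SUCCESS("Security logging schema is ready."))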

This keeps startup, request handling, and schema setup separate.

Async logging and when it matters

If your middleware stack is mostly synchronous, including Django's default middleware, one async database write at the end of the request does not buy much: the request is already passing through synchronous code, so an async PostgreSQL insert would add complexity without meaningfully improving throughput.

Async logging starts making sense when the whole request path is async, or when logging is decoupled from the request entirely. For a larger system, the architecture may become:

request path
    -> emit structured security event
    -> Redis/Celery/RQ/Dramatiq queue
    -> worker writes to PostgreSQL

That design is useful when security logging happens across many points in the project or becomes frequent enough that request latency matters. It also enables retries, buffering, batch inserts, backpressure, and isolation from security database outages.

For a low-volume 405 logger, a synchronous insert through a small PostgreSQL connection pool is usually simpler and good enough.
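
A middle ground, before reaching for a broker, is the standard library's QueueHandler and QueueListener, which hand records to a background thread inside the same process. A minimal sketch, wiring it up in code rather than through the LOGGING dict and reusing the handler from above:

python

import logging
import queue
from logging.handlers import QueueHandler, QueueListener

from common.sec.app_logging.handlers import SecurityLoggingHandler

# The request thread only enqueues the record; the listener thread
# performs the actual PostgreSQL insert.
log_queue = queue.Queue(-1)

security_logger = logging.getLogger("security")
security_logger.setLevel(logging.WARNING)
security_logger.addHandler(QueueHandler(log_queue))

listener = QueueListener(log_queue, SecurityLoggingHandler())
listener.start()  # call listener.stop() on shutdown to flush pending records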

Practical recommendation

Use this layered design:

settings.py
    -> configures the "security" logger

record_security_event(...)
    -> project-wide API for emitting security events

SecurityLoggingHandler.emit(...)
    -> writes records to the separate PostgreSQL database

ThreadedConnectionPool
    -> reuses connections safely across threads

threading.Lock()
    -> protects lazy one-time pool initialization only

This gives you clean call sites, a centralized event schema, separation between application data and security data, and an easy path to a queue-based architecture later if the logging volume grows.
