DEV Community

Cover image for Rebuilding TLS, Part 2 — Adding Integrity to the Channel
Dmytro Huz for AWS Community Builders

Posted on • Originally published at open.substack.com

Rebuilding TLS, Part 2 — Adding Integrity to the Channel

In the first part of this series, we built our first fake secure channel.

We took a simple socket-based client and server, wrapped their communication in AES-CTR with a shared secret key, and got something that already looked much more serious than plain TCP. The traffic stopped being transparent. A passive observer could no longer read the request and response directly.

That was real progress.

But it still had a fatal flaw.

The receiver had no way to know whether the encrypted message had been changed on the way.

Encryption hid the bytes.

It did not protect their meaning.

So in this part, we will fix that.

We will first add a MAC so the receiver can detect tampering. Then we will make the record layer a little less naive by adding a sequence number. And after that, we will take one more step toward the real world and move to AEAD, because that is how modern secure protocols usually protect records.

We still will not have real TLS when we are done.

But we will have a much more serious record layer than the one from Part 1.


What we will build in this part

The plan for this article is simple:

  • briefly introduce MACs
  • add HMAC to our encrypted record format
  • make tampering detectable
  • add a sequence number to each record
  • explain why sequence numbers matter
  • then move from our hand-built “encrypt + MAC” construction to AEAD, because that is the approach real systems usually use

Just like in Part 1, I want to keep the pattern simple:

  • explain the idea
  • show the code
  • explain what changed
  • explain what is still broken

Why encryption still was not enough

At the end of Part 1, our protocol already had one real property:

  • confidentiality against passive observers

That mattered.

But it still failed against active attackers.

Because AES-CTR by itself does not provide integrity, an attacker could modify ciphertext and the receiver would still decrypt it and trust the result. That was the main lesson of the first article:

confidentiality is not integrity

So the next missing property is obvious.

The receiver needs a way to verify that the message arrived unchanged.

That is what a MAC gives us.


A very short note on MACs

MAC stands for Message Authentication Code.

Very roughly, it is a cryptographic tag computed over a message using a secret key.

The sender computes the tag and sends it together with the message.

The receiver recomputes the tag and compares it with the one that was received.

If the tags match, the receiver can trust that:

  • the message was not modified
  • and it was created by someone who knows the MAC key

If the tags do not match, the message must be rejected.

In this article, we will use HMAC-SHA256.

I do not want to go too deep into HMAC itself here, because the goal of this series is to understand TLS as a protocol. But if you want a deeper explanation of MACs and HMAC, I already wrote about them in my cryptography series, and I’ll link that here: https://www.dmytrohuz.com/p/building-own-mac-part-3-reinventing

So for our purposes, the important idea is simple:

encryption hides the message

MAC protects the message from silent modification

That is the missing half we need.


Adding HMAC to the channel

Let’s start by upgrading the record format from Part 1.

In Part 1, our protected payload was basically:

nonce || ciphertext
Enter fullscreen mode Exit fullscreen mode

Now we will add a MAC tag:

nonce || ciphertext || tag
Enter fullscreen mode Exit fullscreen mode

And the sender will compute the HMAC over:

nonce || ciphertext
Enter fullscreen mode Exit fullscreen mode

So the full logic becomes:

Sender

  1. encrypt plaintext with AES-CTR
  2. compute HMAC over nonce || ciphertext
  3. send nonce || ciphertext || tag

Receiver

  1. read nonce || ciphertext || tag
  2. recompute HMAC over nonce || ciphertext
  3. compare tags
  4. only if they match, decrypt the ciphertext
  5. otherwise reject the message

There is one more small improvement I want to make here.

Instead of using one key for everything, we will already separate them:

  • one key for encryption
  • one key for HMAC

This is still a toy setup, but it is better design than reusing the same bytes for every cryptographic job.

HMAC helpers

import os
import hmac
import hashlib

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

# ---------------------------------------------------------------------------
# Keys — hardcoded for educational purposes.
# In a real protocol, these would be derived from a key exchange (e.g.,
# Diffie-Hellman), not embedded in source code.
# ---------------------------------------------------------------------------

# 32-byte (256-bit) key for AES-256-CTR encryption.
ENC_KEY = b"0123456789ABCDEF0123456789ABCDEF"

# 32-byte key for HMAC-SHA256.  Separate from the encryption key.
MAC_KEY = b"HMAC_KEY_FOR_PART2_DEMO_1234567"

# HMAC-SHA256 produces a 32-byte (256-bit) tag.
TAG_LEN = 32

# AES-CTR nonce is 16 bytes (128 bits).
NONCE_LEN = 16

def encrypt_then_mac(plaintext: bytes) -> bytes:
    """Encrypt a plaintext and append an HMAC tag.

    Returns: nonce (16 B) || ciphertext (N B) || tag (32 B)
    """

    # Step 1: Generate a fresh random nonce for AES-CTR.
    # A new nonce MUST be used for every record — reusing a nonce with
    # the same key completely breaks CTR-mode security.
    nonce = os.urandom(NONCE_LEN)

    # Step 2: Encrypt the plaintext with AES-256-CTR.
    cipher = Cipher(algorithms.AES(ENC_KEY), modes.CTR(nonce))
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(plaintext) + encryptor.finalize()

    # Step 3: Compute HMAC-SHA256 over (nonce || ciphertext).
    # New in Part 2: we authenticate the encrypted record before sending it.
    # The HMAC input includes the nonce so an attacker cannot swap nonces
    # between records without detection.
    mac_input = nonce + ciphertext
    tag = hmac.new(MAC_KEY, mac_input, hashlib.sha256).digest()

    print(f"  [crypto_hmac] encrypt_then_mac:")
    print(f"    nonce    = {nonce.hex()[:32]}...")
    print(f"    ct_len   = {len(ciphertext)} bytes")
    print(f"    tag      = {tag.hex()[:32]}...")

    # Step 4: Assemble the wire format.
    return nonce + ciphertext + tag

def verify_then_decrypt(payload: bytes) -> bytes:
    """Verify the HMAC tag, then decrypt if valid.

    Expects: nonce (16 B) || ciphertext (N B) || tag (32 B)
    Raises ValueError if the tag does not match.
    """

    # Step 1: Parse the record into its components.
    # The tag is always the last 32 bytes.  The nonce is the first 16.
    # Everything in between is ciphertext.
    if len(payload) < NONCE_LEN + TAG_LEN:
        raise ValueError("Record too short to contain nonce + tag")

    nonce = payload[:NONCE_LEN]
    ciphertext = payload[NONCE_LEN:-TAG_LEN]
    received_tag = payload[-TAG_LEN:]

    # Step 2: Recompute the HMAC over (nonce || ciphertext).
    mac_input = nonce + ciphertext
    expected_tag = hmac.new(MAC_KEY, mac_input, hashlib.sha256).digest()

    # Step 3: Compare tags using constant-time comparison.
    # hmac.compare_digest() prevents timing side-channel attacks.
    # A naive `==` comparison can leak information about which byte
    # position differs first, allowing an attacker to forge a valid
    # tag byte by byte.
    if not hmac.compare_digest(received_tag, expected_tag):
        print("  [crypto_hmac] *** MAC VERIFICATION FAILED — record rejected ***")
        raise ValueError("HMAC verification failed — record has been tampered with")

    print("  [crypto_hmac] MAC verification: OK")

    # Step 4: Decrypt only after verification succeeds.
    # This is the key benefit of encrypt-then-MAC: we never process
    # unauthenticated ciphertext.
    cipher = Cipher(algorithms.AES(ENC_KEY), modes.CTR(nonce))
    decryptor = cipher.decryptor()
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()

    return plaintext

Enter fullscreen mode Exit fullscreen mode

This is the first big improvement over Part 1.

The important change is not just that we added a tag.

It is that the receiver no longer blindly trusts ciphertext and only then discovers what it means. Now the receiver first checks whether the record is authentic and unchanged.

That is a very different protocol posture.


Updating the client and server

Now let’s plug this into the channel.

HMAC-based client

import socket

from framing import send_record, recv_record
from crypto_hmac import encrypt_then_mac, verify_then_decrypt

HOST = "127.0.0.1"
PORT = 9001

# A toy HTTP-like request — same spirit as Part 1.
request = (
    "GET /transfer?to=bob&amount=100 HTTP/1.1\r\nHost: localhost\r\n\r\n"
).encode("utf-8")

print("=" * 60)
print("Part 2 — HMAC Client (encrypt-then-MAC)")
print("=" * 60)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    client.connect((HOST, PORT))
    print(f"Connected to {HOST}:{PORT}")

    # ----- SEND REQUEST -----
    print("\n--- Sending request ---")
    protected = encrypt_then_mac(request)
    send_record(client, protected)
    print(f"  Record sent ({len(protected)} bytes on wire)")

    # ----- RECEIVE RESPONSE -----
    print("\n--- Receiving response ---")
    raw_response = recv_record(client)
    response = verify_then_decrypt(raw_response)
    print(f"\n  Decrypted response:\n  {response.decode('utf-8')}")

print("\nDone.")
Enter fullscreen mode Exit fullscreen mode

HMAC-based server

import socket

from framing import send_record, recv_record
from crypto_hmac import encrypt_then_mac, verify_then_decrypt

HOST = "127.0.0.1"
PORT = 9001

print("=" * 60)
print("Part 2 — HMAC Server (encrypt-then-MAC)")
print("=" * 60)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    # SO_REUSEADDR lets us restart the server immediately without waiting
    # for the OS to release the port from TIME_WAIT state.
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(1)
    print(f"Listening on {HOST}:{PORT}")

    conn, addr = server.accept()
    with conn:
        print(f"Connected by {addr}")

        # ----- RECEIVE REQUEST -----
        print("\n--- Receiving request ---")
        raw_request = recv_record(conn)

        try:
            request = verify_then_decrypt(raw_request)
        except ValueError as e:
            # New in Part 2: if the MAC fails, we reject the record loudly.
            # In Part 1 we had no way to detect tampering at all.
            print(f"\n  *** REJECTED: {e} ***")
            print("  Connection closed — refusing to process tampered data.")
        else:
            print(f"\n  Decrypted request:\n  {request.decode('utf-8')}")

            # ----- SEND RESPONSE -----
            print("--- Sending response ---")
            response = (
                "HTTP/1.1 200 OK\r\n"
                "Content-Type: text/plain\r\n"
                "Content-Length: 13\r\n\r\n"
                "hello, client"
            ).encode("utf-8")

            protected = encrypt_then_mac(response)
            send_record(conn, protected)
            print(f"  Record sent ({len(protected)} bytes on wire)")

print("\nDone.")
Enter fullscreen mode Exit fullscreen mode

The shape of the channel is still familiar.

That matters.

We did not replace the whole design.

We strengthened one missing property.

That is how protocol evolution should feel.


Let’s check it on the wire.

We try to start the server and client, which we just created.

Here is our client request’s data.

-- Sending request ---
[crypto_hmac] encrypt_then_mac:
nonce = 387f0065f8915133473597d8cef15f34...
ct_len = 61 bytes
tag = b7f0db369e5d2a221a18f2d167b5b3a8...
Record sent (109 bytes on wire)
Enter fullscreen mode Exit fullscreen mode

wireshark


Detecting tampering

Now let’s revisit the failure from Part 1.

Previously, if someone modified the ciphertext, the receiver would still decrypt it and accept modified plaintext.

Now that should no longer work.

Here is a tiny tampering demo:

# tampering_demo_hmac.py
from crypto_hmac import encrypt_then_mac, verify_then_decrypt

original = b"amount=100"
protected = encrypt_then_mac(original)

tampered = bytearray(protected)
tampered[20] ^= 0x08  # flip one bit somewhere in the encrypted body

try:
    result = verify_then_decrypt(bytes(tampered))
    print("Unexpected success:", result)
except ValueError as e:
    print("Tampering detected:", e)
Enter fullscreen mode Exit fullscreen mode

Now the result should be rejection, not silent acceptance.

That is exactly what we wanted.

This is the moment where our channel stops being merely “encrypted” and starts being “protected.”

Because now the receiver does not just recover bytes. It verifies them first.

That is a serious step.


Why we also need a sequence number

At this point, we fixed the big flaw from Part 1: silent tampering.

But the record layer is still naive.

Why?

Because even with a valid HMAC, the receiver still has no sense of record position or freshness.

Imagine an attacker records one valid protected message and sends it again later.

The HMAC is still valid.

The ciphertext is still valid.

And unless the receiver keeps some state, it may accept the same record again.

That means integrity alone is not the whole story.

We also need some sense of:

  • order
  • position
  • repetition
  • replay

This is where sequence numbers come in.

A sequence number is just a counter that increases with every record:

  • first record = 0
  • next = 1
  • next = 2
  • and so on

We then include that sequence number in the authenticated data, so the receiver does not just verify “these bytes were protected,” but also “these bytes belong in this position in the stream.”

That makes the record layer much less naive.

It still does not solve every replay problem in every possible system. But for our toy protocol, it is a very good next step.


Updating the record format

Now our record becomes:

seq || nonce || ciphertext || tag
Enter fullscreen mode Exit fullscreen mode

And our HMAC input becomes:

seq || nonce || ciphertext
Enter fullscreen mode Exit fullscreen mode

So the sender and receiver now both need a little bit of state:

  • the sender tracks the next sequence number to send
  • the receiver tracks the next sequence number it expects

This is one of those moments where secure transport starts looking more like a real protocol and less like “some crypto around a socket.”

Sequence-aware HMAC helper

# crypto_hmac_seq.py
import os
import hmac
import hashlib
import struct

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

# ---------------------------------------------------------------------------
# Keys — same as crypto_hmac.py, hardcoded for education.
# ---------------------------------------------------------------------------
ENC_KEY = b"0123456789ABCDEF0123456789ABCDEF"
MAC_KEY = b"HMAC_KEY_FOR_PART2_DEMO_1234567"

TAG_LEN = 32  # HMAC-SHA256 output: 32 bytes (256 bits)
NONCE_LEN = 16  # AES-CTR nonce: 16 bytes (128 bits)
SEQ_LEN = 8  # Sequence number: 8 bytes (64-bit unsigned integer)

def protect_record(seq: int, plaintext: bytes) -> bytes:
    """Encrypt a plaintext record and attach a sequence-aware HMAC tag.

    Args:
        seq:       The current send-side sequence number (0, 1, 2, …).
        plaintext: The message to protect.

    Returns:
        seq (8 B) || nonce (16 B) || ciphertext (N B) || tag (32 B)
    """

    # Pack the sequence number as an 8-byte big-endian unsigned integer.
    # "!Q" = network byte order, unsigned 64-bit.
    seq_bytes = struct.pack("!Q", seq)

    # Generate a fresh AES-CTR nonce.
    nonce = os.urandom(NONCE_LEN)

    # Encrypt the plaintext.
    cipher = Cipher(algorithms.AES(ENC_KEY), modes.CTR(nonce))
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(plaintext) + encryptor.finalize()

    # Compute HMAC over (seq || nonce || ciphertext).
    # The sequence number is included in the MAC input so the integrity
    # check also covers record order/position in the stream.
    mac_input = seq_bytes + nonce + ciphertext
    tag = hmac.new(MAC_KEY, mac_input, hashlib.sha256).digest()

    return seq_bytes + nonce + ciphertext + tag

def verify_and_unprotect(expected_seq: int, payload: bytes) -> bytes:
    """Verify the HMAC and sequence number, then decrypt.

    Args:
        expected_seq: The sequence number the receiver expects next.
        payload:      The raw bytes received: seq || nonce || ct || tag.

    Returns:
        The decrypted plaintext.

    Raises:
        ValueError if the MAC is invalid or the sequence number is wrong.
    """

    min_len = SEQ_LEN + NONCE_LEN + TAG_LEN
    if len(payload) < min_len:
        raise ValueError("Record too short")

    # Step 1: Parse the record.
    seq_bytes = payload[:SEQ_LEN]
    nonce = payload[SEQ_LEN : SEQ_LEN + NONCE_LEN]
    ciphertext = payload[SEQ_LEN + NONCE_LEN : -TAG_LEN]
    received_tag = payload[-TAG_LEN:]

    # Step 2: Recompute HMAC over (seq || nonce || ciphertext).
    mac_input = seq_bytes + nonce + ciphertext
    expected_tag = hmac.new(MAC_KEY, mac_input, hashlib.sha256).digest()

    # Step 3: Constant-time tag comparison.
    if not hmac.compare_digest(received_tag, expected_tag):
        print("  [crypto_hmac_seq] *** MAC VERIFICATION FAILED ***")
        raise ValueError("HMAC verification failed — record tampered or replayed")

    print("  [crypto_hmac_seq] MAC verification: OK")

    # Step 4: Check the sequence number matches what we expect.
    # Even though the MAC already covers the sequence number (so an
    # attacker cannot change it without invalidating the MAC), we still
    # explicitly verify that it matches our counter.  This catches
    # replayed or reordered records that carry a valid MAC but belong
    # to a different position in the stream.
    (received_seq,) = struct.unpack("!Q", seq_bytes)
    if received_seq != expected_seq:
        print(
            f"  [crypto_hmac_seq] *** SEQUENCE MISMATCH: "
            f"got {received_seq}, expected {expected_seq} ***"
        )
        raise ValueError(
            f"Sequence number mismatch: got {received_seq}, expected {expected_seq}"
        )

    print(
        f"  [crypto_hmac_seq] Sequence number: {received_seq} (expected {expected_seq}) — OK"
    )

    # Step 5: Decrypt.
    cipher = Cipher(algorithms.AES(ENC_KEY), modes.CTR(nonce))
    decryptor = cipher.decryptor()
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()

    return plaintext

Enter fullscreen mode Exit fullscreen mode

And now the channel has a bit more memory.

Not just “is this record authentic?”

But also “is this the record I expected next?”

That is a real protocol improvement.


Updating the client and server

Now let’s plug this into the channel.

Sequence-aware HMAC-based client

# client_v2_hmac_seq.py
import socket

from framing import send_record, recv_record
from crypto_hmac_seq import protect_record, verify_and_unprotect

HOST = "127.0.0.1"
PORT = 9003

# Sequence counters — sender and receiver each maintain their own.
# The sender increments after each record sent.
# The receiver expects consecutive values starting from 0.
send_seq = 0
recv_seq = 0

# A toy HTTP-like request — same spirit as Part 1.
request = (
    "GET /transfer?to=bob&amount=100 HTTP/1.1\r\nHost: localhost\r\n\r\n"
).encode("utf-8")

print("=" * 60)
print("Part 2 — HMAC + Sequence Numbers Client (Stage 2)")
print("=" * 60)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    client.connect((HOST, PORT))
    print(f"Connected to {HOST}:{PORT}")

    # ----- SEND REQUEST -----
    print(f"\n--- Sending request (send_seq={send_seq}) ---")
    protected = protect_record(send_seq, request)
    send_record(client, protected)
    send_seq += 1
    print(f"  Record sent ({len(protected)} bytes on wire)")

    # ----- RECEIVE RESPONSE -----
    print(f"\n--- Receiving response (expecting recv_seq={recv_seq}) ---")
    raw_response = recv_record(client)

    try:
        response = verify_and_unprotect(recv_seq, raw_response)
        recv_seq += 1
        print(f"\n  Decrypted response:\n  {response.decode('utf-8')}")
    except ValueError as e:
        print(f"\n  *** REJECTED: {e} ***")

print("\nDone.")

Enter fullscreen mode Exit fullscreen mode

Sequence-aware HMAC-based server

# server_v2_hmac_seq.py
import socket

from framing import send_record, recv_record
from crypto_hmac_seq import protect_record, verify_and_unprotect

HOST = "127.0.0.1"
PORT = 9003

# Sequence counters.
# The server's recv_seq tracks the client's send_seq, and vice versa.
send_seq = 0
recv_seq = 0

print("=" * 60)
print("Part 2 — HMAC + Sequence Numbers Server (Stage 2)")
print("=" * 60)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(1)
    print(f"Listening on {HOST}:{PORT}")

    conn, addr = server.accept()
    with conn:
        print(f"Connected by {addr}")

        # ----- RECEIVE REQUEST -----
        print(f"\n--- Receiving request (expecting recv_seq={recv_seq}) ---")
        raw_request = recv_record(conn)

        try:
            request = verify_and_unprotect(recv_seq, raw_request)
            recv_seq += 1
        except ValueError as e:
            # Rejection: either the MAC is invalid, the sequence number
            # is wrong, or the data was tampered with / replayed.
            print(f"\n  *** REJECTED: {e} ***")
            print("  Connection closed — refusing to process invalid data.")
        else:
            print(f"\n  Decrypted request:\n  {request.decode('utf-8')}")

            # ----- SEND RESPONSE -----
            print(f"--- Sending response (send_seq={send_seq}) ---")
            response = (
                "HTTP/1.1 200 OK\r\n"
                "Content-Type: text/plain\r\n"
                "Content-Length: 13\r\n\r\n"
                "hello, client"
            ).encode("utf-8")

            protected = protect_record(send_seq, response)
            send_record(conn, protected)
            send_seq += 1
            print(f"  Record sent ({len(protected)} bytes on wire)")

print("\nDone.")

Enter fullscreen mode Exit fullscreen mode

Why real-world systems usually do not stop here

At this point, we have something much stronger than Part 1.

We have:

  • encryption
  • integrity protection
  • message authentication
  • sequence-aware records

That is already a meaningful protocol.

But if you look at how real systems are usually built, they do not normally stop at manually composing:

  • AES-CTR
  • HMAC-SHA256
  • explicit sequence-aware record protection

Why?

Because modern systems usually prefer a single primitive that gives confidentiality and integrity together.

That is where AEAD comes in.

We separated these properties on purpose because it makes the protocol easier to understand.

But the real world usually packages them together.


A very short note on AEAD

AEAD stands for Authenticated Encryption with Associated Data.

That sounds heavier than it really is.

The practical idea is simple:

An AEAD construction gives us:

  • encryption
  • integrity/authentication of the encrypted message
  • and the ability to authenticate extra metadata that should not be encrypted

Common examples are:

  • AES-GCM
  • ChaCha20-Poly1305

This is much closer to how modern secure protocols protect records.

It is also why I wanted to include AEAD in this part. If we stopped only at “encrypt + HMAC,” we would understand the missing property better, but we would still be one step away from how modern systems actually package it.

So now we take that final step.


Moving our channel to AEAD

For the AEAD version, I will use AES-GCM.

The high-level idea is:

  • the plaintext gets encrypted
  • integrity/authentication is built in
  • and we can include extra metadata as associated data

In our case, the sequence number is a good example of associated data.

That means:

  • it does not need to be encrypted
  • but it should still be authenticated

AEAD-based helper

# crypto_aead.py
import os
import struct

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# ---------------------------------------------------------------------------
# Key — a single 256-bit key for AES-GCM.
# With AEAD, we do NOT need separate encryption and MAC keys — the
# algorithm handles both internally.
# ---------------------------------------------------------------------------
AEAD_KEY = b"AEAD_KEY_PART2_DEMO_FOR_AES_GCM!"  # 32 bytes → AES-256-GCM

# AES-GCM nonce length: 12 bytes is the recommended (and most efficient) size.
NONCE_LEN = 12

# Sequence number: 8 bytes (64-bit unsigned integer), same as Stage 2.
SEQ_LEN = 8

def protect_record_aead(seq: int, plaintext: bytes) -> bytes:
    """Seal a plaintext record with AES-GCM.

    Args:
        seq:       The current send-side sequence number.
        plaintext: The message to protect.

    Returns:
        seq (8 B) || nonce (12 B) || ciphertext_and_tag (N+16 B)
    """

    # Pack the sequence number as associated data.
    # The sequence number is authenticated but sent in the clear — the
    # receiver needs it to know which counter value to expect.
    seq_bytes = struct.pack("!Q", seq)

    # Generate a random 12-byte nonce for AES-GCM.
    nonce = os.urandom(NONCE_LEN)

    # Create an AESGCM instance with our key.
    aesgcm = AESGCM(AEAD_KEY)

    # Encrypt and authenticate in one call.
    # AESGCM.encrypt(nonce, data, associated_data) returns
    # ciphertext || 16-byte authentication tag as a single bytes object.
    # The associated_data (seq_bytes) is authenticated but NOT encrypted.
    ciphertext_and_tag = aesgcm.encrypt(nonce, plaintext, seq_bytes)

    print(f"  [crypto_aead] protect_record_aead:")
    print(f"    seq      = {seq}")
    print(f"    nonce    = {nonce.hex()}")
    print(
        f"    sealed   = {len(ciphertext_and_tag)} bytes "
        f"(plaintext {len(plaintext)} + tag 16)"
    )

    return seq_bytes + nonce + ciphertext_and_tag

def unprotect_record_aead(expected_seq: int, payload: bytes) -> bytes:
    """Verify and decrypt an AES-GCM sealed record.

    Args:
        expected_seq: The sequence number the receiver expects next.
        payload:      seq (8 B) || nonce (12 B) || ciphertext_and_tag.

    Returns:
        The decrypted plaintext.

    Raises:
        ValueError if the sequence number is wrong.
        cryptography.exceptions.InvalidTag if decryption/auth fails.
    """

    min_len = SEQ_LEN + NONCE_LEN + 16  # at least seq + nonce + tag
    if len(payload) < min_len:
        raise ValueError("Record too short")

    # Step 1: Parse the record.
    seq_bytes = payload[:SEQ_LEN]
    nonce = payload[SEQ_LEN : SEQ_LEN + NONCE_LEN]
    ciphertext_and_tag = payload[SEQ_LEN + NONCE_LEN :]

    # Step 2: Check the sequence number.
    (received_seq,) = struct.unpack("!Q", seq_bytes)
    if received_seq != expected_seq:
        print(
            f"  [crypto_aead] *** SEQUENCE MISMATCH: "
            f"got {received_seq}, expected {expected_seq} ***"
        )
        raise ValueError(
            f"Sequence number mismatch: got {received_seq}, expected {expected_seq}"
        )

    print(
        f"  [crypto_aead] Sequence number: {received_seq} "
        f"(expected {expected_seq}) — OK"
    )

    # Step 3: Decrypt and verify in one call.
    # AESGCM.decrypt(nonce, data, associated_data) verifies the auth tag
    # and decrypts.  If anything was tampered with — the ciphertext, the
    # tag, or the associated data — it raises InvalidTag.
    aesgcm = AESGCM(AEAD_KEY)
    plaintext = aesgcm.decrypt(nonce, ciphertext_and_tag, seq_bytes)

    print(f"  [crypto_aead] AEAD decryption: OK ({len(plaintext)} bytes)")

    return plaintext

Enter fullscreen mode Exit fullscreen mode

This code is noticeably simpler.

That is one of the big practical advantages of AEAD.

Instead of manually:

  • encrypting
  • computing HMAC
  • verifying HMAC
  • then decrypting

we use one primitive that already combines confidentiality and integrity.

And the sequence number fits naturally as associated data.

AEAD-based client

# client_v2_aead.py
import socket

from framing import send_record, recv_record
from crypto_aead import protect_record_aead, unprotect_record_aead

HOST = "127.0.0.1"
PORT = 9002

# Sequence counters — sender and receiver each maintain their own.
# The sender increments after each record sent.
# The receiver expects consecutive values starting from 0.
send_seq = 0
recv_seq = 0

# A toy HTTP-like request.
request = (
    "GET /transfer?to=bob&amount=100 HTTP/1.1\r\nHost: localhost\r\n\r\n"
).encode("utf-8")

print("=" * 60)
print("Part 2 — AEAD Client (AES-GCM)")
print("=" * 60)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    client.connect((HOST, PORT))
    print(f"Connected to {HOST}:{PORT}")

    # ----- SEND REQUEST -----
    print(f"\n--- Sending request (send_seq={send_seq}) ---")
    protected = protect_record_aead(send_seq, request)
    send_record(client, protected)
    send_seq += 1
    print(f"  Record sent ({len(protected)} bytes on wire)")

    # ----- RECEIVE RESPONSE -----
    print(f"\n--- Receiving response (expecting recv_seq={recv_seq}) ---")
    raw_response = recv_record(client)

    try:
        response = unprotect_record_aead(recv_seq, raw_response)
        recv_seq += 1
        print(f"\n  Decrypted response:\n  {response.decode('utf-8')}")
    except Exception as e:
        print(f"\n  *** REJECTED: {e} ***")

print("\nDone.")

Enter fullscreen mode Exit fullscreen mode

AEAD-based server

# server_v2_aead.py
import socket

from framing import send_record, recv_record
from crypto_aead import protect_record_aead, unprotect_record_aead

HOST = "127.0.0.1"
PORT = 9002

# Sequence counters.
# The server's recv_seq tracks the client's send_seq, and vice versa.
send_seq = 0
recv_seq = 0

print("=" * 60)
print("Part 2 — AEAD Server (AES-GCM)")
print("=" * 60)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(1)
    print(f"Listening on {HOST}:{PORT}")

    conn, addr = server.accept()
    with conn:
        print(f"Connected by {addr}")

        # ----- RECEIVE REQUEST -----
        print(f"\n--- Receiving request (expecting recv_seq={recv_seq}) ---")
        raw_request = recv_record(conn)

        try:
            request = unprotect_record_aead(recv_seq, raw_request)
            recv_seq += 1
        except Exception as e:
            # AEAD rejection: either the auth tag is invalid, the sequence
            # number is wrong, or the data was tampered with.
            print(f"\n  *** REJECTED: {e} ***")
            print("  Connection closed — refusing to process invalid data.")
        else:
            print(f"\n  Decrypted request:\n  {request.decode('utf-8')}")

            # ----- SEND RESPONSE -----
            print(f"--- Sending response (send_seq={send_seq}) ---")
            response = (
                "HTTP/1.1 200 OK\r\n"
                "Content-Type: text/plain\r\n"
                "Content-Length: 13\r\n\r\n"
                "hello, client"
            ).encode("utf-8")

            protected = protect_record_aead(send_seq, response)
            send_record(conn, protected)
            send_seq += 1
            print(f"  Record sent ({len(protected)} bytes on wire)")

print("\nDone.")

Enter fullscreen mode Exit fullscreen mode

This version is already much closer to how modern secure transport actually protects records.

Not identical to TLS, of course. But structurally much closer.


What we gained

At this point, our channel is much stronger than the one from Part 1.

We now have:

  • confidentiality
  • integrity protection
  • authenticated records
  • sequence-aware message handling
  • a much more realistic record protection design through AEAD

That is a big improvement.

The receiver is no longer just decrypting whatever arrives and trusting the result. Now the receiver can reject modified or structurally unexpected records.

That is a real protocol boundary.


What is still broken

And yet, even now, we are still very far from real TLS.

Because the biggest assumption in our design is still untouched:

both sides already share the necessary secret keys

That means we still do not know how to solve the next real problem:

  • how do two strangers establish fresh secrets?
  • how does the client know it is talking to the right server?
  • how do we scale beyond hardcoded shared secrets?
  • how do we build trust instead of assuming it?

We improved record protection a lot.

But we still do not have a real way to establish trust.

That is the next wall.


Summary

In this part, we took the encrypted but still incomplete channel from Part 1 and made it much more serious.

First, we added HMAC, which gave the receiver a way to detect tampering.

Then, we added a sequence number, which made the record layer less naive and bound records to their place in the stream.

Finally, we moved to AEAD, because in real-world systems confidentiality and integrity are usually protected together, not assembled manually from separate pieces.

So this article had two goals:

  • understand the missing property explicitly
  • then move toward the real-world shape of the solution

That is why we did not stop at HMAC.

But even after all of this, we still depend on one assumption that makes the whole thing unrealistic:

we are still starting with pre-shared secret keys.

And that is exactly what the next part will attack.


Next part — getting rid of the pre-shared key

So we stop here.

We now have a much better record layer than in Part 1. But we are still relying on a hardcoded shared secret, and that is not how real secure communication between strangers on the internet works.

In the next part, we will stop assuming both sides already share a secret.

We will start building a real handshake and establish fresh session keys instead.

That still will not give us full TLS.

But it will take us much closer to the real shape of the protocol.


Final code

I’ll put the full code for this part on GitHub here:

[GitHub link to final code]

Top comments (0)